import { Inject, Injectable } from "@angular/core";
import {
  ConnectableObservable,
  from,
  from as fromPromise,
  merge,
  Observable,
  Observer,
  of,
  Subject,
  throwError,
} from "rxjs";
import {
  catchError,
  filter,
  first,
  map,
  mergeMap,
  publishReplay,
  refCount,
  share,
  shareReplay,
  switchMap,
  takeUntil,
  tap,
} from "rxjs/operators";
import { ApplicationCommand } from "./application-interfaces";
import { isNil, isObjectLike, matches } from "lodash";
import { WampClientService } from "./wamp.client.service";
import { StateLineConfigService } from "./state-line-config";
import { distinctUntilChanged } from 'rxjs';
import { COMMAND_REQUEST_STATE } from "./application-commands";
import { TOPIC_ENGINE_COMMANDS } from "./application-topics";
import { MonkeyWayService } from "../services/monkey-way.service";
import { AnalyticsService } from "../services/analytics.service";


export declare type ProductId = string | number;

export interface StateLineCredentials {
  user: string;
  password: string;
}

export interface StateLineAuthenticator {
  obtainCredentials: () => Promise<StateLineCredentials>;
}

export class StateLineConnectionOptions {
  url: string | undefined;
  realm: string | undefined;
  // authenticator: StateLineAuthenticator | undefined;

  /**
   * The topic on which application commands are published and subscribed to.
   */
  applicationCommandsTopic = "com.mackevision.engine.commands";
  /**
   * Maximum number of reconnection attempts. Unlimited if set to -1 (default 3)
   */
  maxRetries?: number;
  /**
   * Initial delay for reconnection attempt in seconds (default: 1.5)
   */
  initialRetryDelay?: number;
  /**
   * Maximum delay for reconnection attempts in seconds (default: 300)
   */
  maxRetryDelay?: number;
  /**
   * The growth factor applied to the retry delay between reconnection attempts (default: 1.5)
   */
  retryDelayGrowth?: number;
  /**
   * The standard deviation of a Gaussian to jitter the delay on each retry cycle as a fraction of the mean (default: 0.1)
   */
  retryDelayJitter?: number;
  /**
   * Array of serializers to be used for the connection.
   * Defaults to MsgpackSerializer and JSONSerializer
   */
  serializers?: any[];
  authid?: string;
}

export interface ConnectionState {
  state: "CONNECTED" | "DISCONNECTED";
  details?: any;
}

export interface ApplicationCommandFilter {
  category?: string;
  identifier?: string;
  stringValue?: string | any;
  floatValue?: number;
  boolValue?: boolean;
  intValue?: number;
}

/**
 * @dynamic
 */
@Injectable({
  providedIn: "root",
})
export class StateLineClientService {
  private sessionCreated: Observable<WampClientService> = of();
  // private sessionId: string | null;

  private readonly modifyWampProviderSubject = new Subject<WampClientService>();

  private readonly connectionStateSubject = new Subject<ConnectionState>();
  private readonly connectionState$ =
    this.connectionStateSubject.asObservable();

  private optionsPromise: any;

  private readonly errorLogger = (reason: any, error?: any) => {
    console.error(reason, error);
    this.analytics.error(reason.toString());
  };

  public destroy$ = new Subject<void>();
  public alreadyConnected = false;

  constructor(
    private wampProvider: WampClientService,
    private analytics: AnalyticsService
    ) {
    wampProvider.wampConfig$.pipe(distinctUntilChanged(),filter(f => !!f)).subscribe({next: wampConfig => {
      this.optionsPromise = wampConfig;
    }});
  }

  init() {
    if (!this.alreadyConnected) {
      this.sessionCreated = merge<WampClientService | any>(
        Observable.create((observer: Observer<WampClientService>) => {
          this.wampProvider.openNewConnection(
            () => {
              this.alreadyConnected = true;
              this.connectionStateSubject.next({
                state: "CONNECTED",
              });
              observer.next(this.wampProvider);
            },
            (reason: any) => {
              this.connectionStateSubject.next({
                state: "DISCONNECTED",
                details: {
                  ...reason,
                },
              });
              if (!reason.will_retry) {
                observer.error(reason);
              }
              return false;
            }
          );
          return () => {
            if (this.wampProvider && this.wampProvider.isConnectionOpen()) {
              try {
                this.wampProvider.closeCurrentConnection();
              } catch (error) {
                this.errorLogger("Failed closing connection", error);
              }
            }
          };
        }),
        this.modifyWampProviderSubject
      ).pipe(
        tap(() => this.subscribeToCommand("com.mackevision.engine.commands").subscribe()),
        catchError((error: any) => {
          this.errorLogger("Connection to server failed", error);
          return throwError(error);
        }),
        takeUntil(this.destroy$),
        shareReplay(1),
      );
    }
  }

  getSessionId(): Observable<number> {
    return this.sessionCreated.pipe(
      filter((wampProvider: any) => {
        return !!wampProvider.sessionId;
      }),
      map((wampProvider) => {
        if (!wampProvider.sessionId) {
          throw new Error("Missing session id");
        }
        // this.sessionId = wampProvider.sessionId;
        return wampProvider.sessionId;
      })
    );
  }

  getConnectionState(): Observable<ConnectionState> {
    return this.connectionState$;
  }

  closeConnection(){
    if (this.wampProvider && this.wampProvider.isConnectionOpen()) {
      try {
        this.wampProvider.closeCurrentConnection();
      } catch (error) {
        this.errorLogger("Failed closing connection", error);
      }
    }
  }

  /**
   * Subscribe to a topic and interpret the payload as JSON.
   * Errors in case the payload cannot be parsed to JSON.
   * @param topicName - The topic to subscribe to.
   * @return Observable<T>
   */
  subscribeJson<T>(topicName: string): Observable<T> {
    return this.subscribeInternal<T>(topicName, true);
  }

  /**
   * Subscribe to a topic and pass the payload through as-is.
   * @param topicName - The topic to subscribe to.
   * @return Observable<T>
   */
  subscribe<T>(topicName: string): Observable<T> {
    return this.subscribeInternal<T>(topicName, false);
  }

  /**
   * Subscribe to application commands of the given type, optionally filtered by a given commandFilter.
   * @param type
   * @param commandFilter
   */
  subscribeToCommand<T extends ApplicationCommand>(
    type: string,
    commandFilter?: ApplicationCommandFilter
  ): Observable<T> {
    return of(this.optionsPromise).pipe(
      switchMap((options) => {
        return this.subscribe(TOPIC_ENGINE_COMMANDS).pipe(
          map((command: any) => {
            try {
              return JSON.parse(command);
            } catch (error) {
              console.error(`Failed parsing command as JSON`, command);
              throw new Error(
                `Failed parsing command as JSON. Error: [${error}]. Command: [${command}]`
              );
            }
          }),
          filter((command: T) => {
            let commandMatches = command.type === type;
            if (commandMatches && commandFilter) {
              commandMatches = matches(commandFilter)(command);
            }
            return commandMatches;
          })
        );
      })
    );
  }

  publish(topic: string, data: any): Observable<void> {
    const publishObservable = <ConnectableObservable<void>>(
      this.sessionCreated.pipe(
        tap((wampProvider) => {
          return fromPromise(
            wampProvider.publishEvent(`${topic}.` + wampProvider.sessionId, [
              JSON.stringify(data),
            ])
          );
        }),
        map(() => void 0),
        first(),
        catchError((reason) => {
          this.errorLogger(`Failed publishing to topic ${topic}`, reason);
          return throwError(reason);
        }),
        publishReplay(1)
      )
    );
    publishObservable.connect();
    return publishObservable;
  }

  /**
   * Publish the given command to the applicationCommandTopic provided in the connection-options.
   * @param command
   */
  publishCommand(command: ApplicationCommand) {
    const publishCommand$ = <ConnectableObservable<void>>fromPromise(
      Promise.resolve()
    ).pipe(
      switchMap(() => {
        return this.publish("com.mackevision.ui.commands", command);
      }),
      publishReplay(1)
    );
    publishCommand$.connect();
    return publishCommand$;
  }

  callRaw(
    endpoint: string,
    payload?: any,
    appendSessionId?: boolean
  ): Observable<any> {
    return this.callInternal<any>(
      endpoint,
      payload,
      appendSessionId ? true : false,
      false
    );
  }

  callJson<T>(
    endpoint: string,
    payload?: any,
    appendSessionId?: boolean
  ): Observable<T> {
    return this.callInternal<T>(
      endpoint,
      payload,
      appendSessionId ? true : false,
      true
    );
  }

  /**
   * Calls the given endpoint and parses the result as JSON.
   * @param endpoint
   * @param payload
   * @param appendSessionId
   * @return Observable<T>
   * @deprecated Use either callJson or callRaw
   */
  call<T>(
    endpoint: string,
    payload?: any,
    appendSessionId?: boolean
  ): Observable<T> {
    return this.callInternal<T>(
      endpoint,
      payload,
      appendSessionId ? true : false,
      true
    );
  }

  private callInternal<T>(
    endpoint: string,
    payload: any,
    appendSessionId: boolean,
    parseJson: boolean
  ): Observable<T> {
    const source$ = this.sessionCreated.pipe(
      first(),
      switchMap((wampProvider: WampClientService) => {
        let adjustedEndpoint = endpoint;
        if (!!adjustedEndpoint) {
          if (appendSessionId) {
            adjustedEndpoint = adjustedEndpoint + "." + wampProvider.sessionId;
          }
          let adjustedPayload = payload;
          if (!!adjustedPayload) {
            adjustedPayload = JSON.stringify(adjustedPayload);
          }

          return from(
            wampProvider.callRemoteProcedure(
              adjustedEndpoint,
            (!!adjustedPayload) ? [adjustedPayload] : undefined
            )
          ).pipe(
            map((response: any) => {
              if (parseJson) {
                return JSON.parse(response) as T;
              }
              return response as T;
            }),
            catchError((reason) => {
              this.errorLogger(`Failed calling endpoint ${endpoint}`, reason);
              return throwError(reason);
            })
          );
        } else {
          return throwError("undefined endpoint");
        }
      }),
    );

    // Use the publishReplay() operator only if you need caching of the last emitted value
    return source$.pipe(shareReplay(1));
  }


  // private callInternal<T> (endpoint: string, payload: any, appendSessionId: boolean, parseJson: boolean): Observable<T> {
  //   const callObservable = <ConnectableObservable<T>> this.sessionCreated
  //     .pipe(
  //       first(),
  //       mergeMap<WampProvider,WampConnectionOptions>((wampProvider: WampProvider) => {
  //         let adjustedEndpoint = endpoint;
  //         if (!!adjustedEndpoint){
  //           if (appendSessionId) {
  //             adjustedEndpoint = adjustedEndpoint + '.' + wampProvider.sessionId;
  //           }
  //           let adjustedPayload = payload;
  //           if (isObjectLike(adjustedPayload)) {
  //             adjustedPayload = JSON.stringify(adjustedPayload);
  //           }

  //           return fromPromise(wampProvider.callRemoteProcedure(
  //             adjustedEndpoint,
  //             !isNil(adjustedPayload) ? [adjustedPayload] : undefined)
  //           );
  //         } else{
  //           throwError("undefined");
  //         }
  //       }),
  //       map((response: string) => {
  //         if (parseJson) {
  //           return JSON.parse(response);
  //         }
  //         return response;
  //       }),
  //       catchError((reason) => {
  //         this.errorLogger(`Failed calling endpoint ${endpoint}`, reason);
  //         return throwError(reason);
  //       }),
  //       publishReplay(1)
  //     );
  //   // make it hot
  //   callObservable.connect();
  //   return callObservable;
  // }

  private subscribeInternal<T>(
    topicName: string,
    parseJson: boolean
  ): Observable<any> {
    return this.sessionCreated.pipe(
      switchMap((wampProvider) => {
        return new Observable((observer: Observer<T>) => {
          const onMessage = (payload: any) => {
            let adjustedPayload = payload;
            if (payload && parseJson) {
              try {
                adjustedPayload = JSON.parse(payload);
              } catch (error) {
                this.errorLogger(
                  `Failed parsing payload as JSON. Error: [${error}]`,
                  payload
                );
                observer.error(error);
                return;
              }
            }
            observer.next(adjustedPayload);
          };

          let topicSubscription: any;

          wampProvider
            .subscribeToTopic(
              `${topicName}.${wampProvider.sessionId}`,
              onMessage,
              undefined
            )
            .then(
              (subscription: any) => {
                topicSubscription = subscription;
              },
              (reason: any) => {
                observer.error(reason);
              }
            );

          return () => {
            // unsubscribe
            if (topicSubscription) {
              try {
                wampProvider.unsubscribeToTopic(topicSubscription)
                .then(() => void 0)
                .catch((error) => {
                  this.errorLogger(
                    `Failed unsubscribing from topic [${topicName}, promise rejection]`,
                    error
                  );
                });
              } catch (error) {
                this.errorLogger(
                  `Failed unsubscribing from topic [${topicName}]`,
                  error
                );
              }
            }
            observer.complete();
          };
        });
      })
    );
  }

  private setOverrideSessionId(sessionId: number) {
    this.sessionCreated
      .pipe(
        first(),
        tap((wampProvider) => {
          (<any>wampProvider).overrideSessionId = sessionId;
          this.modifyWampProviderSubject.next(wampProvider);
        })
      )
      .subscribe();
  }
}
