import { CORE_DEVICE_LOCALE, CORE_DEVICE_TIMEZONE, CORE_WEBSOCKET_BASE_URL } from '../../core.tokens';
import { DestroyRef, Inject, Injectable } from '@angular/core';
import { BehaviorSubject, combineLatest, EMPTY, merge, of, Subject, Subscription, timer } from 'rxjs';
import {
  catchError,
  debounceTime,
  delay,
  distinctUntilChanged,
  filter,
  finalize,
  map,
  share,
  skip,
  switchMap,
  take,
  takeUntil,
  tap,
} from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';
import { AuthService } from './auth.service';
import { NetworkService } from './network.service';
import { WatchdogService } from './watchdog.service';
import { LoginMetadata, MessageMap, WebSocketResponse } from '../models/websocket.model';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { ClusterService } from './cluster.service';
import { isNotNull } from '../utils';

@Injectable()
export class WebsocketService {

  private readonly logger = this.watchdog.tag('Websocket', 'magenta');

  private wsSub: Subscription | null = null;
  private timeoutSub: Subscription | null = null;
  private heartbeatSub: Subscription | null = null;

  public readonly status$ = new BehaviorSubject<boolean>(false);
  public readonly connectionCount$ = new BehaviorSubject<number>(0);
  public readonly lastConnectionAt$ = new BehaviorSubject<Date | null>(null);
  public readonly lastDisconnectionAt$ = new BehaviorSubject<Date | null>(null);

  private readonly ws$ = webSocket<WebSocketResponse>({
    url: `${ this.websocketBaseUrl }/ws/main`,
    openObserver: {
      next: () => {
        this.logger.debug('Open');
        this.handshake();
      },
    },
    closingObserver: {
      next: () => this.logger.debug('Closing'),
    },
    closeObserver: {
      next: () => {
        this.logger.debug('Close');
        this.status$.next(false);
        this.lastDisconnectionAt$.next(new Date());
      },
    },
  });

  private readonly messagesSubject = new Subject<WebSocketResponse>();

  public readonly messages$ = this.messagesSubject.asObservable();

  constructor(
    @Inject(CORE_DEVICE_LOCALE) private readonly deviceLocale: string,
    @Inject(CORE_DEVICE_TIMEZONE) private readonly deviceTimezone: string,
    @Inject(CORE_WEBSOCKET_BASE_URL) private readonly websocketBaseUrl: string,
    private readonly destroyRef: DestroyRef,
    private readonly auth: AuthService,
    private readonly cluster: ClusterService,
    private readonly watchdog: WatchdogService,
    private readonly network: NetworkService,
  ) {}

  public initialize(): void {
    this.logger.info('Initialize');

    combineLatest([
      this.cluster.leader$,
      this.auth.authorized$,
      this.network.status$.pipe(
        distinctUntilChanged(),
      ),
    ]).pipe(
      tap(([leader, authorized, status]) => {
        if (!leader || !authorized || !status) {
          this.unsubscribeAll();
        }
        else {
          this.initWebSocket();
        }
      }),
      takeUntilDestroyed(this.destroyRef),
    ).subscribe();
  }

  public send<K extends keyof MessageMap>(type: K, data: MessageMap[K]): void {
    this.logger.debug('Send', type, data);
    this.ws$.next({ type, data });
  }

  public handshake(): void {
    const metadata$ = this.cluster.members$.pipe(
      map((members) => {
        return {
          device: {
            locale: this.deviceLocale,
            timezone: this.deviceTimezone,
            screen: {
              width: window.screen.width,
              height: window.screen.height,
            },
          },
          app: this.cluster.appMetadata,
          sideCarApps: (
            Array.from(members.values()).filter((member) => {
              return member.app.startupId !== this.cluster.appMetadata.startupId;
            }).map((member) => {
              return member.app;
            })
          ),
        } satisfies LoginMetadata;
      }),
      share(),
    );

    combineLatest([
      this.auth.token$.pipe(filter(isNotNull)),
      metadata$,
    ]).pipe(
      take(1),
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(([token, metadata]) => {
      this.send('login', {
        token,
        metadata: metadata,
      });
    });

    metadata$.pipe(
      skip(1),
      debounceTime(300),
      takeUntil(this.status$.pipe(
        filter((status) => !status),
        skip(1),
      )),
      takeUntilDestroyed(this.destroyRef),
    ).subscribe((metadata) => {
      this.send('clusterChanged', metadata);
    });
  }

  private unsubscribeAll(): void {
    this.wsSub?.unsubscribe();
    this.wsSub = null;
    this.timeoutSub?.unsubscribe();
    this.timeoutSub = null;
    this.heartbeatSub?.unsubscribe();
    this.heartbeatSub = null;
  }

  private initWebSocket(): void {
    this.unsubscribeAll();

    const share$ = this.ws$.pipe(
      filter((response) => !!(response?.type)),
      catchError((error) => {
        this.logger.error(error);
        return EMPTY;
      }),
      share(),
    );

    this.timeoutSub = merge(of('init'), share$).pipe(
      switchMap(() => of('timeout').pipe(delay(10000))),
      takeUntilDestroyed(this.destroyRef),
    ).subscribe((msg) => {
      this.logger.error(msg);
      this.initWebSocket();
    });

    this.heartbeatSub = timer(1000, 5000).pipe(
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(() => {
      this.send('heartbeat', { time: Date.now().toString() });
    });

    this.wsSub = share$.pipe(
      tap((response) => {
        this.logger.debug(response.type, response.data);

        this.messagesSubject.next(response);

        switch (response.type) {
          case 'loginSuccessful':
            this.logger.debug('Received: Login successful.');

            this.status$.next(true);
            this.connectionCount$.next(this.connectionCount$.getValue() + 1);
            this.lastConnectionAt$.next(new Date());

            break;

          case 'loginError':
          case 'deleteTable':
            this.logger.debug('Received: ' + response.type === 'loginError' ? 'Login error.' : 'Delete table.');
            this.auth.logout();
            this.ws$.complete();
            break;

          case 'heartbeat':
            this.logger.debug('Received: Heartbeat', response.data?.time);
            break;

          case 'error':
            this.logger.error('Received: Error', response.data);
            break;

          default:
            this.logger.debug('Received: Unknown message', response);
            break;
        }
      }),
      catchError((error) => {
        this.logger.error(error);

        this.initWebSocket();
        return EMPTY;
      }),
      finalize(() => this.logger.info('Complete')),
      takeUntilDestroyed(this.destroyRef)
    ).subscribe();
  }
}
