import {Injectable, OnDestroy} from '@angular/core';
import {BehaviorSubject, first, Observable, switchMap} from 'rxjs';
import SockJS from 'sockjs-client';
import {environment} from '../../../environments/environment';
import {EventBusService} from './event-bus.service';
import {CompatClient, Stomp, StompSocketState} from '@stomp/stompjs';
import {AuthenticationService} from './authentication.service';
import {filter} from "rxjs/operators";

/**
 * Application-wide wrapper around a STOMP client running over SockJS.
 *
 * The connection state is mirrored into a `BehaviorSubject` so that `connect`,
 * `disconnect` and `subscribe` can wait for the state they require. Lifecycle
 * changes (connecting / connected / disconnected) are reported to the
 * `EventBusService`.
 */
@Injectable({
  providedIn: 'root'
})
export class Gd360StompService implements OnDestroy {
  private static readonly RECONNECT_DELAY_MS = 2000;
  private static readonly HEARTBEAT_IN_MS = 10_000;
  private static readonly HEARTBEAT_OUT_MS = 10_000;
  /** Header carrying the auth token on both CONNECT and SUBSCRIBE frames. */
  private static readonly AUTHORIZATION_HEADER = 'X-Authorization';

  private readonly _stompClient: CompatClient;
  private readonly _state: BehaviorSubject<StompSocketState>;

  /** True once a connection has been established; reset by an explicit disconnect. */
  private _everConnected = false;

  constructor(private readonly _eventBusService: EventBusService,
              private readonly _authenticationService: AuthenticationService) {
    this._state = new BehaviorSubject<StompSocketState>(StompSocketState.CLOSED);
    // A factory (rather than a socket instance) is handed to the client so that
    // a new SockJS socket is created every time the factory is invoked.
    this._stompClient = Stomp.over(() => new SockJS(environment.ws));
    this._stompClient.configure({
      reconnectDelay: Gd360StompService.RECONNECT_DELAY_MS,
      brokerURL: environment.ws,
      heartbeatIncoming: Gd360StompService.HEARTBEAT_IN_MS,
      heartbeatOutgoing: Gd360StompService.HEARTBEAT_OUT_MS,
      debug: () => null,
      splitLargeFrames: true,
      beforeConnect: () => this._onConnecting(),
    });
  }

  /** Requests a disconnect when the service is destroyed. */
  ngOnDestroy() {
    this.disconnect();
  }

  /**
   * Starts the STOMP connection as soon as the tracked state is CLOSED.
   * If the state is already CLOSED the connection attempt starts immediately.
   */
  connect() {
    this._state.pipe(
      filter(state => state === StompSocketState.CLOSED),
      first()
    ).subscribe(() => this._stompClient.connect(null, null, () => this._connectCallback()));
  }

  /**
   * Invoked by the client once connected. When this is not the first
   * connection, asks the event bus to re-subscribe its websocket topics
   * before the OPEN state is published.
   */
  private _connectCallback() {
    if (this._everConnected) {
      this._resubscribeActiveTopics();
    }
    this._onConnect();
  }

  /**
   * Disconnects the client as soon as the tracked state is OPEN.
   * If the state is not OPEN yet, the disconnect is deferred until it is.
   */
  disconnect() {
    this._state.pipe(
      filter(state => state === StompSocketState.OPEN),
      first()
    ).subscribe(() => this._stompClient.disconnect(() => this._onDisconnect()));
  }

  /**
   * Returns a stream of the messages delivered for `topic`.
   *
   * Nothing is subscribed until the tracked state is OPEN. Each time the state
   * becomes OPEN again, the previous broker subscription is torn down and a new
   * one is created with a freshly obtained token.
   *
   * If the token cannot be obtained, the client is force-disconnected and no
   * broker subscription is made for that attempt.
   *
   * @param topic destination to subscribe to
   * @returns observable emitting every message the client delivers for the topic
   */
  subscribe(topic: string): Observable<any> {
    return this._state.pipe(
      filter(state => state === StompSocketState.OPEN),
      switchMap(() => new Observable(observer => {
        // Set by the teardown so that a token arriving late does not create a
        // broker subscription nobody is listening to any more.
        let tornDown = false;

        const pendingSubscription = this._getNotExpiredToken().then(
          (token: string) => {
            if (tornDown) {
              return null;
            }
            return this._stompClient.subscribe(
              topic,
              message => observer.next(message),
              {[Gd360StompService.AUTHORIZATION_HEADER]: token}
            );
          },
          () => {
            // No valid token: drop the connection instead of subscribing
            // with an undefined authorization header.
            this._stompClient.forceDisconnect();
            return null;
          }
        );

        return () => {
          tornDown = true;
          pendingSubscription.then(active => {
            if (active !== null) {
              this._stompClient.unsubscribe(active.id);
            }
          });
        };
      }))
    );
  }

  /** Delegates to the authentication service for a token that is not expired. */
  private _getNotExpiredToken(): Promise<string> {
    return this._authenticationService.getNotExpiredToken();
  }

  /** Asks the event bus to re-subscribe its websocket topics. */
  private _resubscribeActiveTopics() {
    this._eventBusService.resubscribeWebsocketTopics();
  }

  /** Notifies the event bus and publishes the OPEN state. */
  private _onConnect() {
    this._eventBusService.wsConnected();
    this._everConnected = true;
    this._state.next(StompSocketState.OPEN);
  }

  /** Notifies the event bus and publishes the CLOSED state after an explicit disconnect. */
  private _onDisconnect() {
    this._everConnected = false;
    this._eventBusService.wsDisconnected();
    this._state.next(StompSocketState.CLOSED);
  }

  /**
   * `beforeConnect` hook: publishes the CONNECTING state and refreshes the
   * authorization connect header.
   *
   * @returns promise settling once the header has been updated (or the token request failed)
   */
  private _onConnecting() {
    this._eventBusService.wsConnecting();
    this._state.next(StompSocketState.CONNECTING);
    return this._updateAuthorizationHeader();
  }

  /**
   * Waits for a non-OPEN state, then stores a fresh token in the client's
   * connect headers.
   *
   * Only the first matching state is used, so repeated connection attempts do
   * not accumulate subscriptions on the state subject.
   *
   * @returns promise resolved after the header is written; rejected with the
   *          token failure reason, in which case the header is left untouched
   */
  private _updateAuthorizationHeader(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this._state.pipe(
        filter(state => state !== StompSocketState.OPEN),
        first()
      ).subscribe({
        next: () => {
          this._getNotExpiredToken().then(
            (token: string) => {
              this._stompClient.connectHeaders[Gd360StompService.AUTHORIZATION_HEADER] = token;
              resolve();
            },
            reason => reject(reason)
          );
        },
        error: reason => reject(reason),
      });
    });
  }
}
