import { Injectable } from '@angular/core';
import { HOST, LOG_TYPE, PROGRESS_STATE } from '../../consts';
import { BehaviorSubject, delay, EMPTY, Observable, Subject, switchMap, timer } from 'rxjs';
import { CookieService } from 'ngx-cookie-service';
import { AuthenticationService } from '../common/authentication.service';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { HttpClient } from '@angular/common/http';

@Injectable()
export class LogService {
  private _resourceStream: { [key: string]: any } = {};
  private _controllers: { [key: string]: AbortController } = {};
  private _eventStreamSubject$: Subject<{ logType: number; log: any }> = new Subject<{
    logType: number;
    log: string;
  }>();

  constructor(private cookieService: CookieService, private http: HttpClient) {
    this._resourceStream[LOG_TYPE.ACCESS] = null;
    this._resourceStream[LOG_TYPE.MASTER] = null;
    this._resourceStream[LOG_TYPE.SCHEDULER] = null;
  }

  async getInitLogList(
    logType: number,
    line: number,
    search?: string,
    follow?: string,
    http?: HttpClient
  ): Promise<void> {
    let logTypeString = '';
    switch (logType) {
      case 0:
        logTypeString = 'access';
        break;
      case 1:
        logTypeString = 'master';
        break;
      case 2:
        logTypeString = 'scheduler';
        break;
    }

    const token = this.cookieService.get('token') || this.cookieService.get('refresh_token');
    const ctrl = new AbortController();
    this._controllers[logType] = ctrl; // Store the controller

    this._resourceStream[logType] = fetchEventSource(
      `${HOST.LOG_STREAM_LIST}/${Date.now()}?command=tail&log_type=${logTypeString}&line=${line}${
        search ? `&search=${search}` : ''
      }${follow ? `&follow=${follow}` : ''}`,
      {
        method: 'GET',
        headers: {
          Authorization: token,
        },
        signal: ctrl.signal,
        async onopen(response) {
          if (response.status >= 400 && response.status <= 502) {
            http
              .get(
                `${HOST.LOG_STREAM_LIST}/${Date.now()}?command=tail&log_type=${logTypeString}&line=${line}${
                  search ? `&search=${search}` : ''
                }${follow ? `&follow=${follow}` : ''}`,
                {
                  observe: 'events',
                  responseType: 'text',
                  reportProgress: true,
                  headers: {
                    Accept: 'text/event-stream',
                  },
                }
              )
              .subscribe((el: any) => {});
          }
        },
        onmessage: (event) => {
          const newMessage = event.data;
          this._eventStreamSubject$.next({
            logType,
            log: newMessage,
          });
        },
        onerror: (err) => {
          throw new Error(err);
        },
        openWhenHidden: true,
      }
    );
  }

  getEventStreamSubject(): Subject<{ logType: number; log: any }> {
    return this._eventStreamSubject$;
  }

  closeStream(logType: number) {
    if (this._controllers[logType]) {
      this._controllers[logType].abort(); // Abort the SSE connection
      delete this._controllers[logType];
    }
    if (this._resourceStream[logType]) {
      delete this._resourceStream[logType];
    }
  }
}
