import { Observable, OperatorFunction, Subscription } from "rxjs";
import { debounceTime, filter, finalize, tap } from "rxjs/operators";
import { RUNNING_JOBS_WARNING_DEBOUNCE_TIME } from "src/app/constants/technical.constants";
import { LongRunningJobWarning } from "src/app/utils/errors";
import { Monitoring } from "src/app/utils/monitoring";
import { BehaviorStore } from "src/app/utils/rxjs/store";

/**
 * Base class for services that keep track of the jobs currently in progress.
 *
 * Running jobs are held as a list of string keys in a store and exposed through
 * {@link runningJobs$}. On construction a watcher is started that reports a
 * `LongRunningJobWarning` to `Monitoring` whenever the list stays unchanged and
 * non-empty for `RUNNING_JOBS_WARNING_DEBOUNCE_TIME`.
 */
export abstract class AbstractRunningJobsService {
  private readonly runningJobsStore$ = new BehaviorStore<string[]>([]);
  // eslint-disable-next-line @typescript-eslint/member-ordering
  public readonly runningJobs$ = this.runningJobsStore$.asObservable();

  private longRunningJobWatcher$: Subscription | undefined;

  /**
   * Per-instance counter appended to the keys generated by {@link withLoader}.
   * A timestamp alone is not unique: two subscriptions created within the same
   * millisecond would share a key, and `pop` would then remove both entries as
   * soon as the first of them finishes.
   */
  private loaderSequence = 0;

  protected constructor() {
    this.startLongRunningJobWatcher(this.runningJobs$);
  }

  /**
   * Registers a job as running by appending its key to the list.
   *
   * @param key Identifier of the job. Duplicates are not filtered out.
   */
  public push(key: string): void {
    const runningJobs = [...this.runningJobsStore$.getValue(), key];
    this.runningJobsStore$.next(runningJobs);
  }

  /**
   * Marks a job as finished by removing its key from the list.
   *
   * @param key Identifier of the job. Every entry equal to this key is removed.
   */
  public pop(key: string): void {
    const runningJobs = this.runningJobsStore$.getValue().filter((runningJob) => runningJob !== key);

    this.runningJobsStore$.next(runningJobs);
  }

  /** Clears the list of running jobs, whatever it currently contains. */
  public forceStop(): void {
    this.runningJobsStore$.next([]);
  }

  /** Stops the long-running-job watcher, if one is active. */
  public stopLongRunningJobWatcher(): void {
    this.longRunningJobWatcher$?.unsubscribe();
    // Drop the reference so a closed subscription is not kept around.
    this.longRunningJobWatcher$ = undefined;
  }

  /**
   * Operator that registers a running job for the lifetime of each subscription
   * to the source: the job is pushed on subscribe and popped once the source
   * completes, errors or is unsubscribed.
   *
   * @param key Label of the job. A timestamp and a sequence number are appended
   *   so that concurrent subscriptions using the same label stay distinct.
   * @returns An operator that forwards every notification of the source unchanged.
   */
  public withLoader =
    <T>(key: string): OperatorFunction<T, T> =>
    (source$: Observable<T>): Observable<T> =>
      new Observable<T>((subscriber) => {
        // The sequence number guarantees uniqueness even when several
        // subscriptions start within the same millisecond.
        const uniqueKey = `${key}:${Date.now()}:${this.loaderSequence++}`;
        this.push(uniqueKey);

        const sourceSubscription$ = source$
          .pipe(
            // finalize covers completion, error and unsubscription alike.
            finalize(() => {
              this.pop(uniqueKey);
            }),
          )
          .subscribe({
            next: (value) => {
              subscriber.next(value);
            },
            error: (err) => {
              subscriber.error(err);
            },
            complete: () => {
              subscriber.complete();
            },
          });

        return (): void => {
          sourceSubscription$.unsubscribe();
        };
      });

  /**
   * (Re)starts the watcher that warns about jobs that appear to be stuck.
   * Any watcher that is already active is stopped first.
   *
   * @param source$ Stream of running-job lists to observe.
   */
  protected startLongRunningJobWatcher(source$: Observable<string[]>): void {
    this.stopLongRunningJobWatcher();

    this.longRunningJobWatcher$ = source$
      .pipe(
        // Only lets a list through once it has stayed unchanged for the whole delay.
        debounceTime(RUNNING_JOBS_WARNING_DEBOUNCE_TIME),
        filter((runningJobs) => runningJobs.length > 0),
        tap((runningJobs) => {
          Monitoring.warn(new LongRunningJobWarning(), {
            extras: {
              stack: runningJobs,
              debounceTime: RUNNING_JOBS_WARNING_DEBOUNCE_TIME,
            },
          });
        }),
      )
      .subscribe();
  }
}
