import {
  buffer,
  combineLatestWith,
  debounceTime,
  distinctUntilChanged,
  filter,
  from,
  last,
  map,
  mergeMap,
  mergeScan,
  MonoTypeOperatorFunction,
  Observable,
  ObservableInput,
  OperatorFunction,
  pipe,
  scan,
  Subject,
  takeWhile,
  tap,
  timer,
  toArray,
  UnaryFunction,
} from "rxjs";
import { DEFAULT_BATCHER_DEBOUNCE_TIME, DEFAULT_INPUT_DEBOUNCE_TIME } from "../../constants/technical.constants";
import { getIsEveryItemTrue } from "../array";

/**
 * Builds the standard pipeline for input fields: the raw stream is debounced
 * by `DEFAULT_INPUT_DEBOUNCE_TIME`, run through the caller-supplied
 * `mappingPipe`, and finally stripped of consecutive duplicate results.
 *
 * @param mappingPipe Transformation applied to the debounced input values.
 * @returns The composed operator, from raw input values to distinct mapped values.
 */
export function generalInputPipe<T, U>(
  mappingPipe: UnaryFunction<Observable<T>, Observable<U>>,
): UnaryFunction<Observable<T>, Observable<U>> {
  const settleInput = debounceTime<T>(DEFAULT_INPUT_DEBOUNCE_TIME);
  const dropConsecutiveRepeats = distinctUntilChanged<U>();

  return pipe(settleInput, mappingPipe, dropConsecutiveRepeats);
}

/**
 * Input pipeline for numeric fields: every debounced value is coerced with
 * `Number(...)`, non-finite results (`NaN`, `±Infinity`) are dropped, and
 * consecutive duplicates are suppressed by {@link generalInputPipe}.
 *
 * Note that `Number("")` is `0`, so an empty string is emitted as `0`.
 */
export function generalNumberInputPipe(): UnaryFunction<Observable<any>, Observable<number>> {
  const toFiniteNumber: UnaryFunction<Observable<any>, Observable<number>> = pipe(
    map((raw) => Number(raw)),
    filter((candidate) => Number.isFinite(candidate)),
  );

  return generalInputPipe(toFiniteNumber);
}

/**
 * Turns free-form text into a whole-number percentage capped at 100.
 *
 * Every non-digit character is removed (so signs and decimal separators are
 * discarded), the remaining digits are converted to a number, and anything
 * above 100 is clamped to 100. A string without digits yields `0`.
 */
export const discretePercentageSanitizerPipe: UnaryFunction<Observable<string>, Observable<number>> = pipe(
  map((raw) => {
    const digitsOnly = raw.replace(/\D/g, "");
    const withoutLeadingZero = digitsOnly.replace(/^0(\d+)/g, "$1");
    const parsed = Number(withoutLeadingZero);

    // `parsed` is never NaN here (digits only, or "" -> 0), so Math.min is a plain clamp.
    return Math.min(parsed, 100);
  }),
);

/**
 * Filters every emitted array, keeping only the items accepted by `predicate`.
 *
 * Each source array produces exactly one output array (possibly empty) with
 * the surviving items in their original order. The filtering is done with
 * `Array.prototype.filter` inside `map`, so no inner Observable has to be
 * created and subscribed for each emission.
 *
 * @param predicate Called with each item; return `true` to keep it.
 * @returns An operator mapping each `T[]` emission to its filtered copy.
 */
export function arrayFilterPipe<T>(predicate: (value: T) => boolean): UnaryFunction<Observable<T[]>, Observable<T[]>> {
  // Wrap the predicate so it only ever receives the item, as its signature promises.
  return pipe(map((values: T[]) => values.filter((item) => predicate(item))));
}

/**
 * Collects source values into batches that are flushed by `batcher$`.
 *
 * The buffered values are emitted as one array each time the debounced
 * `batcher$` fires, i.e. once `batcher$` has emitted and then stayed quiet
 * for `debounce` milliseconds.
 *
 * @param batcher$ Subject whose (debounced) emissions close the current batch.
 * @param debounce Quiet period in milliseconds applied to `batcher$`.
 * @param unique When `true`, duplicate values inside a batch are removed via a `Set`.
 * @returns An operator turning a stream of values into a stream of batches.
 */
export function batcherPipe<T>(
  batcher$: Subject<T>,
  debounce = DEFAULT_BATCHER_DEBOUNCE_TIME,
  unique = false,
): UnaryFunction<Observable<T>, Observable<T[]>> {
  const flush$ = batcher$.pipe(debounceTime(debounce));

  // Always hand out a fresh array; de-duplicate first when requested.
  const toBatch = (buffered: T[]): T[] => {
    if (unique) {
      return Array.from(new Set(buffered));
    }
    return Array.from(buffered);
  };

  return pipe(buffer(flush$), map(toBatch));
}

/**
 * Creates a guard that throws once the given attempt counter is strictly
 * greater than `maxAttempts`; otherwise the guard does nothing.
 *
 * @param maxAttempts Highest attempt number that is still allowed.
 * @returns A function that validates a 1-based attempt counter.
 * @throws Error with message "Exceeded maxAttempts" (from the returned guard).
 */
function attemptsGuardFactory(maxAttempts: number): (_: number) => void {
  return function assertWithinLimit(attemptsCount: number): void {
    const limitExceeded = attemptsCount > maxAttempts;
    if (limitExceeded) {
      throw new Error("Exceeded maxAttempts");
    }
  };
}

/**
 * Inspired by https://medium.com/javascript-everyday/http-requests-polling-like-a-hero-with-rxjs-474a2e1fa057
 *
 * Re-subscribes to the source on every tick of a {@link timer} and keeps
 * emitting its results while `isPollingActive` returns `true`. The first
 * result that fails the predicate is still emitted, then the stream completes.
 *
 * Note that the timer starts emitting immediately following subscription,
 * so the consumer is responsible for triggering the polling at the expected
 * time. Source subscriptions are merged, so a source that takes longer than
 * `pollInterval` can overlap with the next poll.
 *
 * @param pollInterval Delay in milliseconds between two polls.
 * @param isPollingActive Returns `true` while polling should continue.
 * @param maxAttempts Number of polls allowed; one more tick errors the stream
 * with "Exceeded maxAttempts".
 * @param emitOnlyLast When `true`, only the final polled value is emitted.
 */
export function pollWhile<T>(
  pollInterval: number,
  isPollingActive: (res: T) => boolean,
  maxAttempts = Infinity,
  emitOnlyLast = false,
): MonoTypeOperatorFunction<T> {
  return (source$) => {
    const guardAttempts = attemptsGuardFactory(maxAttempts);
    const ticks$ = timer(0, pollInterval);

    const poll$ = ticks$.pipe(
      // Turn the tick index into a 1-based attempt counter.
      scan((attempts) => attempts + 1, 0),
      tap(guardAttempts),
      mergeMap(() => source$),
      // Inclusive: the value that ends the polling is emitted as well.
      takeWhile(isPollingActive, true),
    );

    if (emitOnlyLast) {
      return poll$.pipe(last());
    }
    return poll$;
  };
}

/**
 * Combines the source with the latest value of every predicate stream and
 * emits either the source value or `defaultValue`, depending on whether
 * `getIsEveryItemTrue` accepts the current predicate values.
 *
 * Nothing is emitted until the source and every predicate stream have emitted
 * at least once; afterwards a value is emitted whenever any of them emits.
 *
 * @param predicates Boolean streams gating the source value.
 * @param defaultValue Value emitted while the predicates are not all satisfied.
 */
export function filterByPredicatesOrDefault<T, U>(
  predicates: Observable<boolean>[],
  defaultValue: U,
): UnaryFunction<Observable<T>, Observable<T | U>> {
  return pipe(
    combineLatestWith(...predicates),
    map((latest) => {
      const [sourceValue, ...flags] = latest;
      if (getIsEveryItemTrue(flags)) {
        return sourceValue;
      }
      return defaultValue;
    }),
  );
}

/**
 * Variant of {@link filterByPredicatesOrDefault} that emits `undefined`
 * whenever the predicates are not all satisfied.
 *
 * @param predicates Boolean streams gating the source value.
 */
export function filterByPredicatesOrUndefined<T>(
  predicates: Observable<boolean>[],
): UnaryFunction<Observable<T>, Observable<T | undefined>> {
  const fallback = undefined;
  return filterByPredicatesOrDefault<T, undefined>(predicates, fallback);
}

/**
 * Variant of {@link filterByPredicatesOrDefault} that emits `null`
 * whenever the predicates are not all satisfied.
 *
 * @param predicates Boolean streams gating the source value.
 */
export function filterByPredicatesOrNull<T>(
  predicates: Observable<boolean>[],
): UnaryFunction<Observable<T>, Observable<T | null>> {
  const fallback = null;
  return filterByPredicatesOrDefault<T, null>(predicates, fallback);
}

/**
 * Lets source values through only while `getIsEveryItemTrue` accepts the
 * latest values of all predicate streams; other combinations are dropped.
 *
 * Nothing is emitted until the source and every predicate stream have emitted
 * at least once. Because the streams are combined, a predicate emission can
 * re-emit the most recent source value.
 *
 * @param predicates Boolean streams gating the source value.
 */
export function filterByPredicates<T>(predicates: Observable<boolean>[]): MonoTypeOperatorFunction<T> {
  return pipe(
    combineLatestWith(...predicates),
    filter((latest) => {
      const [, ...flags] = latest;
      return getIsEveryItemTrue(flags);
    }),
    // Strip the predicate values again, leaving only the source value.
    map((latest) => latest[0]),
  );
}

/**
 * `mergeScan` limited to a single concurrent inner subscription, so the
 * accumulator runs for one source value at a time.
 *
 * @param accumulator Produces the next state(s) from the current state and a source value.
 * @param seed Initial state handed to the first accumulator call.
 */
export function concatScan<T, U>(accumulator: (acc: T, cv: U) => ObservableInput<T>, seed: T): OperatorFunction<U, T> {
  return mergeScan(accumulator, seed, 1);
}

/**
 * Sequential asynchronous reduce: {@link concatScan} followed by `last`.
 *
 * Emits only the final accumulated state once the source completes. If no
 * state was ever produced (for example the source completes without
 * emitting), `seed` is emitted instead of failing with `EmptyError`,
 * matching the behaviour of a seeded `reduce`.
 *
 * @param accumulator Produces the next state(s) from the current state and a source value.
 * @param seed Initial state; also the result for an empty stream.
 */
export function concatReduce<T, U>(
  accumulator: (acc: T, cv: U) => ObservableInput<T>,
  seed: T,
): OperatorFunction<U, T> {
  // Supplying a default value makes `last` fall back to it rather than throw on an empty stream.
  return pipe(concatScan(accumulator, seed), last<T, T>(null, seed));
}
