import { cloneDeep } from "lodash-es";
import { Observable, of, race } from "rxjs";
import { distinctUntilChanged, map, shareReplay, take } from "rxjs/operators";
import { MappingFunction, Primitive } from "../types";
import { naiveObjectComparison, nonJsonStringifiableInstanceTypes, Nullable } from "../utils";

/**
 * Message suffix for the error produced by the synchronous `select` helper
 * when its input stream does not emit synchronously. The leading space is
 * intentional: the calling function's name is prepended at the use site.
 */
export const EXPECTED_SYNC_GOT_ASYNC_ERROR =
  " was provided with an asynchronous stream where a synchronous stream was expected.";

/**
 * Comparator forwarded to `distinctUntilChanged` by the helpers in this file.
 * Returning `true` marks the current result as equal to the previous one,
 * so the emission is suppressed.
 */
type MemoizationFunction<R> = (previousResult: R, currentResult: R) => boolean;

/**
 * Options accepted by the synchronous `select` helper.
 */
interface SelectOptions<T> {
  /** Whether `select` should hand back a deep copy of the value; `select` defaults this to `true`. */
  copy?: boolean;
  /** Optional replacement value for `select` to return rather than throw when the stream has no synchronous emission. */
  fallback?: T;
}

/**
 * Shared implementation behind the memoize helpers: drops consecutive
 * duplicate emissions (using the supplied comparator, or `===` when none is
 * given) and replays the latest value to late subscribers.
 */
function protoMemoize<T>(inStream$: Observable<T>, memoizationFunction?: MemoizationFunction<T>): Observable<T> {
  const skipRepeats = distinctUntilChanged<T>(memoizationFunction);
  const replayLatest = shareReplay<T>(1);

  return inStream$.pipe(skipRepeats, replayLatest);
}

/**
 * Publishes a primitive-valued stream (e.g. rootObject$ or a combination
 * pipe) that needs no mapping step. Consecutive duplicates are suppressed and
 * the latest value is replayed to new subscribers.
 *
 * @param inStream$ the stream to publish
 * @param memoizationFunction optional comparator; when omitted, the default
 *                            `distinctUntilChanged` comparison ('===') applies
 */
export function memoize$<T extends Primitive | Nullable<Primitive>>(
  inStream$: Observable<T>,
  memoizationFunction?: MemoizationFunction<T>,
): Observable<T> {
  const memoized$ = protoMemoize(inStream$, memoizationFunction);

  return memoized$;
}

/**
 * Publishes an object- or array-valued stream (e.g. rootObject$ or a
 * combination pipe) that needs no mapping step. Consecutive duplicates are
 * suppressed and the latest value is replayed to new subscribers.
 *
 * @param inStream$ the stream to publish
 * @param memoizationFunction optional comparator; `naiveObjectComparison` is
 *                            used when none is provided
 */
export function memoizeObject$<T extends object | Nullable<object>>(
  inStream$: Observable<T>,
  memoizationFunction?: MemoizationFunction<T>,
): Observable<T> {
  const comparator = memoizationFunction ? memoizationFunction : naiveObjectComparison;

  return protoMemoize(inStream$, comparator);
}

/**
 * Shared implementation behind the select$ helpers: projects each emission
 * through the mapping function, drops consecutive duplicate results (using
 * the supplied comparator, or `===` when none is given) and replays the
 * latest result to late subscribers.
 */
function protoSelect<T, R>(
  inStream$: Observable<T>,
  mappingFunction: MappingFunction<T, R>,
  memoizationFunction?: MemoizationFunction<R>,
): Observable<R> {
  const projected$ = inStream$.pipe(map(mappingFunction));

  return projected$.pipe(distinctUntilChanged(memoizationFunction), shareReplay(1));
}

/**
 * Short-hand for deriving a value stream from an Observable (Subject) exposed
 * by a service, e.g. `select$(inStream$, (root) => root.desiredProperty)`.
 *
 * @param inStream$ the stream to select from
 * @param mappingFunction projects each source value onto the desired result
 * @param memoizationFunction optional comparator deciding whether a new result
 *                            equals the previous one (and is therefore not
 *                            emitted); when omitted, the default
 *                            `distinctUntilChanged` comparison ('===') applies
 */
export function select$<T, R = Primitive>(
  inStream$: Observable<T>,
  mappingFunction: MappingFunction<T, R>,
  memoizationFunction?: MemoizationFunction<R>,
): Observable<R> {
  const selected$: Observable<R> = protoSelect<T, R>(inStream$, mappingFunction, memoizationFunction);

  return selected$;
}

/**
 * Short-hand for selecting objects or arrays, where a plain '===' check is
 * not a useful equality test. Unless a comparator is supplied, results are
 * compared with `naiveObjectComparison`; that naive check may let some
 * duplicate emissions through, but a more thorough comparison could cost
 * more than it saves.
 *
 * @param inStream$ the stream to select from
 * @param mappingFunction projects each source value onto the desired result
 * @param memoizationFunction optional comparator overriding the naive default
 */
export function selectObject$<T, R>(
  inStream$: Observable<T>,
  mappingFunction: MappingFunction<T, R>,
  memoizationFunction?: MemoizationFunction<R>,
): Observable<R> {
  const comparator = memoizationFunction ? memoizationFunction : naiveObjectComparison;

  return protoSelect(inStream$, mappingFunction, comparator);
}

/**
 * Short-hand for synchronously selecting a value from a stream.
 * Throws an error if an asynchronous stream is passed to the function,
 * unless a fallback value was supplied.
 *
 * @param inStream$ the stream to select from
 * @param options
 * @param options.copy defines whether to return a copy of the value or not
 *                     (defaults to true). Disabling the copy is useful when
 *                     the value has reusable behavior (functions), but care
 *                     must be taken not to mutate the value
 * @param options.fallback the value to return instead of throwing when the
 *                         stream does not emit synchronously. Any value other
 *                         than `undefined` is honored, including falsy ones
 *                         such as `0`, `""`, `false` and `null`
 * @returns the first synchronously emitted value (deep-copied when it is an
 *          object, copying is enabled and it is not an instance of one of the
 *          `nonJsonStringifiableInstanceTypes`), or the fallback
 * @throws Error when the stream does not emit synchronously and no fallback
 *         was supplied
 * @deprecated await firstValueFrom instead
 */
export function select<T>(inStream$: Observable<T>, options: SelectOptions<T> = {}): T {
  let value: Nullable<T> | Error;
  const opts = { copy: true, ...options };

  /*
   * Note that as of RxJS 6 throwError was made async and is no longer suited
   * to race other async streams in a sync function. As such, throwError()
   * was replaced with of(new Error()) and a corresponding instanceof test to
   * throw the error. Fingers crossed of() stays sync, or at least one other
   * creator function, or this synchronous select function may no longer
   * behave as expected.
   */
  race(inStream$, of(new Error(select.name + EXPECTED_SYNC_GOT_ASYNC_ERROR)))
    .pipe(take(1))
    .subscribe((val) => {
      // eslint-disable-next-line fp/no-mutation
      value = val;
    });

  if (value instanceof Error) {
    // Test against undefined rather than truthiness so that legitimate falsy
    // fallbacks (0, "", false, null) are returned instead of being ignored.
    if (opts.fallback !== undefined) {
      return opts.fallback;
    }

    throw value;
  }

  // Only plain object-like values are deep-copied; primitives and instances
  // of the listed non-stringifiable types are returned as-is.
  return typeof value === "object" &&
    !nonJsonStringifiableInstanceTypes.some((non) => value instanceof non) &&
    opts.copy
    ? cloneDeep(value as T)
    : (value as T);
}
