import {
  Observable,
  Subject,
  Subscription,
  interval,
  switchMap,
  throttle,
  share,
  filter,
  take,
  merge,
  takeUntil,
  race,
} from "rxjs";
import { queryRun } from "@fscrypto/domain";
import { QueryClient } from "./query-client";

/**
 * Polls the status of a single query run and republishes every update on
 * {@link QueryPoller.results$}.
 *
 * Polling is lazy: nothing is fetched until {@link QueryPoller.poll} is
 * called. Once the run reports a terminal status (`"canceled"`, `"failed"` or
 * `"finished"`) polling stops; for a `"finished"` run the results are fetched
 * via `client.fetchResults` and published as well.
 *
 * NOTE(review): no error callback is registered on the polling subscription,
 * so a failing status/result fetch is not forwarded to `results$` — confirm
 * whether consumers need to observe such failures.
 */
export class QueryPoller {
  readonly #result$$ = new Subject<queryRun.QueryRunResult>();
  readonly #cancel$$ = new Subject<void>();
  readonly #poller$: Observable<queryRun.QueryRunResult | void>;
  #subscription: Subscription | null = null;

  /** Stream of every value produced by polling, plus the outcome of `cancel()`. */
  public results$ = this.#result$$.asObservable();

  /**
   * @param queryRunId - Identifier of the query run to poll.
   * @param client - Client used to fetch status/results and to cancel the run.
   */
  constructor(
    public queryRunId: string,
    private readonly client: QueryClient,
  ) {
    this.#poller$ = this.#createQueryPoller$();
  }

  /**
   * Starts polling and forwards each truthy emission to `results$`.
   *
   * Calling this while a previous poll is still active is a no-op; without
   * this guard a second subscription would be created, every update would be
   * published twice, and the earlier subscription could no longer be torn
   * down by `cancel()`.
   */
  async poll(): Promise<void> {
    if (this.#subscription !== null && !this.#subscription.closed) {
      return;
    }
    this.#subscription = this.#poller$.subscribe((result) => {
      // The poller also emits `void` when the cancel signal wins the race.
      if (result) {
        this.#result$$.next(result);
      }
    });
  }

  /**
   * Stops polling, asks the client to cancel the query run, and publishes the
   * value returned by `client.cancel` on `results$`.
   *
   * The returned promise rejects if `client.cancel` rejects; in that case
   * nothing is published.
   */
  async cancel(): Promise<void> {
    this.#cancel$$.next();
    this.#subscription?.unsubscribe();
    this.#subscription = null;
    const result = await this.client.cancel(this.queryRunId);
    this.#result$$.next(result);
  }

  /**
   * Builds the cold polling pipeline. Nothing runs until it is subscribed.
   *
   * @returns An observable that emits status updates and, for a finished run,
   * the fetched results, or a single `void` if the cancel signal fires before
   * the first polled value.
   */
  #createQueryPoller$(): Observable<queryRun.QueryRunResult | void> {
    const tickMs = 1000;
    const backoffStepMs = 500;
    const maxBackoffMs = 5000;

    const poll$ = interval(tickMs).pipe(
      // Throttle the poll to slow down over time: each tick that gets through
      // opens a silence window that grows with the tick index, capped at maxBackoffMs
      throttle((tick) => interval(Math.min(backoffStepMs * tick, maxBackoffMs))),
      // Fetch the state of the query run; a newer tick supersedes a pending fetch
      switchMap(() => this.client.fetchStatus(this.queryRunId)),
      // Share the result with all subscribers so only one request runs per tick
      share(),
    );
    // Create an observable that emits once the query run is finished
    const finished$ = poll$.pipe(
      filter((q) => q.status === "canceled" || q.status === "failed" || q.status === "finished"),
      take(1),
      share(),
    );
    // Map failed query runs to failure events
    const failure$ = finished$.pipe(filter((q) => q.status === "failed"));

    // Map canceled query runs to cancel events
    const cancel$ = finished$.pipe(filter((q) => q.status === "canceled"));

    // Map successful query runs to success events
    const success$ = finished$.pipe(filter((q) => q.status === "finished"));
    // Fetch the results once the query run has succeeded
    const result$ = success$.pipe(switchMap(() => this.client.fetchResults(this.queryRunId)));
    // Map all query runs to status events until the query run is finished
    const status$ = poll$.pipe(takeUntil(finished$));
    // Whichever side emits first wins: a local cancel signal or the polled events
    return race(this.#cancel$$, merge(status$, failure$, success$, cancel$, result$));
  }
}
