import type { Observable } from "rxjs";
import {
  filter,
  map,
  switchMap,
  merge,
  interval,
  take,
  race,
  of,
  catchError,
  debounceTime,
  share,
  takeUntil,
  throttle,
  from,
} from "rxjs";
import type { queryRun } from "@fscrypto/domain";
import { GET, POST } from "~/async/fetch";
import { actorSystem } from "~/state/system";
import type { Ref } from "./query-run.machine";
import { ofType } from "~/state/epics";
import { compass } from "@fscrypto/compass";
import invariant from "tiny-invariant";
import { zipObject } from "lodash-es";
import { Actor as QueryActorRef } from "~/features/query/state/query/actor";

export type Event =
  | CreateQueryRun
  | CreateQueryRunSuccess
  | CreateQueryRunError
  | PollQueryRun
  | PollQueryRunSuccess
  | PollQueryRunFailure
  | PollQueryRunStatus
  | CancelQueryRun
  | CancelQueryRunSuccess
  | FetchResults
  | FetchResultsSuccess;

type CreateQueryRun = { type: "QUERY_RUN.EPIC.CREATE"; payload: { queryId: string } };
type CreateQueryRunSuccess = { type: "QUERY_RUN.EPIC.CREATE_SUCCESS"; payload: queryRun.QueryRun };
type CreateQueryRunError = { type: "QUERY_RUN.EPIC.CREATE_ERROR"; error: string };
type PollQueryRun = { type: "QUERY_RUN.EPIC.POLL"; payload: { queryRunId: string } };
type PollQueryRunStatus = { type: "QUERY_RUN.EPIC.POLL_STATUS"; payload: queryRun.QueryRun };
type PollQueryRunSuccess = { type: "QUERY_RUN.EPIC.POLL_SUCCESS"; payload: queryRun.QueryRun };
type PollQueryRunFailure = { type: "QUERY_RUN.EPIC.POLL_FAILURE"; payload: queryRun.QueryRun };
type CancelQueryRun = { type: "QUERY_RUN.EPIC.CANCEL"; payload: { queryRunId: string } };
type CancelQueryRunSuccess = { type: "QUERY_RUN.EPIC.CANCEL_SUCCESS"; payload: queryRun.QueryRun };
type FetchResults = { type: "QUERY_RUN.EPIC.FETCH_RESULTS"; payload: { queryId: string } };
type FetchResultsSuccess = { type: "QUERY_RUN.EPIC.FETCH_RESULTS_SUCCESS"; payload: queryRun.QueryRunResult };

/**
 * Merges multiple epic functions into a single Observable<Event>.
 * Merging these epics into one observable allows using a single subject for capturing all events.
 */
export const createEpics = (actions$: Observable<Event>): Observable<Event> => {
  return merge(
    createQueryRunEpic(actions$),
    pollQueryRunEpic(actions$),
    cancelQueryRunEpic(actions$),
    fetchResultsEpic(actions$),
  );
};

/**
 * Epic to create a query run
 * @param {Observable<Event>} action$ - Stream of actions to listen to and dispatch other actions.
 * @returns {Observable<Event>} - Returns a stream of actions.
 */
const createQueryRunEpic = (action$: Observable<Event>) => {
  return action$.pipe(
    // filters actions related to query run creation
    ofType("QUERY_RUN.EPIC.CREATE"),
    // applies a delay between each successive emission
    debounceTime(500),
    // transforms each action into a set of query and queryRun refs
    map((a) => {
      const query = actorSystem.get<QueryActorRef>(`query-${a.payload.queryId}-root`);
      const queryRun = actorSystem.get<Ref>(`queryRun-${a.payload.queryId}`);
      return { query, queryRun, queryId: a.payload.queryId };
    }),
    // attempts to execute query run and transform the result into a new action
    switchMap(({ query, queryRun, queryId }) => {
      const statement =
        query?.getSnapshot()?.context.query?.statement ?? queryRun?.getSnapshot()?.context.statement ?? "";
      return from(executeQueryRun(statement, queryId)).pipe(
        map((r) => ({ type: "QUERY_RUN.EPIC.CREATE_SUCCESS", payload: r }) as CreateQueryRunSuccess),
        catchError((e) =>
          of({ type: "QUERY_RUN.EPIC.CREATE_ERROR", error: e.message ?? "unknown error" } as CreateQueryRunError),
        ),
      );
    }),
  ) as Observable<Event>;
};

/**
 * This epic handles the polling of query runs.
 * It waits for a "QUERY_RUN.EPIC.POLL" event and then polls the state of a query run.
 * @param {Observable<Event>} action$ - An observable of events.
 * @returns {Observable<Event>} - An observable of the result of the poll.
 */
const pollQueryRunEpic = (action$: Observable<Event>) => {
  return race(
    action$.pipe(
      // Only process poll events
      ofType("QUERY_RUN.EPIC.POLL"),
      // Switch to an inner observable that polls the query run every second
      switchMap((a) => {
        const poll$ = interval(1000).pipe(
          // Throttle the poll to slow down over time
          throttle((x) => interval(Math.min(500 * x, 5000))),
          // Fetch the state of the query run
          switchMap(() => fetchQueryRun(a.payload.queryRunId)),
          // Share the result with all subscribers
          share(),
        );
        // Create an observable that emits once the query run is finished
        const finished$ = poll$.pipe(
          filter((q) => q.status === "canceled" || q.status === "failed" || (q.status === "finished" && !!q.s3Results)),
          take(1),
          share(),
        );
        // Map failed query runs to failure events
        const failure$ = finished$.pipe(
          filter((q) => q.status === "failed"),
          map((q) => ({ type: "QUERY_RUN.EPIC.POLL_FAILURE", payload: q }) as PollQueryRunFailure),
        );
        // Map successful query runs to success events
        const success$ = finished$.pipe(
          filter((q) => q.status === "finished"),
          map((q) => ({ type: "QUERY_RUN.EPIC.POLL_SUCCESS", payload: q }) as PollQueryRunSuccess),
        );
        // Map all query runs to status events until the query run is finished
        const status$ = poll$.pipe(
          map((q) => ({ type: "QUERY_RUN.EPIC.POLL_STATUS", payload: q }) as PollQueryRunStatus),
          takeUntil(finished$),
        );
        return merge(status$, failure$, success$);
      }),
    ),
    // Stop polling if a cancel event is received
    action$.pipe(ofType("QUERY_RUN.EPIC.CANCEL")),
  ) as Observable<Event>;
};

/**
 * This epic handles the cancellation of query runs.
 * It waits for a "QUERY_RUN.EPIC.CANCEL" event and then cancels the specified query run.
 * @param {Observable<Event>} action$ - An observable of events.
 * @returns {Observable<Event>} - An observable of the result of the cancellation.
 */
const cancelQueryRunEpic = (action$: Observable<Event>) => {
  return action$.pipe(
    // Only process cancel events
    ofType("QUERY_RUN.EPIC.CANCEL"),
    // For each cancel event, switch to an observable that performs the cancellation
    switchMap((a) => cancelRun(a.payload.queryRunId)),
    // Map the result of the cancellation to a success event
    map((r) => ({ type: "QUERY_RUN.EPIC.CANCEL_SUCCESS", payload: r }) as CancelQueryRunSuccess),
  ) as Observable<Event>;
};

/**
 * This epic handles fetching the results of a query run.
 * It waits for a "QUERY_RUN.EPIC.FETCH_RESULTS" event and then fetches the results.
 * @param {Observable<Event>} action$ - An observable of events.
 * @returns {Observable<Event>} - An observable of the result of fetching the results.
 */
const fetchResultsEpic = (action$: Observable<Event>) => {
  return action$.pipe(
    // Only process fetch results events
    ofType("QUERY_RUN.EPIC.FETCH_RESULTS"),
    // For each fetch results event, switch to an observable that fetches the results
    switchMap((a) => fetchQueryRunResult(a.payload.queryId)),
    // Map the result of the fetch to a success event
    map((r) => ({ type: "QUERY_RUN.EPIC.FETCH_RESULTS_SUCCESS", payload: r }) as FetchResultsSuccess),
  ) as Observable<Event>;
};

// ASYNC API Calls

const executeQueryRun = async (statement: string, queryId: string) => {
  try {
    const data = await POST<{ result: queryRun.QueryRun }>(`/api/queries/${queryId}/execute`, {
      statement,
    });
    return data.result;
  } catch (e) {
    let message = "An error has occurred.";
    if (e instanceof Error) message = e.message;
    throw new Error(message);
  }
};

const fetchQueryRun = async (queryRunId: string) => {
  try {
    return await GET<queryRun.QueryRun>(`/api/query-runs/${queryRunId}`);
  } catch (e) {
    throw new Error(`Error Fetching Query Run Data: ${e}`);
  }
};

const cancelRun = async (queryRunId: string) => {
  try {
    const data = await POST<{ result: queryRun.QueryRun }>(`/api/query-runs/${queryRunId}/cancel`);
    return data.result;
  } catch (e) {
    throw new Error("Error Cancelling Query Run");
  }
};

export const fetchQueryRunResult = async (queryId: string): Promise<queryRun.QueryRunResult> => {
  const resp = await fetch(`/api/queries/${queryId}/latest-run`);
  if (!resp.ok) {
    return {
      columns: [],
      csvData: [],
      jsonData: [],
      endedAt: new Date(),
      errorMessage: "An error has occurred",
      rowCount: 0,
      startedAt: new Date(),
      status: "failed",
      types: [],
      queryId,
      queryRunId: "",
      resultsRetrieved: false,
      executionType: undefined,
    };
  }

  const isCompass = resp.headers.get("x-execution-engine") !== "graphql";
  const data = await resp.json();

  if (!isCompass) return data;

  const compassResp = data as compass.GetQueryRunResultsResponseType;

  if (compassResp.error) {
    return {
      columns: [],
      csvData: [],
      jsonData: [],
      endedAt: new Date(),
      errorMessage: compassResp.error.message,
      rowCount: 0,
      startedAt: new Date(),
      status: "failed",
      types: [],
      queryId,
      queryRunId: "",
      resultsRetrieved: false,
      executionType: undefined,
    };
  }

  invariant(compassResp.result, "getQueryRunResultsResponse.result is required when there's no error");

  const csvData = compassResp.result.rows as queryRun.CsvRowsSchema;
  const columns = compassResp.result.columnNames;

  const result: queryRun.QueryRunResult = {
    columns,
    csvData,
    jsonData: csvData.map((row) => zipObject(columns ?? [], row)) ?? [],
    types: compassResp.result.columnTypes ?? [],
    rowCount: compassResp.result.rows.length ?? 0,
    errorMessage: "",
    status: "finished",
    startedAt: new Date(compassResp.result.originalQueryRun.startedAt ?? 0),
    endedAt: new Date(compassResp.result.originalQueryRun.endedAt ?? 0),
    queryId,
    queryRunId: "",
    resultsRetrieved: true,
    executionType: undefined,
  };

  return result;
};
