import { nanoid } from 'nanoid';
import {
  filter,
  fromPromise,
  fromValue,
  map,
  mergeMap,
  pipe,
  type Source,
  subscribe,
  take,
  toPromise,
} from 'wonka';

import { isNotNull } from './predicates';
import { type Accumulator, fromWindowEvent, makeAccumulator } from './wonka';

export function passPortTo(wnd: Window, origin: string, port: MessagePort) {
  wnd.postMessage({ keeperWebMessagePort: port }, origin, [port]);
}

export function acceptPortFrom(origin: string) {
  return pipe(
    fromWindowEvent(window, 'message'),
    filter(event => origin === '*' || event.origin === origin),
    map((event: MessageEvent<unknown>) =>
      event.data != null &&
      typeof event.data === 'object' &&
      'keeperWebMessagePort' in event.data &&
      event.data.keeperWebMessagePort instanceof MessagePort
        ? event.data.keeperWebMessagePort
        : null
    ),
    filter(isNotNull),
    take(1),
    toPromise
  );
}

interface RpcRequest<F = string, A = unknown[]> {
  id: string;
  fn: F;
  args: A;
}

function isRpcRequest(input: unknown): input is RpcRequest {
  return (
    typeof input === 'object' &&
    input != null &&
    'id' in input &&
    typeof input.id === 'string' &&
    'fn' in input &&
    typeof input.fn === 'string' &&
    'args' in input &&
    Array.isArray(input.args)
  );
}

type RpcResponse<T> = { id: string } & (
  | { result: T }
  | { error: { message: string } }
);

function isRpcResponse(input: unknown): input is RpcResponse<unknown> {
  if (
    typeof input !== 'object' ||
    input == null ||
    !('id' in input) ||
    typeof input.id !== 'string'
  ) {
    return false;
  }

  if ('result' in input) {
    return true;
  }

  return (
    'error' in input &&
    typeof input.error === 'object' &&
    input.error !== null &&
    'message' in input.error &&
    typeof input.error.message === 'string'
  );
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ApiInput = Record<string, (...args: any[]) => Promise<unknown>>;

export type AccumulatedRpcRequest<T extends ApiInput, K extends keyof T> = {
  args: Parameters<T[K]>;
  id: string;
  reject: (reason?: unknown) => void;
  resolve: (value: Awaited<ReturnType<T[K]>>) => void;
};

export type AccumulatedRpcRequests<T extends ApiInput> = {
  [K in keyof T]: Source<AccumulatedRpcRequest<T, K>>;
};

function createLazyRecord<T extends Record<string, unknown>>(
  get: <K extends keyof T>(p: K) => T[K]
) {
  const obj: Partial<T> = {};

  return new Proxy(obj, {
    get<P extends Extract<keyof T, string>>(target: T, p: P) {
      if (!target[p]) {
        target[p] = get(p);
      }

      return target[p];
    },
  }) as T;
}

export function createRpcRequestHandler<T extends ApiInput>(
  createApi: (helpers: {
    accumulate: <A extends unknown[], R>() => (
      request: RpcRequest<Extract<keyof T, string>, A>
    ) => Promise<R>;
    callAsync: <A extends unknown[], R>(
      fn: (...args: A) => Promise<R>
    ) => (request: RpcRequest<Extract<keyof T, string>, A>) => Promise<R>;
  }) => {
    [K in Extract<keyof T, string>]: (
      request: RpcRequest<K, Parameters<T[K]>>
    ) => Promise<Awaited<ReturnType<T[K]>>>;
  }
) {
  type TK = Extract<keyof T, string>;

  const accumulators =
    createLazyRecord<{ [K in TK]: Accumulator<AccumulatedRpcRequest<T, K>> }>(
      makeAccumulator
    );

  const api = createApi({
    accumulate: () => request =>
      new Promise((resolve, reject) => {
        const { args, fn, id } = request;

        accumulators[fn].sink(
          fromValue({
            args,
            id,
            reject,
            resolve,
          } as unknown as AccumulatedRpcRequest<T, TK>)
        );
      }),
    callAsync: fn => request => fn(...request.args),
  });

  return {
    accumulatedRequests: createLazyRecord<{
      [K in TK]: Source<AccumulatedRpcRequest<T, K>>;
    }>(p => accumulators[p].source),
    handler: (source: Source<unknown>) =>
      pipe(
        source,
        filter(isRpcRequest),
        mergeMap(input =>
          fromPromise(
            Promise.resolve().then(async () => {
              try {
                const fn = api[input.fn];

                if (!fn) {
                  throw new Error(
                    `There's no "${input.fn}" function in rpc api`
                  );
                }

                return {
                  id: input.id,
                  result: await fn(input as RpcRequest<TK, Parameters<T[TK]>>),
                };
              } catch (err) {
                return {
                  id: input.id,
                  error:
                    err && typeof err === 'object'
                      ? {
                          ...err,
                          message: String('message' in err ? err.message : err),
                        }
                      : { message: String(err) },
                };
              }
            })
          )
        )
      ),
  };
}

export function createRpcRequestProxy<T extends ApiInput>() {
  const api: Partial<T> = {};
  const requestAccumulator = makeAccumulator<RpcRequest>();
  const responseAccumulator = makeAccumulator<RpcResponse<unknown>>();

  return {
    api: new Proxy(api, {
      get<K extends Extract<keyof T, string>>(target: T, fn: K) {
        // NOTE: this is needed to make sure proxy is not treated as a promise
        if (fn === 'then') return undefined;

        if (!target[fn]) {
          target[fn] = (async (...args) => {
            const id = nanoid();

            requestAccumulator.sink(fromValue({ id, fn, args }));

            return new Promise((resolve, reject) => {
              pipe(
                responseAccumulator.source,
                filter(response => response.id === id),
                take(1),
                subscribe(response => {
                  if ('result' in response) {
                    resolve(response.result);
                  } else {
                    reject(response.error);
                  }
                })
              );
            });
          }) as T[K];
        }

        return target[fn];
      },
    }) as T,
    proxy: (source: Source<unknown>) =>
      pipe(source, filter(isRpcResponse), src => sink => {
        src(signal => {
          if (signal === 0) {
            sink(signal);
          } else if (signal.tag === 0) {
            sink(signal);
            requestAccumulator.source(sink);
          } else {
            responseAccumulator.sink(fromValue(signal[0]));
          }
        });
      }),
  };
}
