//@ts-ignore
import notifierFind from '@absinthe/socket/dist/notifier/find';
import {
  AbsintheSocket,
  Notifier,
  observe,
  Observer,
  send,
  unobserveOrCancel,
} from '@absinthe/socket';
//@ts-ignore
import { getOperationType } from '@jumpn/utils-graphql';
import {
  Observable,
  CacheConfig,
  GraphQLResponse,
  RequestParameters,
  Variables,
} from 'relay-runtime';
import { Sink, Subscription } from 'relay-runtime/lib/network/RelayObservable';

type Deferred<Result> = {
  promise: Promise<Result>;
  resolve: (result: Result) => void;
  reject: (error: any) => void;
};

const unobserveOrCancelIfNeeded = (
  absintheSocket: AbsintheSocket,
  notifier: Notifier,
  observer: Observer
) => {
  if (notifier) {
    unobserveOrCancel(absintheSocket, notifier, observer);
  }
};

const createDisposable = (
  absintheSocket: AbsintheSocket,
  { request }: Notifier,
  observer: Observer<Variables, GraphQLResponse>
) => ({
  dispose: () =>
    unobserveOrCancelIfNeeded(
      absintheSocket,
      notifierFind(absintheSocket.notifiers, 'request', request),
      observer as any
    ),
});

/**
 * Creates a Subscriber (Relay SubscribeFunction) using the given AbsintheSocket
 * instance
 */
const createSubscriber = (absintheSocket: AbsintheSocket) => (
  { text: operation }: RequestParameters,
  variables: Variables,
  cacheConfig: CacheConfig
): Observable<GraphQLResponse> => {
  if (!operation || getOperationType(operation) !== 'subscription') {
    throw new Error(`Expected subscription, but instead got:\n${operation}`);
  }

  const notifier = send(absintheSocket, { operation, variables });

  const relaySource = (sink: Sink<GraphQLResponse>) => {
    const absintheObserver = {
      onAbort: (e) => console.log('Subscription aborted, ', e),
      onError: (e) => {
        console.log('Error during subscription ', e);
        sink.error(e);
      },
      onStart: (n) => console.log('Subscription started ', n),
      onResult: (v) => {
        console.log('Got result ', v);
        sink.next(v);
      },
    } as Observer<Variables, GraphQLResponse>;
    observe(absintheSocket, notifier, absintheObserver as any);
    const disposable = createDisposable(absintheSocket, notifier, absintheObserver);
    const subscription = {
      unsubscribe: () => {
        disposable.dispose();
      },
      closed: false,
    } as Subscription;
    return subscription;
  };
  return Observable.create(relaySource);
};

export default createSubscriber;
