import BaseModel from '../baseModel';
import CPromise from '../cPromise';
import genGuid from '../guid';
import { iterableBuffer, IterableBufferProducer } from '../iterableBuffer';
import Bus from './bus';

/**
 * Publishes a request on the bus and resolves with the matching response.
 *
 * When `topic` is a string the request is published to `${topic}.request`
 * and the response is awaited on `${topic}.response`; when it is a tuple the
 * first entry is the request topic and the second the response topic.
 *
 * The response package decides how the returned promise settles:
 * - `pkg.error` set: the promise is rejected with it (or, if a chunk stream
 *   has already started, the error is thrown into the stream).
 * - `pkg.chunk` defined: the promise resolves with an async-iterable consumer
 *   on the first chunk and every chunk is pushed into it until a package with
 *   `isLastChunk` arrives.
 * - otherwise: the promise resolves with the response data.
 *
 * @param topic Base topic, or an explicit `[requestTopic, responseTopic]` pair.
 * @param data Payload to publish; `{}` is sent when it is `null`/`undefined`.
 * @param options Per-call overrides for the bus options (`timeout`, `retry`,
 *   `delay`, `backoff`, `maxDelay`, `debug`).
 * @returns The pending request. The implementation also attaches the initial
 *   `guid` and the `correlationGuid` to the returned promise object.
 */
export function bus_request<T = any>(
    this: Bus,
    topic: string | [string, string],
    data?: any,
    options?: Partial<typeof this.options>,
): Promise<T>;
/**
 * Same as above, but with `baseModel: true` the whole response package is
 * delivered (resolved or pushed per chunk) instead of just its data.
 */
export function bus_request<T = any>(
    this: Bus,
    topic: string | [string, string],
    data?: any,
    options?: Partial<typeof this.options> & { baseModel: true },
): Promise<BaseModel<T>>;

// Implementation signature. Option defaults fall back to the bus-wide options.
export function bus_request<T = any>(
    this: Bus,
    topic: string | [string, string],
    data: any,
    {
        timeout = this.options.timeout,
        retry = this.options.retry,
        delay = this.options.delay || 0,
        backoff = this.options.backoff,
        maxDelay = this.options.maxDelay,
        debug = this.options.debug,
        baseModel: returnBaseModel = false,
    } = {},
) {
    // `guid` is regenerated for every retry; `correlationGuid` keeps the value
    // of the first guid for the lifetime of the request.
    let guid = genGuid();
    const correlationGuid = guid;
    // Negative, zero, NaN or missing timeouts fall back to the bus default.
    timeout = Math.max(timeout, 0) || Bus.DEFAULT_TIMEOUT;

    const requestTopic = typeof topic === 'string' ? `${topic}.request` : topic[0];
    const responseTopic = typeof topic === 'string' ? `${topic}.response` : topic[1];

    // The executor is intentionally NOT async: nothing is awaited here, and an
    // async executor would turn a synchronous failure into a rejection of its
    // own discarded promise, leaving `request` pending forever.
    // NOTE(review): assumes CPromise follows the standard executor contract
    // (a synchronous throw rejects the promise) — confirm.
    const request = new CPromise<T | AsyncIterable<T>>((resolve, reject, onCancel) => {
        let timeoutId: ReturnType<typeof setTimeout> | undefined;
        let producer: IterableBufferProducer<any> | undefined;
        // Set as soon as any response package has been handled, so that `send`
        // does not arm a request timer after the fact.
        let responded = false;

        const clearTimer = () => {
            if (timeoutId !== undefined) {
                clearTimeout(timeoutId);
                timeoutId = undefined;
            }
        };

        const cancelListener = this.subscribe(
            responseTopic,
            (payload, _topic, pkg) => {
                responded = true;
                // Any response cancels the pending request/retry/chunk timer.
                clearTimer();

                if (pkg?.error) {
                    if (producer) producer.throw(pkg.error);
                    else reject(pkg.error);
                    cancelListener();
                } else if (pkg?.chunk !== undefined) {
                    const chunk = pkg.chunk;

                    if (!producer) {
                        // First chunk: hand the consumer side to the caller.
                        let consumer;
                        [producer, consumer] = iterableBuffer<T>();
                        resolve(consumer);
                    }
                    producer.push(returnBaseModel ? pkg : payload);
                    if (pkg.isLastChunk) {
                        producer.complete();
                        cancelListener();
                    } else if (timeout) {
                        // Each following chunk must arrive within `timeout`.
                        timeoutId = setTimeout(() => {
                            cancelListener();
                            producer?.throw(new Error(`Chunk ${chunk + 1} timed out.`));
                        }, timeout);
                    }
                } else {
                    // NOTE(review): if a non-chunk package arrives after a
                    // stream has started, `resolve` is a no-op and the
                    // producer is never completed — confirm whether the
                    // protocol can produce this.
                    resolve(returnBaseModel ? pkg : payload);
                    cancelListener();
                }
            },
            { correlationGuid, debug },
        );

        const send = () => {
            try {
                this.publish(requestTopic, data ?? {}, { guid, correlationGuid, debug });
            } catch (error) {
                // Also reached for retries started from a timer callback,
                // where a throw would otherwise be an uncaught exception.
                cancelListener();
                reject(error);
                return;
            }
            // If the response was already handled while publishing (a bus that
            // delivers synchronously), do not arm a timer: it would re-send
            // the request or overwrite the chunk timer set by the listener.
            if (timeout && !responded) {
                timeoutId = setTimeout(() => {
                    // NOTE(review): a negative `retry` stays truthy and keeps
                    // retrying — confirm whether that is intended.
                    if (retry !== undefined && retry--) {
                        guid = genGuid();
                        // Respect `maxDelay` for the very first retry as well.
                        const wait = maxDelay ? Math.min(delay, maxDelay) : delay;
                        timeoutId = setTimeout(send, wait);
                        if (backoff) {
                            delay *= 2;
                        }
                        if (maxDelay) {
                            delay = Math.min(delay, maxDelay);
                        }
                    } else {
                        cancelListener();
                        reject(Object.assign(new Error(`${responseTopic} timed out after ${timeout}ms`), { isTimeout: true }));
                        debug?.(`${responseTopic} timed out after ${timeout}ms`);
                    }
                }, timeout);
            }
        };
        send();

        onCancel(() => {
            cancelListener();
            clearTimer();
            if (producer) producer.throw(new Error('Request was canceled.'));
        });
    });

    return Object.assign(request, { guid, correlationGuid });
}
