"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.EventSubscriber = void 0;
const types_1 = require("./types");
const txEventCache_1 = require("./txEventCache");
const eventList_1 = require("./eventList");
const pollingLogProvider_1 = require("./pollingLogProvider");
const fetchLogs_1 = require("./fetchLogs");
const webSocketLogProvider_1 = require("./webSocketLogProvider");
const events_1 = require("events");
const sort_1 = require("./sort");
const parse_1 = require("./parse");
const eventsServerLogProvider_1 = require("./eventsServerLogProvider");
class EventSubscriber {
    constructor(connection, program, options = types_1.DefaultEventSubscriptionOptions) {
        var _a;
        this.connection = connection;
        this.program = program;
        this.options = options;
        this.awaitTxPromises = new Map();
        this.awaitTxResolver = new Map();
        this.options = Object.assign({}, types_1.DefaultEventSubscriptionOptions, options);
        this.address = (_a = this.options.address) !== null && _a !== void 0 ? _a : program.programId;
        this.txEventCache = new txEventCache_1.TxEventCache(this.options.maxTx);
        this.eventListMap = new Map();
        this.eventEmitter = new events_1.EventEmitter();
        this.currentProviderType = this.options.logProviderConfig.type;
        this.initializeLogProvider();
    }
    initializeLogProvider(subscribe = false) {
        if (this.currentProviderType === 'websocket') {
            const logProviderConfig = this.options
                .logProviderConfig;
            this.logProvider = new webSocketLogProvider_1.WebSocketLogProvider(
            // @ts-ignore
            this.connection, this.address, this.options.commitment, logProviderConfig.resubTimeoutMs);
        }
        else if (this.currentProviderType === 'polling') {
            const logProviderConfig = this.options
                .logProviderConfig;
            this.logProvider = new pollingLogProvider_1.PollingLogProvider(
            // @ts-ignore
            this.connection, this.address, this.options.commitment, logProviderConfig.frequency, logProviderConfig.batchSize);
        }
        else if (this.currentProviderType === 'events-server') {
            const logProviderConfig = this.options
                .logProviderConfig;
            this.logProvider = new eventsServerLogProvider_1.EventsServerLogProvider(logProviderConfig.url, this.options.eventTypes, this.options.address ? this.options.address.toString() : undefined);
        }
        else {
            throw new Error(`Invalid log provider type: ${this.currentProviderType}`);
        }
        if (subscribe) {
            this.logProvider.subscribe((txSig, slot, logs, mostRecentBlockTime, txSigIndex) => {
                this.handleTxLogs(txSig, slot, logs, mostRecentBlockTime, this.currentProviderType === 'events-server', txSigIndex);
            }, true);
        }
    }
    populateInitialEventListMap() {
        for (const eventType of this.options.eventTypes) {
            this.eventListMap.set(eventType, new eventList_1.EventList(eventType, this.options.maxEventsPerType, (0, sort_1.getSortFn)(this.options.orderBy, this.options.orderDir), this.options.orderDir));
        }
    }
    /**
     * Implements fallback logic for reconnecting to LogProvider. Currently terminates at polling,
     * could be improved to try the original type again after some cooldown.
     */
    updateFallbackProviderType(reconnectAttempts, maxReconnectAttempts) {
        if (reconnectAttempts < maxReconnectAttempts) {
            return;
        }
        let nextProviderType = this.currentProviderType;
        if (this.currentProviderType === 'events-server') {
            nextProviderType = 'websocket';
        }
        else if (this.currentProviderType === 'websocket') {
            nextProviderType = 'polling';
        }
        else if (this.currentProviderType === 'polling') {
            nextProviderType = 'polling';
        }
        console.log(`EventSubscriber: Failing over providerType ${this.currentProviderType} to ${nextProviderType}`);
        this.currentProviderType = nextProviderType;
    }
    async subscribe() {
        try {
            if (this.logProvider.isSubscribed()) {
                return true;
            }
            this.populateInitialEventListMap();
            if (this.options.logProviderConfig.type === 'websocket' ||
                this.options.logProviderConfig.type === 'events-server') {
                const logProviderConfig = this.options
                    .logProviderConfig;
                if (this.logProvider.eventEmitter) {
                    this.logProvider.eventEmitter.on('reconnect', async (reconnectAttempts) => {
                        if (reconnectAttempts > logProviderConfig.maxReconnectAttempts) {
                            console.log(`EventSubscriber: Reconnect attempts ${reconnectAttempts}/${logProviderConfig.maxReconnectAttempts}, reconnecting...`);
                            this.logProvider.eventEmitter.removeAllListeners('reconnect');
                            await this.unsubscribe();
                            this.updateFallbackProviderType(reconnectAttempts, logProviderConfig.maxReconnectAttempts);
                            this.initializeLogProvider(true);
                        }
                    });
                }
            }
            this.logProvider.subscribe((txSig, slot, logs, mostRecentBlockTime, txSigIndex) => {
                this.handleTxLogs(txSig, slot, logs, mostRecentBlockTime, this.currentProviderType === 'events-server', txSigIndex);
            }, true);
            return true;
        }
        catch (e) {
            console.error('Error fetching previous txs in event subscriber');
            console.error(e);
            return false;
        }
    }
    handleTxLogs(txSig, slot, logs, mostRecentBlockTime, fromEventsServer = false, txSigIndex = undefined) {
        if (!fromEventsServer && this.txEventCache.has(txSig)) {
            return;
        }
        const wrappedEvents = this.parseEventsFromLogs(txSig, slot, logs, txSigIndex);
        for (const wrappedEvent of wrappedEvents) {
            this.eventListMap.get(wrappedEvent.eventType).insert(wrappedEvent);
        }
        // dont emit event till we've added all the events to the eventListMap
        for (const wrappedEvent of wrappedEvents) {
            this.eventEmitter.emit('newEvent', wrappedEvent);
        }
        if (this.awaitTxPromises.has(txSig)) {
            this.awaitTxPromises.delete(txSig);
            this.awaitTxResolver.get(txSig)();
            this.awaitTxResolver.delete(txSig);
        }
        if (!this.lastSeenSlot || slot > this.lastSeenSlot) {
            this.lastSeenTxSig = txSig;
            this.lastSeenSlot = slot;
        }
        if (this.lastSeenBlockTime === undefined ||
            mostRecentBlockTime > this.lastSeenBlockTime) {
            this.lastSeenBlockTime = mostRecentBlockTime;
        }
        this.txEventCache.add(txSig, wrappedEvents);
    }
    async fetchPreviousTx(fetchMax) {
        if (!this.options.untilTx && !fetchMax) {
            return;
        }
        let txFetched = 0;
        let beforeTx = undefined;
        const untilTx = this.options.untilTx;
        while (txFetched < this.options.maxTx) {
            const response = await (0, fetchLogs_1.fetchLogs)(
            // @ts-ignore
            this.connection, this.address, this.options.commitment === 'finalized' ? 'finalized' : 'confirmed', beforeTx, untilTx);
            if (response === undefined) {
                break;
            }
            txFetched += response.transactionLogs.length;
            beforeTx = response.earliestTx;
            for (const { txSig, slot, logs } of response.transactionLogs) {
                this.handleTxLogs(txSig, slot, logs, response.mostRecentBlockTime);
            }
        }
    }
    async unsubscribe() {
        this.eventListMap.clear();
        this.txEventCache.clear();
        this.awaitTxPromises.clear();
        this.awaitTxResolver.clear();
        return await this.logProvider.unsubscribe(true);
    }
    parseEventsFromLogs(txSig, slot, logs, txSigIndex) {
        const records = [];
        // @ts-ignore
        const events = (0, parse_1.parseLogs)(this.program, logs);
        let runningEventIndex = 0;
        for (const event of events) {
            // @ts-ignore
            const expectRecordType = this.eventListMap.has(event.name);
            if (expectRecordType) {
                event.data.txSig = txSig;
                event.data.slot = slot;
                event.data.eventType = event.name;
                event.data.txSigIndex =
                    txSigIndex !== undefined ? txSigIndex : runningEventIndex;
                records.push(event.data);
            }
            runningEventIndex++;
        }
        return records;
    }
    awaitTx(txSig) {
        if (this.awaitTxPromises.has(txSig)) {
            return this.awaitTxPromises.get(txSig);
        }
        if (this.txEventCache.has(txSig)) {
            return Promise.resolve();
        }
        const promise = new Promise((resolve) => {
            this.awaitTxResolver.set(txSig, resolve);
        });
        this.awaitTxPromises.set(txSig, promise);
        return promise;
    }
    getEventList(eventType) {
        return this.eventListMap.get(eventType);
    }
    /**
     * This requires the EventList be cast to an array, which requires reallocation of memory.
     * Would bias to using getEventList over getEvents
     *
     * @param eventType
     */
    getEventsArray(eventType) {
        return this.eventListMap.get(eventType).toArray();
    }
    getEventsByTx(txSig) {
        return this.txEventCache.get(txSig);
    }
}
exports.EventSubscriber = EventSubscriber;
