// connect.js 2.91 KB
// Tag placed at the start of every log message emitted from this file.
const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.CLIENT.CONNECT';

const logger = require('tektrans-logger');
const { WebSocket } = require('ws');
const uniqid = require('uniqid');

const defaultValues = require('../default-values');
const wsList = require('./ws-list');
const heartbeatPing = require('../heartbeat-ping');
const onMessage = require('./on-message');
const onPong = require('../on-pong');
const onPing = require('../on-ping');

// Period, in milliseconds, between heartbeatPing calls on an open socket
// (taken from the shared default values).
const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
// Delay, in milliseconds, before connect is invoked again after a socket closes.
const RECONNECT_DELAY_MS = 1000;

/**
 * Open a websocket to a gateway, register it in `wsList` under the
 * normalized (trimmed, upper-cased) gateway name, and reconnect after
 * RECONNECT_DELAY_MS whenever the socket closes.
 *
 * A gateway that is disabled or has no apikey is not connected; its
 * `wsList` entry is set to `null` instead.
 *
 * @param  {string} xid - correlation id attached to log entries
 * @param  {object} gateway
 * @param  {string} gateway.name - registry key (after trim + upper-case)
 * @param  {string} gateway.url - websocket URL to connect to
 * @param  {string} gateway.apikey - passed in the `apikey` request header
 * @param  {boolean} [gateway.disabled] - when truthy the gateway is skipped
 * @returns {void}
 */
const connect = (xid, gateway) => {
    // Handle of the heartbeat timer; only set once the socket has opened.
    let pingInterval;

    const {
        name: gwNameOriginal, url: gwUrl, apikey, disabled,
    } = gateway;

    const gwName = gwNameOriginal.trim().toUpperCase();

    if (disabled) {
        logger.verbose(`${MODULE_NAME} 845F46E1: Skip disabled gateway`, { xid, gwName });
        wsList[gwName] = null;
        return;
    }

    if (!apikey) {
        logger.warn(`${MODULE_NAME} A61046ED: Missing apikey`, { xid, gwName });
        wsList[gwName] = null;
        return;
    }

    logger.info(`${MODULE_NAME} 7A7095BD: Connecting`, {
        xid, gwName, gwUrl,
    });

    const ws = new WebSocket(gwUrl, {
        headers: {
            apikey,
        },
    });

    wsList[gwName] = ws;

    ws.on('open', () => {
        logger.verbose(`${MODULE_NAME} D7B07324: Gateway websocket connected`, {
            xid,
            gwName,
        });

        // Bookkeeping fields stored on the socket itself; presumably read by
        // the heartbeat / ping / pong helpers (defined elsewhere) - confirm.
        ws.isAlive = true;
        ws.peerName = gwName;

        logger.verbose(`${MODULE_NAME} DA2A43ED: Registering ping interval`, { xid, gwName });
        pingInterval = setInterval(() => {
            heartbeatPing(ws);
        }, PING_INTERVAL_MS);
    });

    ws.on('close', (code, reason) => {
        // Fresh id: ties together the logs of this close event and of the
        // reconnect attempt that it triggers.
        const closeXid = uniqid();

        logger.info(`${MODULE_NAME} E7A67DAA: Closed`, {
            xid: closeXid,
            gwName,
            code,
            reason: reason.toString(),
        });

        if (pingInterval) {
            logger.verbose(`${MODULE_NAME} E398741C: Unregister ping`, {
                xid: closeXid,
                gwName,
            });

            clearInterval(pingInterval);
            pingInterval = null;
        }

        // If a different socket has already been registered for this gateway,
        // this one is stale: clearing the entry would drop a live connection
        // and reconnecting would create a duplicate. Leave the registry alone.
        const registered = wsList[gwName];
        if (registered && registered !== ws) {
            logger.verbose(`${MODULE_NAME} 5B0C2D7A: Superseded by a newer socket, skip reconnect`, {
                xid: closeXid,
                gwName,
            });
            return;
        }

        logger.verbose(`${MODULE_NAME} 245B84CB: Reconnecting`, {
            xid: closeXid,
            gwName,
            delayMs: RECONNECT_DELAY_MS,
        });

        wsList[gwName] = null;

        // Extra setTimeout arguments are forwarded to connect(xid, gateway).
        setTimeout(connect, RECONNECT_DELAY_MS, closeXid, gateway);
    });

    ws.on('ping', (data) => onPing(ws, data));
    ws.on('pong', (data) => onPong(ws, data));

    // Errors are only logged here; reconnection is driven by the 'close' handler.
    ws.on('error', (err) => {
        logger.warn(`${MODULE_NAME} 0E521FF2: Error`, {
            xid,
            gwName,
            eCode: err.code,
            eMessage: err.message,
        });
    });

    ws.on('message', (data) => {
        onMessage(gwName, ws, data);
    });
};

module.exports = connect;