server.js 6.76 KB
const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';

const config = require('komodo-sdk/config');
const logger = require('tektrans-logger');

const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const uniqid = require('uniqid');

const packageJson = require('./package.json');

const defaultValues = require('./lib/default-values');
const isValidApikey = require('./lib/is-valid-apikey');
const rejectConnection = require('./lib/server/reject-connection');
const heartbeatPing = require('./lib/heartbeat-ping');
const onPing = require('./lib/on-ping');
const onPong = require('./lib/on-pong');

const PING_INTERVAL_MS = defaultValues.pingIntervalMs;

const server = createServer();

let partner;

exports.setPartner = (partnerFromCaller) => {
    logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
    partner = partnerFromCaller;
};

const wsListenPort = config.push_trx_server
    && (config.push_trx_server.port || config.push_trx_server.listen_port);

const initServer = () => {
    const pushTrxSdkVersion = packageJson.version;
    logger.verbose(`${MODULE_NAME} 70D208B2: Initializing`, { pushTrxSdkVersion });

    const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });

    wss.on('connection', (ws, req, client) => {
        // eslint-disable-next-line no-param-reassign
        ws.isAlive = true;

        const {
            xid: connectionXid,
            remoteAddress,
            apikey,
            apikeyName,
        } = client;

        logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
            xid: connectionXid || uniqid(),
            client,
        });

        const heatbeatPingInterval = setInterval(() => {
            heartbeatPing(ws);
        }, PING_INTERVAL_MS);

        ws.on('ping', (data) => onPing(ws, data));
        ws.on('pong', (data) => onPong(ws, data));

        ws.on('close', (code, reason) => {
            const xid = uniqid();

            logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
                xid,
                remoteAddress,
                apikeyName,
                apikey,
                code,
                reason: reason.toString(),
            });

            logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
                xid,
            });
            clearInterval(heatbeatPingInterval);
        });

        ws.on('message', (data) => {
            const xid = uniqid();

            // eslint-disable-next-line no-param-reassign
            ws.isAlive = true;

            let msg;
            try {
                msg = JSON.parse(data.toString() || '');
            } catch (e) {
                msg = data.toString();
            }

            if (!msg) return;

            const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();

            logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
                xid,
                apikeyName,
                // apikey,
                msgType,
                msg,
                msgSize: data.toString().length,
            });

            if (msgType === 'TASK') {
                const task = msg.payload;
                if (!task) {
                    logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
                    return;
                }

                logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
                    xid,
                    task,
                });

                const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
                if (sdkTrxIdAdder) {
                    const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;

                    logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
                        xid,
                        sdkTrxIdAdder,
                        originalTrxId: task.trx_id,
                        newTrxId,
                    });

                    task.trx_id = newTrxId;
                }

                const replyMessage = {
                    type: 'ACK',
                    rid: msg.rid || null,
                    ts: new Date(),
                    error: false,
                };

                ws.send(JSON.stringify(replyMessage), { compress: true });

                if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
                    logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
                    return;
                }

                logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
                    xid,
                });

                partner.buy(task, xid);
            } else {
                logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
                    xid,
                    msgType,
                    msg,
                });
            }
        });

        ws.send(JSON.stringify({
            msgType: 'WELCOMEMSG',
            data: {
                msg: 'bla bla bla',
                pushTrxSdkVersion,
            },
        }));
    });

    server.on('upgrade', (req, socket, head) => {
        const xid = uniqid();

        const apikey = req.headers && (req.headers.apikey || req.headers.token);

        const matchedApikey = isValidApikey(
            apikey,
            config.push_trx_server && config.push_trx_server.apikey,
        );

        if (!matchedApikey) {
            rejectConnection(req, socket);
            return;
        }

        wss.handleUpgrade(req, socket, head, (ws) => {
            const client = {
                xid,
                remoteAddress: req.socket.remoteAddress,
                apikeyName: matchedApikey.name,
                apikey,
            };

            logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
                xid,
                headers: req.headers,
                client,
            });

            wss.emit('connection', ws, req, client);
        });
    });

    server.listen(wsListenPort, () => {
        logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
    });

    // const heatbeatPingInterval = setInterval(() => {
    //     wss.clients.forEach((ws) => {
    //         heartbeatPing(ws);
    //     });
    // }, PING_INTERVAL_MS);

    wss.on('close', () => {
        logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
        // clearInterval(heatbeatPingInterval);
    });
};

if (!wsListenPort) {
    logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
        config: {
            push_trx_server: !!config && !!config.push_trx_server && {
                port: !!config.push_trx_server.port,
                listen_port: !!config.push_trx_server.listen_port,
            },
        },
    });
} else {
    initServer();
}