const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; const config = require('komodo-sdk/config'); const logger = require('tektrans-logger'); const { createServer } = require('http'); const { WebSocketServer } = require('ws'); const uniqid = require('uniqid'); const defaultValues = require('./lib/default-values'); const isValidApikey = require('./lib/is-valid-apikey'); const rejectConnection = require('./lib/server/reject-connection'); const heartbeatPing = require('./lib/heartbeat-ping'); const onPing = require('./lib/on-ping'); const onPong = require('./lib/on-pong'); const PING_INTERVAL_MS = defaultValues.pingIntervalMs; const server = createServer(); let partner; exports.setPartner = (partnerFromCaller) => { logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); partner = partnerFromCaller; }; const wsListenPort = config.push_trx_server && (config.push_trx_server.port || config.push_trx_server.listen_port); if (!wsListenPort) { logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { config: { push_trx_server: !!config && !!config.push_trx_server && { port: !!config.push_trx_server.port, listen_port: !!config.push_trx_server.listen_port, }, }, }); } else { const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); wss.on('connection', (ws, req, client) => { // eslint-disable-next-line no-param-reassign ws.isAlive = true; const { xid: connectionXid, remoteAddress, apikey, apikeyName, } = client; logger.info(`${MODULE_NAME} F7755A03: Client connected`, { xid: connectionXid || uniqid(), client, }); const heatbeatPingInterval = setInterval(() => { heartbeatPing(ws); }, PING_INTERVAL_MS); ws.on('ping', (data) => onPing(ws, data)); ws.on('pong', (data) => onPong(ws, data)); ws.on('close', (code, reason) => { const xid = uniqid(); logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { xid, remoteAddress, apikey, code, reason: reason.toString(), }); logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { xid, }); clearInterval(heatbeatPingInterval); }); ws.on('message', (data) => { const xid = uniqid(); // eslint-disable-next-line no-param-reassign ws.isAlive = true; let msg; try { msg = JSON.parse(data.toString() || ''); } catch (e) { msg = data.toString(); } if (!msg) return; const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { xid, apikeyName, // apikey, msgType, msg, msgSize: data.toString().length, }); if (msgType === 'TASK') { const task = msg.payload; if (!task) { logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); return; } logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { xid, task, }); const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); if (sdkTrxIdAdder) { const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { xid, sdkTrxIdAdder, originalTrxId: task.trx_id, newTrxId, }); task.trx_id = newTrxId; } const replyMessage = { type: 'ACK', rid: msg.rid || null, ts: new Date(), error: false, }; ws.send(JSON.stringify(replyMessage), { compress: true }); if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); return; } logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { xid, }); partner.buy(task, xid); } else { logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { xid, msgType, msg, }); } }); ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); }); server.on('upgrade', (req, socket, head) => { const xid = uniqid(); const apikey = req.headers && (req.headers.apikey || req.headers.token); const matchedApikey = isValidApikey( apikey, config.push_trx_server && config.push_trx_server.apikey, ); if (!matchedApikey) { rejectConnection(req, socket); return; } wss.handleUpgrade(req, socket, head, (ws) => { const client = { xid, remoteAddress: req.socket.remoteAddress, apikeyName: matchedApikey.name, apikey, }; logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { xid, headers: req.headers, client, }); wss.emit('connection', ws, req, client); }); }); server.listen(wsListenPort, () => { logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); }); // const heatbeatPingInterval = setInterval(() => { // wss.clients.forEach((ws) => { // heartbeatPing(ws); // }); // }, PING_INTERVAL_MS); wss.on('close', () => { logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); // clearInterval(heatbeatPingInterval); }); }