Blame view
server.js
6.44 KB
b76b508a9
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; const config = require('komodo-sdk/config'); const logger = require('tektrans-logger'); const { createServer } = require('http'); const { WebSocketServer } = require('ws'); const uniqid = require('uniqid'); const defaultValues = require('./lib/default-values'); const isValidApikey = require('./lib/is-valid-apikey'); const rejectConnection = require('./lib/server/reject-connection'); const heartbeatPing = require('./lib/heartbeat-ping'); const onPing = require('./lib/on-ping'); const onPong = require('./lib/on-pong'); const PING_INTERVAL_MS = defaultValues.pingIntervalMs; const server = createServer(); let partner; exports.setPartner = (partnerFromCaller) => { |
7aed3590d
|
24 |
logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); |
b76b508a9
|
25 26 27 28 29 30 31 |
partner = partnerFromCaller; }; const wsListenPort = config.push_trx_server && (config.push_trx_server.port || config.push_trx_server.listen_port); if (!wsListenPort) { |
7aed3590d
|
32 33 34 35 36 37 38 39 |
logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { config: { push_trx_server: !!config && !!config.push_trx_server && { port: !!config.push_trx_server.port, listen_port: !!config.push_trx_server.listen_port, }, }, }); |
b76b508a9
|
40 41 42 43 |
} else { const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); wss.on('connection', (ws, req, client) => { |
b76b508a9
|
44 45 |
// eslint-disable-next-line no-param-reassign ws.isAlive = true; |
ffab3e8b2
|
46 47 48 49 50 51 |
const { xid: connectionXid, remoteAddress, apikey, apikeyName, } = client; |
b76b508a9
|
52 53 |
logger.info(`${MODULE_NAME} F7755A03: Client connected`, { |
f8580302b
|
54 55 |
xid: connectionXid || uniqid(), client, |
b76b508a9
|
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
}); const heatbeatPingInterval = setInterval(() => { heartbeatPing(ws); }, PING_INTERVAL_MS); ws.on('ping', (data) => onPing(ws, data)); ws.on('pong', (data) => onPong(ws, data)); ws.on('close', (code, reason) => { const xid = uniqid(); logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { xid, remoteAddress, |
a30831dfd
|
71 |
apikeyName, |
b76b508a9
|
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
apikey, code, reason: reason.toString(), }); logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { xid, }); clearInterval(heatbeatPingInterval); }); ws.on('message', (data) => { const xid = uniqid(); // eslint-disable-next-line no-param-reassign ws.isAlive = true; let msg; try { msg = JSON.parse(data.toString() || ''); } catch (e) { msg = data.toString(); } if (!msg) return; const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { xid, |
ffab3e8b2
|
102 103 |
apikeyName, // apikey, |
b76b508a9
|
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
msgType, msg, msgSize: data.toString().length, }); if (msgType === 'TASK') { const task = msg.payload; if (!task) { logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); return; } logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { xid, task, }); const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); if (sdkTrxIdAdder) { const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { xid, sdkTrxIdAdder, originalTrxId: task.trx_id, newTrxId, }); task.trx_id = newTrxId; } const replyMessage = { type: 'ACK', rid: msg.rid || null, ts: new Date(), error: false, }; ws.send(JSON.stringify(replyMessage), { compress: true }); if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); return; } logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { xid, }); partner.buy(task, xid); } else { logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { xid, msgType, msg, }); } }); ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); }); server.on('upgrade', (req, socket, head) => { |
ffab3e8b2
|
167 |
const xid = uniqid(); |
b76b508a9
|
168 |
const apikey = req.headers && (req.headers.apikey || req.headers.token); |
0b6da0fdd
|
169 170 171 172 173 174 |
const matchedApikey = isValidApikey( apikey, config.push_trx_server && config.push_trx_server.apikey, ); if (!matchedApikey) { |
b76b508a9
|
175 176 177 |
rejectConnection(req, socket); return; } |
b76b508a9
|
178 179 |
wss.handleUpgrade(req, socket, head, (ws) => { const client = { |
ffab3e8b2
|
180 |
xid, |
b76b508a9
|
181 |
remoteAddress: req.socket.remoteAddress, |
5c5cf2295
|
182 |
apikeyName: matchedApikey.name, |
b76b508a9
|
183 184 |
apikey, }; |
5c5cf2295
|
185 |
logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { |
ffab3e8b2
|
186 |
xid, |
5c5cf2295
|
187 188 189 |
headers: req.headers, client, }); |
b76b508a9
|
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
wss.emit('connection', ws, req, client); }); }); server.listen(wsListenPort, () => { logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); }); // const heatbeatPingInterval = setInterval(() => { // wss.clients.forEach((ws) => { // heartbeatPing(ws); // }); // }, PING_INTERVAL_MS); wss.on('close', () => { logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); // clearInterval(heatbeatPingInterval); }); } |