Commit 0b6da0fdd1a0fd2379024c3a9ee177c923faa5d9

Authored by Adhidarma Hadiwinoto
1 parent 21d8b2ad9f
Exists in master

Add clientName on connected client

Showing 2 changed files with 12 additions and 3 deletions Inline Diff

lib/is-valid-apikey.js
1 module.exports = (apikey, apikeys) => { 1 module.exports = (apikey, apikeys) => {
2 if (!apikey) return false; 2 if (!apikey) return false;
3 if (typeof apikey !== 'string') return false; 3 if (typeof apikey !== 'string') return false;
4 if (!apikeys || !Array.isArray(apikeys)) return false; 4 if (!apikeys || !Array.isArray(apikeys)) return false;
5 5
6 return !!apikeys.find((item) => !item.disabled 6 const matchedApikey = !!apikeys.find((item) => !item.disabled
7 && ( 7 && (
8 (typeof item === 'string' && item === apikey) 8 (typeof item === 'string' && item === apikey)
9 || (item.value === apikey) 9 || (item.value === apikey)
10 )); 10 ));
11
12 return matchedApikey || false;
11 }; 13 };
12 14
1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const defaultValues = require('./lib/default-values'); 10 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 11 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 12 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 13 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 14 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 15 const onPong = require('./lib/on-pong');
16 16
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 18
19 const server = createServer(); 19 const server = createServer();
20 20
21 let partner; 21 let partner;
22 22
23 exports.setPartner = (partnerFromCaller) => { 23 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); 24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
25 partner = partnerFromCaller; 25 partner = partnerFromCaller;
26 }; 26 };
27 27
28 const wsListenPort = config.push_trx_server 28 const wsListenPort = config.push_trx_server
29 && (config.push_trx_server.port || config.push_trx_server.listen_port); 29 && (config.push_trx_server.port || config.push_trx_server.listen_port);
30 30
31 if (!wsListenPort) { 31 if (!wsListenPort) {
32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { 32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
33 config: { 33 config: {
34 push_trx_server: !!config && !!config.push_trx_server && { 34 push_trx_server: !!config && !!config.push_trx_server && {
35 port: !!config.push_trx_server.port, 35 port: !!config.push_trx_server.port,
36 listen_port: !!config.push_trx_server.listen_port, 36 listen_port: !!config.push_trx_server.listen_port,
37 }, 37 },
38 }, 38 },
39 }); 39 });
40 } else { 40 } else {
41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
42 42
43 wss.on('connection', (ws, req, client) => { 43 wss.on('connection', (ws, req, client) => {
44 const connectionXid = uniqid(); 44 const connectionXid = uniqid();
45 45
46 // eslint-disable-next-line no-param-reassign 46 // eslint-disable-next-line no-param-reassign
47 ws.isAlive = true; 47 ws.isAlive = true;
48 48
49 const { remoteAddress, apikey } = client; 49 const { remoteAddress, apikey, name: clientName } = client;
50 50
51 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 51 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
52 xid: connectionXid, 52 xid: connectionXid,
53 remoteAddress, 53 remoteAddress,
54 clientName,
54 apikey, 55 apikey,
55 }); 56 });
56 57
57 const heatbeatPingInterval = setInterval(() => { 58 const heatbeatPingInterval = setInterval(() => {
58 heartbeatPing(ws); 59 heartbeatPing(ws);
59 }, PING_INTERVAL_MS); 60 }, PING_INTERVAL_MS);
60 61
61 ws.on('ping', (data) => onPing(ws, data)); 62 ws.on('ping', (data) => onPing(ws, data));
62 ws.on('pong', (data) => onPong(ws, data)); 63 ws.on('pong', (data) => onPong(ws, data));
63 64
64 ws.on('close', (code, reason) => { 65 ws.on('close', (code, reason) => {
65 const xid = uniqid(); 66 const xid = uniqid();
66 67
67 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 68 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
68 xid, 69 xid,
69 remoteAddress, 70 remoteAddress,
70 apikey, 71 apikey,
71 code, 72 code,
72 reason: reason.toString(), 73 reason: reason.toString(),
73 }); 74 });
74 75
75 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 76 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
76 xid, 77 xid,
77 }); 78 });
78 clearInterval(heatbeatPingInterval); 79 clearInterval(heatbeatPingInterval);
79 }); 80 });
80 81
81 ws.on('message', (data) => { 82 ws.on('message', (data) => {
82 const xid = uniqid(); 83 const xid = uniqid();
83 84
84 // eslint-disable-next-line no-param-reassign 85 // eslint-disable-next-line no-param-reassign
85 ws.isAlive = true; 86 ws.isAlive = true;
86 87
87 let msg; 88 let msg;
88 try { 89 try {
89 msg = JSON.parse(data.toString() || ''); 90 msg = JSON.parse(data.toString() || '');
90 } catch (e) { 91 } catch (e) {
91 msg = data.toString(); 92 msg = data.toString();
92 } 93 }
93 94
94 if (!msg) return; 95 if (!msg) return;
95 96
96 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 97 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
97 98
98 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 99 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
99 xid, 100 xid,
100 remoteAddress, 101 remoteAddress,
101 apikey, 102 apikey,
102 msgType, 103 msgType,
103 msg, 104 msg,
104 msgSize: data.toString().length, 105 msgSize: data.toString().length,
105 }); 106 });
106 107
107 if (msgType === 'TASK') { 108 if (msgType === 'TASK') {
108 const task = msg.payload; 109 const task = msg.payload;
109 if (!task) { 110 if (!task) {
110 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 111 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
111 return; 112 return;
112 } 113 }
113 114
114 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 115 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
115 xid, 116 xid,
116 task, 117 task,
117 }); 118 });
118 119
119 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 120 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
120 if (sdkTrxIdAdder) { 121 if (sdkTrxIdAdder) {
121 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 122 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
122 123
123 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 124 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
124 xid, 125 xid,
125 sdkTrxIdAdder, 126 sdkTrxIdAdder,
126 originalTrxId: task.trx_id, 127 originalTrxId: task.trx_id,
127 newTrxId, 128 newTrxId,
128 }); 129 });
129 130
130 task.trx_id = newTrxId; 131 task.trx_id = newTrxId;
131 } 132 }
132 133
133 const replyMessage = { 134 const replyMessage = {
134 type: 'ACK', 135 type: 'ACK',
135 rid: msg.rid || null, 136 rid: msg.rid || null,
136 ts: new Date(), 137 ts: new Date(),
137 error: false, 138 error: false,
138 }; 139 };
139 140
140 ws.send(JSON.stringify(replyMessage), { compress: true }); 141 ws.send(JSON.stringify(replyMessage), { compress: true });
141 142
142 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 143 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
143 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 144 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
144 return; 145 return;
145 } 146 }
146 147
147 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 148 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
148 xid, 149 xid,
149 }); 150 });
150 151
151 partner.buy(task, xid); 152 partner.buy(task, xid);
152 } else { 153 } else {
153 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 154 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
154 xid, 155 xid,
155 msgType, 156 msgType,
156 msg, 157 msg,
157 }); 158 });
158 } 159 }
159 }); 160 });
160 161
161 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 162 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } }));
162 }); 163 });
163 164
164 server.on('upgrade', (req, socket, head) => { 165 server.on('upgrade', (req, socket, head) => {
165 const apikey = req.headers && (req.headers.apikey || req.headers.token); 166 const apikey = req.headers && (req.headers.apikey || req.headers.token);
166 167
167 if (!isValidApikey(apikey, config.push_trx_server && config.push_trx_server.apikey)) { 168 const matchedApikey = isValidApikey(
169 apikey,
170 config.push_trx_server && config.push_trx_server.apikey,
171 );
172
173 if (!matchedApikey) {
168 rejectConnection(req, socket); 174 rejectConnection(req, socket);
169 return; 175 return;
170 } 176 }
171 177
172 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { 178 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
173 headers: req.headers, 179 headers: req.headers,
174 }); 180 });
175 181
176 wss.handleUpgrade(req, socket, head, (ws) => { 182 wss.handleUpgrade(req, socket, head, (ws) => {
177 const client = { 183 const client = {
178 remoteAddress: req.socket.remoteAddress, 184 remoteAddress: req.socket.remoteAddress,
179 apikey, 185 apikey,
186 name: matchedApikey.name,
180 }; 187 };
181 188
182 wss.emit('connection', ws, req, client); 189 wss.emit('connection', ws, req, client);
183 }); 190 });
184 }); 191 });
185 192
186 server.listen(wsListenPort, () => { 193 server.listen(wsListenPort, () => {
187 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 194 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
188 }); 195 });
189 196
190 // const heatbeatPingInterval = setInterval(() => { 197 // const heatbeatPingInterval = setInterval(() => {
191 // wss.clients.forEach((ws) => { 198 // wss.clients.forEach((ws) => {
192 // heartbeatPing(ws); 199 // heartbeatPing(ws);
193 // }); 200 // });
194 // }, PING_INTERVAL_MS); 201 // }, PING_INTERVAL_MS);
195 202
196 wss.on('close', () => { 203 wss.on('close', () => {
197 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); 204 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
198 // clearInterval(heatbeatPingInterval); 205 // clearInterval(heatbeatPingInterval);
199 }); 206 });
200 } 207 }
201 208