Commit f8580302b471568332b08bf0d0e1f974cc753831

Authored by Adhidarma Hadiwinoto
1 parent 95a0a88808
Exists in master

Add server generated xid on missing client xid

Showing 1 changed file with 2 additions and 4 deletions Inline Diff

1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const defaultValues = require('./lib/default-values'); 10 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 11 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 12 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 13 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 14 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 15 const onPong = require('./lib/on-pong');
16 16
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 18
19 const server = createServer(); 19 const server = createServer();
20 20
21 let partner; 21 let partner;
22 22
23 exports.setPartner = (partnerFromCaller) => { 23 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); 24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
25 partner = partnerFromCaller; 25 partner = partnerFromCaller;
26 }; 26 };
27 27
28 const wsListenPort = config.push_trx_server 28 const wsListenPort = config.push_trx_server
29 && (config.push_trx_server.port || config.push_trx_server.listen_port); 29 && (config.push_trx_server.port || config.push_trx_server.listen_port);
30 30
31 if (!wsListenPort) { 31 if (!wsListenPort) {
32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { 32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
33 config: { 33 config: {
34 push_trx_server: !!config && !!config.push_trx_server && { 34 push_trx_server: !!config && !!config.push_trx_server && {
35 port: !!config.push_trx_server.port, 35 port: !!config.push_trx_server.port,
36 listen_port: !!config.push_trx_server.listen_port, 36 listen_port: !!config.push_trx_server.listen_port,
37 }, 37 },
38 }, 38 },
39 }); 39 });
40 } else { 40 } else {
41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
42 42
43 wss.on('connection', (ws, req, client) => { 43 wss.on('connection', (ws, req, client) => {
44 // eslint-disable-next-line no-param-reassign 44 // eslint-disable-next-line no-param-reassign
45 ws.isAlive = true; 45 ws.isAlive = true;
46 46
47 const { 47 const {
48 xid: connectionXid, 48 xid: connectionXid,
49 remoteAddress, 49 remoteAddress,
50 apikey, 50 apikey,
51 apikeyName, 51 apikeyName,
52 } = client; 52 } = client;
53 53
54 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 54 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
55 xid: connectionXid, 55 xid: connectionXid || uniqid(),
56 remoteAddress, 56 client,
57 apikeyName,
58 apikey,
59 }); 57 });
60 58
61 const heatbeatPingInterval = setInterval(() => { 59 const heatbeatPingInterval = setInterval(() => {
62 heartbeatPing(ws); 60 heartbeatPing(ws);
63 }, PING_INTERVAL_MS); 61 }, PING_INTERVAL_MS);
64 62
65 ws.on('ping', (data) => onPing(ws, data)); 63 ws.on('ping', (data) => onPing(ws, data));
66 ws.on('pong', (data) => onPong(ws, data)); 64 ws.on('pong', (data) => onPong(ws, data));
67 65
68 ws.on('close', (code, reason) => { 66 ws.on('close', (code, reason) => {
69 const xid = uniqid(); 67 const xid = uniqid();
70 68
71 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 69 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
72 xid, 70 xid,
73 remoteAddress, 71 remoteAddress,
74 apikey, 72 apikey,
75 code, 73 code,
76 reason: reason.toString(), 74 reason: reason.toString(),
77 }); 75 });
78 76
79 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 77 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
80 xid, 78 xid,
81 }); 79 });
82 clearInterval(heatbeatPingInterval); 80 clearInterval(heatbeatPingInterval);
83 }); 81 });
84 82
85 ws.on('message', (data) => { 83 ws.on('message', (data) => {
86 const xid = uniqid(); 84 const xid = uniqid();
87 85
88 // eslint-disable-next-line no-param-reassign 86 // eslint-disable-next-line no-param-reassign
89 ws.isAlive = true; 87 ws.isAlive = true;
90 88
91 let msg; 89 let msg;
92 try { 90 try {
93 msg = JSON.parse(data.toString() || ''); 91 msg = JSON.parse(data.toString() || '');
94 } catch (e) { 92 } catch (e) {
95 msg = data.toString(); 93 msg = data.toString();
96 } 94 }
97 95
98 if (!msg) return; 96 if (!msg) return;
99 97
100 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 98 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
101 99
102 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 100 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
103 xid, 101 xid,
104 apikeyName, 102 apikeyName,
105 // apikey, 103 // apikey,
106 msgType, 104 msgType,
107 msg, 105 msg,
108 msgSize: data.toString().length, 106 msgSize: data.toString().length,
109 }); 107 });
110 108
111 if (msgType === 'TASK') { 109 if (msgType === 'TASK') {
112 const task = msg.payload; 110 const task = msg.payload;
113 if (!task) { 111 if (!task) {
114 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 112 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
115 return; 113 return;
116 } 114 }
117 115
118 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 116 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
119 xid, 117 xid,
120 task, 118 task,
121 }); 119 });
122 120
123 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 121 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
124 if (sdkTrxIdAdder) { 122 if (sdkTrxIdAdder) {
125 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 123 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
126 124
127 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 125 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
128 xid, 126 xid,
129 sdkTrxIdAdder, 127 sdkTrxIdAdder,
130 originalTrxId: task.trx_id, 128 originalTrxId: task.trx_id,
131 newTrxId, 129 newTrxId,
132 }); 130 });
133 131
134 task.trx_id = newTrxId; 132 task.trx_id = newTrxId;
135 } 133 }
136 134
137 const replyMessage = { 135 const replyMessage = {
138 type: 'ACK', 136 type: 'ACK',
139 rid: msg.rid || null, 137 rid: msg.rid || null,
140 ts: new Date(), 138 ts: new Date(),
141 error: false, 139 error: false,
142 }; 140 };
143 141
144 ws.send(JSON.stringify(replyMessage), { compress: true }); 142 ws.send(JSON.stringify(replyMessage), { compress: true });
145 143
146 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 144 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
147 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 145 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
148 return; 146 return;
149 } 147 }
150 148
151 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 149 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
152 xid, 150 xid,
153 }); 151 });
154 152
155 partner.buy(task, xid); 153 partner.buy(task, xid);
156 } else { 154 } else {
157 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 155 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
158 xid, 156 xid,
159 msgType, 157 msgType,
160 msg, 158 msg,
161 }); 159 });
162 } 160 }
163 }); 161 });
164 162
165 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 163 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } }));
166 }); 164 });
167 165
168 server.on('upgrade', (req, socket, head) => { 166 server.on('upgrade', (req, socket, head) => {
169 const xid = uniqid(); 167 const xid = uniqid();
170 168
171 const apikey = req.headers && (req.headers.apikey || req.headers.token); 169 const apikey = req.headers && (req.headers.apikey || req.headers.token);
172 170
173 const matchedApikey = isValidApikey( 171 const matchedApikey = isValidApikey(
174 apikey, 172 apikey,
175 config.push_trx_server && config.push_trx_server.apikey, 173 config.push_trx_server && config.push_trx_server.apikey,
176 ); 174 );
177 175
178 if (!matchedApikey) { 176 if (!matchedApikey) {
179 rejectConnection(req, socket); 177 rejectConnection(req, socket);
180 return; 178 return;
181 } 179 }
182 180
183 wss.handleUpgrade(req, socket, head, (ws) => { 181 wss.handleUpgrade(req, socket, head, (ws) => {
184 const client = { 182 const client = {
185 xid, 183 xid,
186 remoteAddress: req.socket.remoteAddress, 184 remoteAddress: req.socket.remoteAddress,
187 apikeyName: matchedApikey.name, 185 apikeyName: matchedApikey.name,
188 apikey, 186 apikey,
189 }; 187 };
190 188
191 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { 189 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
192 xid, 190 xid,
193 headers: req.headers, 191 headers: req.headers,
194 client, 192 client,
195 }); 193 });
196 194
197 wss.emit('connection', ws, req, client); 195 wss.emit('connection', ws, req, client);
198 }); 196 });
199 }); 197 });
200 198
201 server.listen(wsListenPort, () => { 199 server.listen(wsListenPort, () => {
202 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 200 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
203 }); 201 });
204 202
205 // const heatbeatPingInterval = setInterval(() => { 203 // const heatbeatPingInterval = setInterval(() => {
206 // wss.clients.forEach((ws) => { 204 // wss.clients.forEach((ws) => {
207 // heartbeatPing(ws); 205 // heartbeatPing(ws);
208 // }); 206 // });
209 // }, PING_INTERVAL_MS); 207 // }, PING_INTERVAL_MS);
210 208
211 wss.on('close', () => { 209 wss.on('close', () => {
212 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); 210 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
213 // clearInterval(heatbeatPingInterval); 211 // clearInterval(heatbeatPingInterval);
214 }); 212 });
215 } 213 }
216 214