Commit ffab3e8b298e0677c6e7bc4ff282e43ba71d3107

Authored by Adhidarma Hadiwinoto
1 parent bd53b188a2
Exists in master

Fix xid pairs

Showing 1 changed file with 12 additions and 5 deletions Inline Diff

1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const defaultValues = require('./lib/default-values'); 10 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 11 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 12 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 13 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 14 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 15 const onPong = require('./lib/on-pong');
16 16
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 18
19 const server = createServer(); 19 const server = createServer();
20 20
21 let partner; 21 let partner;
22 22
23 exports.setPartner = (partnerFromCaller) => { 23 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); 24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
25 partner = partnerFromCaller; 25 partner = partnerFromCaller;
26 }; 26 };
27 27
28 const wsListenPort = config.push_trx_server 28 const wsListenPort = config.push_trx_server
29 && (config.push_trx_server.port || config.push_trx_server.listen_port); 29 && (config.push_trx_server.port || config.push_trx_server.listen_port);
30 30
31 if (!wsListenPort) { 31 if (!wsListenPort) {
32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { 32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
33 config: { 33 config: {
34 push_trx_server: !!config && !!config.push_trx_server && { 34 push_trx_server: !!config && !!config.push_trx_server && {
35 port: !!config.push_trx_server.port, 35 port: !!config.push_trx_server.port,
36 listen_port: !!config.push_trx_server.listen_port, 36 listen_port: !!config.push_trx_server.listen_port,
37 }, 37 },
38 }, 38 },
39 }); 39 });
40 } else { 40 } else {
41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
42 42
43 wss.on('connection', (ws, req, client) => { 43 wss.on('connection', (ws, req, client) => {
44 const connectionXid = uniqid();
45
46 // eslint-disable-next-line no-param-reassign 44 // eslint-disable-next-line no-param-reassign
47 ws.isAlive = true; 45 ws.isAlive = true;
48 46
49 const { remoteAddress, apikey, apikeyName } = client; 47 const {
48 xid: connectionXid,
49 remoteAddress,
50 apikey,
51 apikeyName,
52 } = client;
50 53
51 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 54 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
52 xid: connectionXid, 55 xid: connectionXid,
53 remoteAddress, 56 remoteAddress,
54 apikeyName, 57 apikeyName,
55 apikey, 58 apikey,
56 }); 59 });
57 60
58 const heatbeatPingInterval = setInterval(() => { 61 const heatbeatPingInterval = setInterval(() => {
59 heartbeatPing(ws); 62 heartbeatPing(ws);
60 }, PING_INTERVAL_MS); 63 }, PING_INTERVAL_MS);
61 64
62 ws.on('ping', (data) => onPing(ws, data)); 65 ws.on('ping', (data) => onPing(ws, data));
63 ws.on('pong', (data) => onPong(ws, data)); 66 ws.on('pong', (data) => onPong(ws, data));
64 67
65 ws.on('close', (code, reason) => { 68 ws.on('close', (code, reason) => {
66 const xid = uniqid(); 69 const xid = uniqid();
67 70
68 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 71 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
69 xid, 72 xid,
70 remoteAddress, 73 remoteAddress,
71 apikey, 74 apikey,
72 code, 75 code,
73 reason: reason.toString(), 76 reason: reason.toString(),
74 }); 77 });
75 78
76 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 79 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
77 xid, 80 xid,
78 }); 81 });
79 clearInterval(heatbeatPingInterval); 82 clearInterval(heatbeatPingInterval);
80 }); 83 });
81 84
82 ws.on('message', (data) => { 85 ws.on('message', (data) => {
83 const xid = uniqid(); 86 const xid = uniqid();
84 87
85 // eslint-disable-next-line no-param-reassign 88 // eslint-disable-next-line no-param-reassign
86 ws.isAlive = true; 89 ws.isAlive = true;
87 90
88 let msg; 91 let msg;
89 try { 92 try {
90 msg = JSON.parse(data.toString() || ''); 93 msg = JSON.parse(data.toString() || '');
91 } catch (e) { 94 } catch (e) {
92 msg = data.toString(); 95 msg = data.toString();
93 } 96 }
94 97
95 if (!msg) return; 98 if (!msg) return;
96 99
97 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 100 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
98 101
99 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 102 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
100 xid, 103 xid,
101 remoteAddress, 104 apikeyName,
102 apikey, 105 // apikey,
103 msgType, 106 msgType,
104 msg, 107 msg,
105 msgSize: data.toString().length, 108 msgSize: data.toString().length,
106 }); 109 });
107 110
108 if (msgType === 'TASK') { 111 if (msgType === 'TASK') {
109 const task = msg.payload; 112 const task = msg.payload;
110 if (!task) { 113 if (!task) {
111 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 114 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
112 return; 115 return;
113 } 116 }
114 117
115 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 118 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
116 xid, 119 xid,
117 task, 120 task,
118 }); 121 });
119 122
120 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 123 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
121 if (sdkTrxIdAdder) { 124 if (sdkTrxIdAdder) {
122 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 125 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
123 126
124 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 127 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
125 xid, 128 xid,
126 sdkTrxIdAdder, 129 sdkTrxIdAdder,
127 originalTrxId: task.trx_id, 130 originalTrxId: task.trx_id,
128 newTrxId, 131 newTrxId,
129 }); 132 });
130 133
131 task.trx_id = newTrxId; 134 task.trx_id = newTrxId;
132 } 135 }
133 136
134 const replyMessage = { 137 const replyMessage = {
135 type: 'ACK', 138 type: 'ACK',
136 rid: msg.rid || null, 139 rid: msg.rid || null,
137 ts: new Date(), 140 ts: new Date(),
138 error: false, 141 error: false,
139 }; 142 };
140 143
141 ws.send(JSON.stringify(replyMessage), { compress: true }); 144 ws.send(JSON.stringify(replyMessage), { compress: true });
142 145
143 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 146 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
144 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 147 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
145 return; 148 return;
146 } 149 }
147 150
148 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 151 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
149 xid, 152 xid,
150 }); 153 });
151 154
152 partner.buy(task, xid); 155 partner.buy(task, xid);
153 } else { 156 } else {
154 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 157 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
155 xid, 158 xid,
156 msgType, 159 msgType,
157 msg, 160 msg,
158 }); 161 });
159 } 162 }
160 }); 163 });
161 164
162 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 165 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } }));
163 }); 166 });
164 167
165 server.on('upgrade', (req, socket, head) => { 168 server.on('upgrade', (req, socket, head) => {
169 const xid = uniqid();
170
166 const apikey = req.headers && (req.headers.apikey || req.headers.token); 171 const apikey = req.headers && (req.headers.apikey || req.headers.token);
167 172
168 const matchedApikey = isValidApikey( 173 const matchedApikey = isValidApikey(
169 apikey, 174 apikey,
170 config.push_trx_server && config.push_trx_server.apikey, 175 config.push_trx_server && config.push_trx_server.apikey,
171 ); 176 );
172 177
173 if (!matchedApikey) { 178 if (!matchedApikey) {
174 rejectConnection(req, socket); 179 rejectConnection(req, socket);
175 return; 180 return;
176 } 181 }
177 182
178 wss.handleUpgrade(req, socket, head, (ws) => { 183 wss.handleUpgrade(req, socket, head, (ws) => {
179 const client = { 184 const client = {
185 xid,
180 remoteAddress: req.socket.remoteAddress, 186 remoteAddress: req.socket.remoteAddress,
181 apikeyName: matchedApikey.name, 187 apikeyName: matchedApikey.name,
182 apikey, 188 apikey,
183 }; 189 };
184 190
185 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { 191 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
192 xid,
186 headers: req.headers, 193 headers: req.headers,
187 client, 194 client,
188 }); 195 });
189 196
190 wss.emit('connection', ws, req, client); 197 wss.emit('connection', ws, req, client);
191 }); 198 });
192 }); 199 });
193 200
194 server.listen(wsListenPort, () => { 201 server.listen(wsListenPort, () => {
195 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 202 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
196 }); 203 });
197 204
198 // const heatbeatPingInterval = setInterval(() => { 205 // const heatbeatPingInterval = setInterval(() => {
199 // wss.clients.forEach((ws) => { 206 // wss.clients.forEach((ws) => {
200 // heartbeatPing(ws); 207 // heartbeatPing(ws);
201 // }); 208 // });
202 // }, PING_INTERVAL_MS); 209 // }, PING_INTERVAL_MS);
203 210
204 wss.on('close', () => { 211 wss.on('close', () => {
205 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); 212 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
206 // clearInterval(heatbeatPingInterval); 213 // clearInterval(heatbeatPingInterval);
207 }); 214 });