Commit 5c5cf22955ee66f1e60f9421f1b5355ae1b0cbbd

Authored by Adhidarma Hadiwinoto
1 parent 192a9e60ec
Exists in master

Rename clientName to apikeyName

Showing 1 changed file with 8 additions and 7 deletions Inline Diff

1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const defaultValues = require('./lib/default-values'); 10 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 11 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 12 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 13 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 14 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 15 const onPong = require('./lib/on-pong');
16 16
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 18
19 const server = createServer(); 19 const server = createServer();
20 20
21 let partner; 21 let partner;
22 22
23 exports.setPartner = (partnerFromCaller) => { 23 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); 24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
25 partner = partnerFromCaller; 25 partner = partnerFromCaller;
26 }; 26 };
27 27
28 const wsListenPort = config.push_trx_server 28 const wsListenPort = config.push_trx_server
29 && (config.push_trx_server.port || config.push_trx_server.listen_port); 29 && (config.push_trx_server.port || config.push_trx_server.listen_port);
30 30
31 if (!wsListenPort) { 31 if (!wsListenPort) {
32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { 32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
33 config: { 33 config: {
34 push_trx_server: !!config && !!config.push_trx_server && { 34 push_trx_server: !!config && !!config.push_trx_server && {
35 port: !!config.push_trx_server.port, 35 port: !!config.push_trx_server.port,
36 listen_port: !!config.push_trx_server.listen_port, 36 listen_port: !!config.push_trx_server.listen_port,
37 }, 37 },
38 }, 38 },
39 }); 39 });
40 } else { 40 } else {
41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
42 42
43 wss.on('connection', (ws, req, client) => { 43 wss.on('connection', (ws, req, client) => {
44 const connectionXid = uniqid(); 44 const connectionXid = uniqid();
45 45
46 // eslint-disable-next-line no-param-reassign 46 // eslint-disable-next-line no-param-reassign
47 ws.isAlive = true; 47 ws.isAlive = true;
48 48
49 const { remoteAddress, apikey, name: clientName } = client; 49 const { remoteAddress, apikey, apikeyName } = client;
50 50
51 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 51 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
52 xid: connectionXid, 52 xid: connectionXid,
53 remoteAddress, 53 remoteAddress,
54 clientName, 54 apikeyName,
55 apikey, 55 apikey,
56 }); 56 });
57 57
58 const heatbeatPingInterval = setInterval(() => { 58 const heatbeatPingInterval = setInterval(() => {
59 heartbeatPing(ws); 59 heartbeatPing(ws);
60 }, PING_INTERVAL_MS); 60 }, PING_INTERVAL_MS);
61 61
62 ws.on('ping', (data) => onPing(ws, data)); 62 ws.on('ping', (data) => onPing(ws, data));
63 ws.on('pong', (data) => onPong(ws, data)); 63 ws.on('pong', (data) => onPong(ws, data));
64 64
65 ws.on('close', (code, reason) => { 65 ws.on('close', (code, reason) => {
66 const xid = uniqid(); 66 const xid = uniqid();
67 67
68 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 68 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
69 xid, 69 xid,
70 remoteAddress, 70 remoteAddress,
71 apikey, 71 apikey,
72 code, 72 code,
73 reason: reason.toString(), 73 reason: reason.toString(),
74 }); 74 });
75 75
76 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 76 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
77 xid, 77 xid,
78 }); 78 });
79 clearInterval(heatbeatPingInterval); 79 clearInterval(heatbeatPingInterval);
80 }); 80 });
81 81
82 ws.on('message', (data) => { 82 ws.on('message', (data) => {
83 const xid = uniqid(); 83 const xid = uniqid();
84 84
85 // eslint-disable-next-line no-param-reassign 85 // eslint-disable-next-line no-param-reassign
86 ws.isAlive = true; 86 ws.isAlive = true;
87 87
88 let msg; 88 let msg;
89 try { 89 try {
90 msg = JSON.parse(data.toString() || ''); 90 msg = JSON.parse(data.toString() || '');
91 } catch (e) { 91 } catch (e) {
92 msg = data.toString(); 92 msg = data.toString();
93 } 93 }
94 94
95 if (!msg) return; 95 if (!msg) return;
96 96
97 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 97 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
98 98
99 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 99 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
100 xid, 100 xid,
101 remoteAddress, 101 remoteAddress,
102 apikey, 102 apikey,
103 msgType, 103 msgType,
104 msg, 104 msg,
105 msgSize: data.toString().length, 105 msgSize: data.toString().length,
106 }); 106 });
107 107
108 if (msgType === 'TASK') { 108 if (msgType === 'TASK') {
109 const task = msg.payload; 109 const task = msg.payload;
110 if (!task) { 110 if (!task) {
111 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 111 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
112 return; 112 return;
113 } 113 }
114 114
115 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 115 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
116 xid, 116 xid,
117 task, 117 task,
118 }); 118 });
119 119
120 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 120 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
121 if (sdkTrxIdAdder) { 121 if (sdkTrxIdAdder) {
122 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 122 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
123 123
124 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 124 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
125 xid, 125 xid,
126 sdkTrxIdAdder, 126 sdkTrxIdAdder,
127 originalTrxId: task.trx_id, 127 originalTrxId: task.trx_id,
128 newTrxId, 128 newTrxId,
129 }); 129 });
130 130
131 task.trx_id = newTrxId; 131 task.trx_id = newTrxId;
132 } 132 }
133 133
134 const replyMessage = { 134 const replyMessage = {
135 type: 'ACK', 135 type: 'ACK',
136 rid: msg.rid || null, 136 rid: msg.rid || null,
137 ts: new Date(), 137 ts: new Date(),
138 error: false, 138 error: false,
139 }; 139 };
140 140
141 ws.send(JSON.stringify(replyMessage), { compress: true }); 141 ws.send(JSON.stringify(replyMessage), { compress: true });
142 142
143 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 143 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
144 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 144 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
145 return; 145 return;
146 } 146 }
147 147
148 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 148 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
149 xid, 149 xid,
150 }); 150 });
151 151
152 partner.buy(task, xid); 152 partner.buy(task, xid);
153 } else { 153 } else {
154 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 154 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
155 xid, 155 xid,
156 msgType, 156 msgType,
157 msg, 157 msg,
158 }); 158 });
159 } 159 }
160 }); 160 });
161 161
162 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 162 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } }));
163 }); 163 });
164 164
165 server.on('upgrade', (req, socket, head) => { 165 server.on('upgrade', (req, socket, head) => {
166 const apikey = req.headers && (req.headers.apikey || req.headers.token); 166 const apikey = req.headers && (req.headers.apikey || req.headers.token);
167 167
168 const matchedApikey = isValidApikey( 168 const matchedApikey = isValidApikey(
169 apikey, 169 apikey,
170 config.push_trx_server && config.push_trx_server.apikey, 170 config.push_trx_server && config.push_trx_server.apikey,
171 ); 171 );
172 172
173 if (!matchedApikey) { 173 if (!matchedApikey) {
174 rejectConnection(req, socket); 174 rejectConnection(req, socket);
175 return; 175 return;
176 } 176 }
177 177
178 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
179 headers: req.headers,
180 });
181
182 wss.handleUpgrade(req, socket, head, (ws) => { 178 wss.handleUpgrade(req, socket, head, (ws) => {
183 const client = { 179 const client = {
184 remoteAddress: req.socket.remoteAddress, 180 remoteAddress: req.socket.remoteAddress,
181 apikeyName: matchedApikey.name,
185 apikey, 182 apikey,
186 name: matchedApikey.name,
187 }; 183 };
188 184
185 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
186 headers: req.headers,
187 client,
188 });
189
189 wss.emit('connection', ws, req, client); 190 wss.emit('connection', ws, req, client);
190 }); 191 });
191 }); 192 });
192 193
193 server.listen(wsListenPort, () => { 194 server.listen(wsListenPort, () => {
194 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 195 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
195 }); 196 });
196 197
197 // const heatbeatPingInterval = setInterval(() => { 198 // const heatbeatPingInterval = setInterval(() => {
198 // wss.clients.forEach((ws) => { 199 // wss.clients.forEach((ws) => {
199 // heartbeatPing(ws); 200 // heartbeatPing(ws);
200 // }); 201 // });
201 // }, PING_INTERVAL_MS); 202 // }, PING_INTERVAL_MS);
202 203
203 wss.on('close', () => { 204 wss.on('close', () => {