Commit a30831dfd4f7fc75155f154c3265d982329e84b7

Authored by Adhidarma Hadiwinoto
1 parent b12a16aa54
Exists in master

Add apikeyname on disconnected log

Showing 1 changed file with 1 additions and 0 deletions Inline Diff

1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const defaultValues = require('./lib/default-values'); 10 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 11 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 12 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 13 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 14 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 15 const onPong = require('./lib/on-pong');
16 16
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 18
19 const server = createServer(); 19 const server = createServer();
20 20
21 let partner; 21 let partner;
22 22
23 exports.setPartner = (partnerFromCaller) => { 23 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); 24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
25 partner = partnerFromCaller; 25 partner = partnerFromCaller;
26 }; 26 };
27 27
28 const wsListenPort = config.push_trx_server 28 const wsListenPort = config.push_trx_server
29 && (config.push_trx_server.port || config.push_trx_server.listen_port); 29 && (config.push_trx_server.port || config.push_trx_server.listen_port);
30 30
31 if (!wsListenPort) { 31 if (!wsListenPort) {
32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { 32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
33 config: { 33 config: {
34 push_trx_server: !!config && !!config.push_trx_server && { 34 push_trx_server: !!config && !!config.push_trx_server && {
35 port: !!config.push_trx_server.port, 35 port: !!config.push_trx_server.port,
36 listen_port: !!config.push_trx_server.listen_port, 36 listen_port: !!config.push_trx_server.listen_port,
37 }, 37 },
38 }, 38 },
39 }); 39 });
40 } else { 40 } else {
41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
42 42
43 wss.on('connection', (ws, req, client) => { 43 wss.on('connection', (ws, req, client) => {
44 // eslint-disable-next-line no-param-reassign 44 // eslint-disable-next-line no-param-reassign
45 ws.isAlive = true; 45 ws.isAlive = true;
46 46
47 const { 47 const {
48 xid: connectionXid, 48 xid: connectionXid,
49 remoteAddress, 49 remoteAddress,
50 apikey, 50 apikey,
51 apikeyName, 51 apikeyName,
52 } = client; 52 } = client;
53 53
54 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 54 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
55 xid: connectionXid || uniqid(), 55 xid: connectionXid || uniqid(),
56 client, 56 client,
57 }); 57 });
58 58
59 const heatbeatPingInterval = setInterval(() => { 59 const heatbeatPingInterval = setInterval(() => {
60 heartbeatPing(ws); 60 heartbeatPing(ws);
61 }, PING_INTERVAL_MS); 61 }, PING_INTERVAL_MS);
62 62
63 ws.on('ping', (data) => onPing(ws, data)); 63 ws.on('ping', (data) => onPing(ws, data));
64 ws.on('pong', (data) => onPong(ws, data)); 64 ws.on('pong', (data) => onPong(ws, data));
65 65
66 ws.on('close', (code, reason) => { 66 ws.on('close', (code, reason) => {
67 const xid = uniqid(); 67 const xid = uniqid();
68 68
69 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 69 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
70 xid, 70 xid,
71 remoteAddress, 71 remoteAddress,
72 apikeyName,
72 apikey, 73 apikey,
73 code, 74 code,
74 reason: reason.toString(), 75 reason: reason.toString(),
75 }); 76 });
76 77
77 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 78 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
78 xid, 79 xid,
79 }); 80 });
80 clearInterval(heatbeatPingInterval); 81 clearInterval(heatbeatPingInterval);
81 }); 82 });
82 83
83 ws.on('message', (data) => { 84 ws.on('message', (data) => {
84 const xid = uniqid(); 85 const xid = uniqid();
85 86
86 // eslint-disable-next-line no-param-reassign 87 // eslint-disable-next-line no-param-reassign
87 ws.isAlive = true; 88 ws.isAlive = true;
88 89
89 let msg; 90 let msg;
90 try { 91 try {
91 msg = JSON.parse(data.toString() || ''); 92 msg = JSON.parse(data.toString() || '');
92 } catch (e) { 93 } catch (e) {
93 msg = data.toString(); 94 msg = data.toString();
94 } 95 }
95 96
96 if (!msg) return; 97 if (!msg) return;
97 98
98 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 99 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
99 100
100 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 101 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
101 xid, 102 xid,
102 apikeyName, 103 apikeyName,
103 // apikey, 104 // apikey,
104 msgType, 105 msgType,
105 msg, 106 msg,
106 msgSize: data.toString().length, 107 msgSize: data.toString().length,
107 }); 108 });
108 109
109 if (msgType === 'TASK') { 110 if (msgType === 'TASK') {
110 const task = msg.payload; 111 const task = msg.payload;
111 if (!task) { 112 if (!task) {
112 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 113 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
113 return; 114 return;
114 } 115 }
115 116
116 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 117 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
117 xid, 118 xid,
118 task, 119 task,
119 }); 120 });
120 121
121 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 122 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
122 if (sdkTrxIdAdder) { 123 if (sdkTrxIdAdder) {
123 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 124 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
124 125
125 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 126 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
126 xid, 127 xid,
127 sdkTrxIdAdder, 128 sdkTrxIdAdder,
128 originalTrxId: task.trx_id, 129 originalTrxId: task.trx_id,
129 newTrxId, 130 newTrxId,
130 }); 131 });
131 132
132 task.trx_id = newTrxId; 133 task.trx_id = newTrxId;
133 } 134 }
134 135
135 const replyMessage = { 136 const replyMessage = {
136 type: 'ACK', 137 type: 'ACK',
137 rid: msg.rid || null, 138 rid: msg.rid || null,
138 ts: new Date(), 139 ts: new Date(),
139 error: false, 140 error: false,
140 }; 141 };
141 142
142 ws.send(JSON.stringify(replyMessage), { compress: true }); 143 ws.send(JSON.stringify(replyMessage), { compress: true });
143 144
144 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 145 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
145 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 146 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
146 return; 147 return;
147 } 148 }
148 149
149 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 150 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
150 xid, 151 xid,
151 }); 152 });
152 153
153 partner.buy(task, xid); 154 partner.buy(task, xid);
154 } else { 155 } else {
155 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 156 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
156 xid, 157 xid,
157 msgType, 158 msgType,
158 msg, 159 msg,
159 }); 160 });
160 } 161 }
161 }); 162 });
162 163
163 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 164 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } }));
164 }); 165 });
165 166
166 server.on('upgrade', (req, socket, head) => { 167 server.on('upgrade', (req, socket, head) => {
167 const xid = uniqid(); 168 const xid = uniqid();
168 169
169 const apikey = req.headers && (req.headers.apikey || req.headers.token); 170 const apikey = req.headers && (req.headers.apikey || req.headers.token);
170 171
171 const matchedApikey = isValidApikey( 172 const matchedApikey = isValidApikey(
172 apikey, 173 apikey,
173 config.push_trx_server && config.push_trx_server.apikey, 174 config.push_trx_server && config.push_trx_server.apikey,
174 ); 175 );
175 176
176 if (!matchedApikey) { 177 if (!matchedApikey) {
177 rejectConnection(req, socket); 178 rejectConnection(req, socket);
178 return; 179 return;
179 } 180 }
180 181
181 wss.handleUpgrade(req, socket, head, (ws) => { 182 wss.handleUpgrade(req, socket, head, (ws) => {
182 const client = { 183 const client = {
183 xid, 184 xid,
184 remoteAddress: req.socket.remoteAddress, 185 remoteAddress: req.socket.remoteAddress,
185 apikeyName: matchedApikey.name, 186 apikeyName: matchedApikey.name,
186 apikey, 187 apikey,
187 }; 188 };
188 189
189 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { 190 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
190 xid, 191 xid,
191 headers: req.headers, 192 headers: req.headers,
192 client, 193 client,
193 }); 194 });
194 195
195 wss.emit('connection', ws, req, client); 196 wss.emit('connection', ws, req, client);
196 }); 197 });
197 }); 198 });
198 199
199 server.listen(wsListenPort, () => { 200 server.listen(wsListenPort, () => {
200 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 201 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
201 }); 202 });
202 203
203 // const heatbeatPingInterval = setInterval(() => { 204 // const heatbeatPingInterval = setInterval(() => {
204 // wss.clients.forEach((ws) => { 205 // wss.clients.forEach((ws) => {
205 // heartbeatPing(ws); 206 // heartbeatPing(ws);
206 // }); 207 // });
207 // }, PING_INTERVAL_MS); 208 // }, PING_INTERVAL_MS);
208 209
209 wss.on('close', () => { 210 wss.on('close', () => {
210 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); 211 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
211 // clearInterval(heatbeatPingInterval); 212 // clearInterval(heatbeatPingInterval);
212 }); 213 });
213 } 214 }
214 215