Commit 7aed3590d4d688f08fa69d06f53147502d0cf6e4

Authored by Adhidarma Hadiwinoto
1 parent 76c767fae1
Exists in master

More verbose log on disabled server

Showing 1 changed file with 9 additions and 1 deletions Inline Diff

1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const defaultValues = require('./lib/default-values'); 10 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 11 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 12 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 13 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 14 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 15 const onPong = require('./lib/on-pong');
16 16
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 18
19 const server = createServer(); 19 const server = createServer();
20 20
21 let partner; 21 let partner;
22 22
23 exports.setPartner = (partnerFromCaller) => { 23 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
24 partner = partnerFromCaller; 25 partner = partnerFromCaller;
25 }; 26 };
26 27
27 const wsListenPort = config.push_trx_server 28 const wsListenPort = config.push_trx_server
28 && (config.push_trx_server.port || config.push_trx_server.listen_port); 29 && (config.push_trx_server.port || config.push_trx_server.listen_port);
29 30
30 if (!wsListenPort) { 31 if (!wsListenPort) {
31 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`); 32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
33 config: {
34 push_trx_server: !!config && !!config.push_trx_server && {
35 port: !!config.push_trx_server.port,
36 listen_port: !!config.push_trx_server.listen_port,
37 },
38 },
39 });
32 } else { 40 } else {
33 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
34 42
35 wss.on('connection', (ws, req, client) => { 43 wss.on('connection', (ws, req, client) => {
36 const connectionXid = uniqid(); 44 const connectionXid = uniqid();
37 45
38 // eslint-disable-next-line no-param-reassign 46 // eslint-disable-next-line no-param-reassign
39 ws.isAlive = true; 47 ws.isAlive = true;
40 48
41 const { remoteAddress, apikey } = client; 49 const { remoteAddress, apikey } = client;
42 50
43 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 51 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
44 xid: connectionXid, 52 xid: connectionXid,
45 remoteAddress, 53 remoteAddress,
46 apikey, 54 apikey,
47 }); 55 });
48 56
49 const heatbeatPingInterval = setInterval(() => { 57 const heatbeatPingInterval = setInterval(() => {
50 heartbeatPing(ws); 58 heartbeatPing(ws);
51 }, PING_INTERVAL_MS); 59 }, PING_INTERVAL_MS);
52 60
53 ws.on('ping', (data) => onPing(ws, data)); 61 ws.on('ping', (data) => onPing(ws, data));
54 ws.on('pong', (data) => onPong(ws, data)); 62 ws.on('pong', (data) => onPong(ws, data));
55 63
56 ws.on('close', (code, reason) => { 64 ws.on('close', (code, reason) => {
57 const xid = uniqid(); 65 const xid = uniqid();
58 66
59 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 67 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
60 xid, 68 xid,
61 remoteAddress, 69 remoteAddress,
62 apikey, 70 apikey,
63 code, 71 code,
64 reason: reason.toString(), 72 reason: reason.toString(),
65 }); 73 });
66 74
67 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 75 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
68 xid, 76 xid,
69 }); 77 });
70 clearInterval(heatbeatPingInterval); 78 clearInterval(heatbeatPingInterval);
71 }); 79 });
72 80
73 ws.on('message', (data) => { 81 ws.on('message', (data) => {
74 const xid = uniqid(); 82 const xid = uniqid();
75 83
76 // eslint-disable-next-line no-param-reassign 84 // eslint-disable-next-line no-param-reassign
77 ws.isAlive = true; 85 ws.isAlive = true;
78 86
79 let msg; 87 let msg;
80 try { 88 try {
81 msg = JSON.parse(data.toString() || ''); 89 msg = JSON.parse(data.toString() || '');
82 } catch (e) { 90 } catch (e) {
83 msg = data.toString(); 91 msg = data.toString();
84 } 92 }
85 93
86 if (!msg) return; 94 if (!msg) return;
87 95
88 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 96 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
89 97
90 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 98 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
91 xid, 99 xid,
92 remoteAddress, 100 remoteAddress,
93 apikey, 101 apikey,
94 msgType, 102 msgType,
95 msg, 103 msg,
96 msgSize: data.toString().length, 104 msgSize: data.toString().length,
97 }); 105 });
98 106
99 if (msgType === 'TASK') { 107 if (msgType === 'TASK') {
100 const task = msg.payload; 108 const task = msg.payload;
101 if (!task) { 109 if (!task) {
102 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 110 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
103 return; 111 return;
104 } 112 }
105 113
106 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 114 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
107 xid, 115 xid,
108 task, 116 task,
109 }); 117 });
110 118
111 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 119 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
112 if (sdkTrxIdAdder) { 120 if (sdkTrxIdAdder) {
113 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 121 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
114 122
115 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 123 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
116 xid, 124 xid,
117 sdkTrxIdAdder, 125 sdkTrxIdAdder,
118 originalTrxId: task.trx_id, 126 originalTrxId: task.trx_id,
119 newTrxId, 127 newTrxId,
120 }); 128 });
121 129
122 task.trx_id = newTrxId; 130 task.trx_id = newTrxId;
123 } 131 }
124 132
125 const replyMessage = { 133 const replyMessage = {
126 type: 'ACK', 134 type: 'ACK',
127 rid: msg.rid || null, 135 rid: msg.rid || null,
128 ts: new Date(), 136 ts: new Date(),
129 error: false, 137 error: false,
130 }; 138 };
131 139
132 ws.send(JSON.stringify(replyMessage), { compress: true }); 140 ws.send(JSON.stringify(replyMessage), { compress: true });
133 141
134 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 142 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
135 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 143 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
136 return; 144 return;
137 } 145 }
138 146
139 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 147 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
140 xid, 148 xid,
141 }); 149 });
142 150
143 partner.buy(task, xid); 151 partner.buy(task, xid);
144 } else { 152 } else {
145 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 153 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
146 xid, 154 xid,
147 msgType, 155 msgType,
148 msg, 156 msg,
149 }); 157 });
150 } 158 }
151 }); 159 });
152 160
153 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 161 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } }));
154 }); 162 });
155 163
156 server.on('upgrade', (req, socket, head) => { 164 server.on('upgrade', (req, socket, head) => {
157 const apikey = req.headers && (req.headers.apikey || req.headers.token); 165 const apikey = req.headers && (req.headers.apikey || req.headers.token);
158 166
159 if (!isValidApikey(apikey, config.push_trx_server && config.push_trx_server.apikey)) { 167 if (!isValidApikey(apikey, config.push_trx_server && config.push_trx_server.apikey)) {
160 rejectConnection(req, socket); 168 rejectConnection(req, socket);
161 return; 169 return;
162 } 170 }
163 171
164 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { 172 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
165 headers: req.headers, 173 headers: req.headers,
166 }); 174 });
167 175
168 wss.handleUpgrade(req, socket, head, (ws) => { 176 wss.handleUpgrade(req, socket, head, (ws) => {
169 const client = { 177 const client = {
170 remoteAddress: req.socket.remoteAddress, 178 remoteAddress: req.socket.remoteAddress,
171 apikey, 179 apikey,
172 }; 180 };
173 181
174 wss.emit('connection', ws, req, client); 182 wss.emit('connection', ws, req, client);
175 }); 183 });
176 }); 184 });
177 185
178 server.listen(wsListenPort, () => { 186 server.listen(wsListenPort, () => {
179 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 187 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
180 }); 188 });
181 189
182 // const heatbeatPingInterval = setInterval(() => { 190 // const heatbeatPingInterval = setInterval(() => {
183 // wss.clients.forEach((ws) => { 191 // wss.clients.forEach((ws) => {
184 // heartbeatPing(ws); 192 // heartbeatPing(ws);
185 // }); 193 // });
186 // }, PING_INTERVAL_MS); 194 // }, PING_INTERVAL_MS);
187 195
188 wss.on('close', () => { 196 wss.on('close', () => {
189 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); 197 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
190 // clearInterval(heatbeatPingInterval); 198 // clearInterval(heatbeatPingInterval);
191 }); 199 });
192 } 200 }
193 201