Commit 7ca0e79023dbf7cafb32591eb2fbcce1594d904e

Authored by Adhidarma Hadiwinoto
1 parent bdaec25723
Exists in master

Create initServer

Showing 1 changed file with 16 additions and 10 deletions Inline Diff

1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const defaultValues = require('./lib/default-values'); 10 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 11 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 12 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 13 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 14 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 15 const onPong = require('./lib/on-pong');
16 16
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 18
19 const server = createServer(); 19 const server = createServer();
20 20
21 let partner; 21 let partner;
22 22
23 exports.setPartner = (partnerFromCaller) => { 23 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); 24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
25 partner = partnerFromCaller; 25 partner = partnerFromCaller;
26 }; 26 };
27 27
28 const wsListenPort = config.push_trx_server 28 const wsListenPort = config.push_trx_server
29 && (config.push_trx_server.port || config.push_trx_server.listen_port); 29 && (config.push_trx_server.port || config.push_trx_server.listen_port);
30 30
31 if (!wsListenPort) { 31 const initServer = () => {
32 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { 32 logger.verbose(`${MODULE_NAME} 70D208B2: Initializing`);
33 config: { 33
34 push_trx_server: !!config && !!config.push_trx_server && {
35 port: !!config.push_trx_server.port,
36 listen_port: !!config.push_trx_server.listen_port,
37 },
38 },
39 });
40 } else {
41 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 34 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
42 35
43 wss.on('connection', (ws, req, client) => { 36 wss.on('connection', (ws, req, client) => {
44 // eslint-disable-next-line no-param-reassign 37 // eslint-disable-next-line no-param-reassign
45 ws.isAlive = true; 38 ws.isAlive = true;
46 39
47 const { 40 const {
48 xid: connectionXid, 41 xid: connectionXid,
49 remoteAddress, 42 remoteAddress,
50 apikey, 43 apikey,
51 apikeyName, 44 apikeyName,
52 } = client; 45 } = client;
53 46
54 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 47 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
55 xid: connectionXid || uniqid(), 48 xid: connectionXid || uniqid(),
56 client, 49 client,
57 }); 50 });
58 51
59 const heatbeatPingInterval = setInterval(() => { 52 const heatbeatPingInterval = setInterval(() => {
60 heartbeatPing(ws); 53 heartbeatPing(ws);
61 }, PING_INTERVAL_MS); 54 }, PING_INTERVAL_MS);
62 55
63 ws.on('ping', (data) => onPing(ws, data)); 56 ws.on('ping', (data) => onPing(ws, data));
64 ws.on('pong', (data) => onPong(ws, data)); 57 ws.on('pong', (data) => onPong(ws, data));
65 58
66 ws.on('close', (code, reason) => { 59 ws.on('close', (code, reason) => {
67 const xid = uniqid(); 60 const xid = uniqid();
68 61
69 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 62 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
70 xid, 63 xid,
71 remoteAddress, 64 remoteAddress,
72 apikeyName, 65 apikeyName,
73 apikey, 66 apikey,
74 code, 67 code,
75 reason: reason.toString(), 68 reason: reason.toString(),
76 }); 69 });
77 70
78 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 71 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
79 xid, 72 xid,
80 }); 73 });
81 clearInterval(heatbeatPingInterval); 74 clearInterval(heatbeatPingInterval);
82 }); 75 });
83 76
84 ws.on('message', (data) => { 77 ws.on('message', (data) => {
85 const xid = uniqid(); 78 const xid = uniqid();
86 79
87 // eslint-disable-next-line no-param-reassign 80 // eslint-disable-next-line no-param-reassign
88 ws.isAlive = true; 81 ws.isAlive = true;
89 82
90 let msg; 83 let msg;
91 try { 84 try {
92 msg = JSON.parse(data.toString() || ''); 85 msg = JSON.parse(data.toString() || '');
93 } catch (e) { 86 } catch (e) {
94 msg = data.toString(); 87 msg = data.toString();
95 } 88 }
96 89
97 if (!msg) return; 90 if (!msg) return;
98 91
99 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 92 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
100 93
101 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 94 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
102 xid, 95 xid,
103 apikeyName, 96 apikeyName,
104 // apikey, 97 // apikey,
105 msgType, 98 msgType,
106 msg, 99 msg,
107 msgSize: data.toString().length, 100 msgSize: data.toString().length,
108 }); 101 });
109 102
110 if (msgType === 'TASK') { 103 if (msgType === 'TASK') {
111 const task = msg.payload; 104 const task = msg.payload;
112 if (!task) { 105 if (!task) {
113 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 106 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
114 return; 107 return;
115 } 108 }
116 109
117 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 110 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
118 xid, 111 xid,
119 task, 112 task,
120 }); 113 });
121 114
122 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 115 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
123 if (sdkTrxIdAdder) { 116 if (sdkTrxIdAdder) {
124 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 117 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
125 118
126 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 119 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
127 xid, 120 xid,
128 sdkTrxIdAdder, 121 sdkTrxIdAdder,
129 originalTrxId: task.trx_id, 122 originalTrxId: task.trx_id,
130 newTrxId, 123 newTrxId,
131 }); 124 });
132 125
133 task.trx_id = newTrxId; 126 task.trx_id = newTrxId;
134 } 127 }
135 128
136 const replyMessage = { 129 const replyMessage = {
137 type: 'ACK', 130 type: 'ACK',
138 rid: msg.rid || null, 131 rid: msg.rid || null,
139 ts: new Date(), 132 ts: new Date(),
140 error: false, 133 error: false,
141 }; 134 };
142 135
143 ws.send(JSON.stringify(replyMessage), { compress: true }); 136 ws.send(JSON.stringify(replyMessage), { compress: true });
144 137
145 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 138 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
146 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 139 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
147 return; 140 return;
148 } 141 }
149 142
150 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 143 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
151 xid, 144 xid,
152 }); 145 });
153 146
154 partner.buy(task, xid); 147 partner.buy(task, xid);
155 } else { 148 } else {
156 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 149 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
157 xid, 150 xid,
158 msgType, 151 msgType,
159 msg, 152 msg,
160 }); 153 });
161 } 154 }
162 }); 155 });
163 156
164 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 157 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } }));
165 }); 158 });
166 159
167 server.on('upgrade', (req, socket, head) => { 160 server.on('upgrade', (req, socket, head) => {
168 const xid = uniqid(); 161 const xid = uniqid();
169 162
170 const apikey = req.headers && (req.headers.apikey || req.headers.token); 163 const apikey = req.headers && (req.headers.apikey || req.headers.token);
171 164
172 const matchedApikey = isValidApikey( 165 const matchedApikey = isValidApikey(
173 apikey, 166 apikey,
174 config.push_trx_server && config.push_trx_server.apikey, 167 config.push_trx_server && config.push_trx_server.apikey,
175 ); 168 );
176 169
177 if (!matchedApikey) { 170 if (!matchedApikey) {
178 rejectConnection(req, socket); 171 rejectConnection(req, socket);
179 return; 172 return;
180 } 173 }
181 174
182 wss.handleUpgrade(req, socket, head, (ws) => { 175 wss.handleUpgrade(req, socket, head, (ws) => {
183 const client = { 176 const client = {
184 xid, 177 xid,
185 remoteAddress: req.socket.remoteAddress, 178 remoteAddress: req.socket.remoteAddress,
186 apikeyName: matchedApikey.name, 179 apikeyName: matchedApikey.name,
187 apikey, 180 apikey,
188 }; 181 };
189 182
190 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { 183 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
191 xid, 184 xid,
192 headers: req.headers, 185 headers: req.headers,
193 client, 186 client,
194 }); 187 });
195 188
196 wss.emit('connection', ws, req, client); 189 wss.emit('connection', ws, req, client);
197 }); 190 });
198 }); 191 });
199 192
200 server.listen(wsListenPort, () => { 193 server.listen(wsListenPort, () => {
201 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 194 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
202 }); 195 });
203 196
204 // const heatbeatPingInterval = setInterval(() => { 197 // const heatbeatPingInterval = setInterval(() => {
205 // wss.clients.forEach((ws) => { 198 // wss.clients.forEach((ws) => {
206 // heartbeatPing(ws); 199 // heartbeatPing(ws);
207 // }); 200 // });
208 // }, PING_INTERVAL_MS); 201 // }, PING_INTERVAL_MS);
209 202
210 wss.on('close', () => { 203 wss.on('close', () => {
211 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); 204 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
212 // clearInterval(heatbeatPingInterval); 205 // clearInterval(heatbeatPingInterval);
213 }); 206 });
207 };
208
209 if (!wsListenPort) {
210 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
211 config: {
212 push_trx_server: !!config && !!config.push_trx_server && {
213 port: !!config.push_trx_server.port,
214 listen_port: !!config.push_trx_server.listen_port,