Commit c08770406a7ed510b98315f8a720637bedd7cb32

Authored by Adhidarma Hadiwinoto
1 parent 7ca0e79023
Exists in master

Add push-trx-sdk-version attribute

Showing 1 changed file with 11 additions and 2 deletions Inline Diff

1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER'; 1 const MODULE_NAME = 'KOMODO-SDK-PUSH-TRX.SERVER';
2 2
3 const config = require('komodo-sdk/config'); 3 const config = require('komodo-sdk/config');
4 const logger = require('tektrans-logger'); 4 const logger = require('tektrans-logger');
5 5
6 const { createServer } = require('http'); 6 const { createServer } = require('http');
7 const { WebSocketServer } = require('ws'); 7 const { WebSocketServer } = require('ws');
8 const uniqid = require('uniqid'); 8 const uniqid = require('uniqid');
9 9
10 const packageJson = require('./package.json');
11
10 const defaultValues = require('./lib/default-values'); 12 const defaultValues = require('./lib/default-values');
11 const isValidApikey = require('./lib/is-valid-apikey'); 13 const isValidApikey = require('./lib/is-valid-apikey');
12 const rejectConnection = require('./lib/server/reject-connection'); 14 const rejectConnection = require('./lib/server/reject-connection');
13 const heartbeatPing = require('./lib/heartbeat-ping'); 15 const heartbeatPing = require('./lib/heartbeat-ping');
14 const onPing = require('./lib/on-ping'); 16 const onPing = require('./lib/on-ping');
15 const onPong = require('./lib/on-pong'); 17 const onPong = require('./lib/on-pong');
16 18
17 const PING_INTERVAL_MS = defaultValues.pingIntervalMs; 19 const PING_INTERVAL_MS = defaultValues.pingIntervalMs;
18 20
19 const server = createServer(); 21 const server = createServer();
20 22
21 let partner; 23 let partner;
22 24
23 exports.setPartner = (partnerFromCaller) => { 25 exports.setPartner = (partnerFromCaller) => {
24 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`); 26 logger.verbose(`${MODULE_NAME} D56C3427: Partner set`);
25 partner = partnerFromCaller; 27 partner = partnerFromCaller;
26 }; 28 };
27 29
28 const wsListenPort = config.push_trx_server 30 const wsListenPort = config.push_trx_server
29 && (config.push_trx_server.port || config.push_trx_server.listen_port); 31 && (config.push_trx_server.port || config.push_trx_server.listen_port);
30 32
31 const initServer = () => { 33 const initServer = () => {
32 logger.verbose(`${MODULE_NAME} 70D208B2: Initializing`); 34 const pushTrxSdkVersion = packageJson.version;
35 logger.verbose(`${MODULE_NAME} 70D208B2: Initializing`, { pushTrxSdkVersion });
33 36
34 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true }); 37 const wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
35 38
36 wss.on('connection', (ws, req, client) => { 39 wss.on('connection', (ws, req, client) => {
37 // eslint-disable-next-line no-param-reassign 40 // eslint-disable-next-line no-param-reassign
38 ws.isAlive = true; 41 ws.isAlive = true;
39 42
40 const { 43 const {
41 xid: connectionXid, 44 xid: connectionXid,
42 remoteAddress, 45 remoteAddress,
43 apikey, 46 apikey,
44 apikeyName, 47 apikeyName,
45 } = client; 48 } = client;
46 49
47 logger.info(`${MODULE_NAME} F7755A03: Client connected`, { 50 logger.info(`${MODULE_NAME} F7755A03: Client connected`, {
48 xid: connectionXid || uniqid(), 51 xid: connectionXid || uniqid(),
49 client, 52 client,
50 }); 53 });
51 54
52 const heatbeatPingInterval = setInterval(() => { 55 const heatbeatPingInterval = setInterval(() => {
53 heartbeatPing(ws); 56 heartbeatPing(ws);
54 }, PING_INTERVAL_MS); 57 }, PING_INTERVAL_MS);
55 58
56 ws.on('ping', (data) => onPing(ws, data)); 59 ws.on('ping', (data) => onPing(ws, data));
57 ws.on('pong', (data) => onPong(ws, data)); 60 ws.on('pong', (data) => onPong(ws, data));
58 61
59 ws.on('close', (code, reason) => { 62 ws.on('close', (code, reason) => {
60 const xid = uniqid(); 63 const xid = uniqid();
61 64
62 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, { 65 logger.verbose(`${MODULE_NAME} AB48F13F: Client disconnected`, {
63 xid, 66 xid,
64 remoteAddress, 67 remoteAddress,
65 apikeyName, 68 apikeyName,
66 apikey, 69 apikey,
67 code, 70 code,
68 reason: reason.toString(), 71 reason: reason.toString(),
69 }); 72 });
70 73
71 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, { 74 logger.verbose(`${MODULE_NAME} 151CE8C0: Clearing ping interval`, {
72 xid, 75 xid,
73 }); 76 });
74 clearInterval(heatbeatPingInterval); 77 clearInterval(heatbeatPingInterval);
75 }); 78 });
76 79
77 ws.on('message', (data) => { 80 ws.on('message', (data) => {
78 const xid = uniqid(); 81 const xid = uniqid();
79 82
80 // eslint-disable-next-line no-param-reassign 83 // eslint-disable-next-line no-param-reassign
81 ws.isAlive = true; 84 ws.isAlive = true;
82 85
83 let msg; 86 let msg;
84 try { 87 try {
85 msg = JSON.parse(data.toString() || ''); 88 msg = JSON.parse(data.toString() || '');
86 } catch (e) { 89 } catch (e) {
87 msg = data.toString(); 90 msg = data.toString();
88 } 91 }
89 92
90 if (!msg) return; 93 if (!msg) return;
91 94
92 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase(); 95 const msgType = ((msg && (msg.type || msg.msgType)) || '').toUpperCase();
93 96
94 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, { 97 logger.verbose(`${MODULE_NAME} 72D2A702: Got a message`, {
95 xid, 98 xid,
96 apikeyName, 99 apikeyName,
97 // apikey, 100 // apikey,
98 msgType, 101 msgType,
99 msg, 102 msg,
100 msgSize: data.toString().length, 103 msgSize: data.toString().length,
101 }); 104 });
102 105
103 if (msgType === 'TASK') { 106 if (msgType === 'TASK') {
104 const task = msg.payload; 107 const task = msg.payload;
105 if (!task) { 108 if (!task) {
106 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg }); 109 logger.verbose(`${MODULE_NAME} 79D6A683: Missing TASK data`, { xid, msg });
107 return; 110 return;
108 } 111 }
109 112
110 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, { 113 logger.verbose(`${MODULE_NAME} 7FFFF91D: Got a task`, {
111 xid, 114 xid,
112 task, 115 task,
113 }); 116 });
114 117
115 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder); 118 const sdkTrxIdAdder = Number(config.sdk_trx_id_adder);
116 if (sdkTrxIdAdder) { 119 if (sdkTrxIdAdder) {
117 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder; 120 const newTrxId = Number(task.trx_id) + sdkTrxIdAdder;
118 121
119 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, { 122 logger.verbose(`${MODULE_NAME} 6E6E200D: Fix trx_id because of config.sdk_trx_id_adder`, {
120 xid, 123 xid,
121 sdkTrxIdAdder, 124 sdkTrxIdAdder,
122 originalTrxId: task.trx_id, 125 originalTrxId: task.trx_id,
123 newTrxId, 126 newTrxId,
124 }); 127 });
125 128
126 task.trx_id = newTrxId; 129 task.trx_id = newTrxId;
127 } 130 }
128 131
129 const replyMessage = { 132 const replyMessage = {
130 type: 'ACK', 133 type: 'ACK',
131 rid: msg.rid || null, 134 rid: msg.rid || null,
132 ts: new Date(), 135 ts: new Date(),
133 error: false, 136 error: false,
134 }; 137 };
135 138
136 ws.send(JSON.stringify(replyMessage), { compress: true }); 139 ws.send(JSON.stringify(replyMessage), { compress: true });
137 140
138 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) { 141 if (!partner || !partner.buy || (typeof partner.buy !== 'function')) {
139 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid }); 142 logger.verbose(`${MODULE_NAME} C2136EC3: Missing partner handler`, { xid });
140 return; 143 return;
141 } 144 }
142 145
143 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, { 146 logger.verbose(`${MODULE_NAME} 3D870327: Forwarding task to partner module`, {
144 xid, 147 xid,
145 }); 148 });
146 149
147 partner.buy(task, xid); 150 partner.buy(task, xid);
148 } else { 151 } else {
149 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, { 152 logger.verbose(`${MODULE_NAME} CF88DC54: Unknown message type`, {
150 xid, 153 xid,
151 msgType, 154 msgType,
152 msg, 155 msg,
153 }); 156 });
154 } 157 }
155 }); 158 });
156 159
157 ws.send(JSON.stringify({ msgType: 'WELCOMEMSG', data: { msg: 'bla bla bla' } })); 160 ws.send(JSON.stringify({
161 msgType: 'WELCOMEMSG',
162 data: {
163 msg: 'bla bla bla',
164 pushTrxSdkVersion,
165 },
166 }));
158 }); 167 });
159 168
160 server.on('upgrade', (req, socket, head) => { 169 server.on('upgrade', (req, socket, head) => {
161 const xid = uniqid(); 170 const xid = uniqid();
162 171
163 const apikey = req.headers && (req.headers.apikey || req.headers.token); 172 const apikey = req.headers && (req.headers.apikey || req.headers.token);
164 173
165 const matchedApikey = isValidApikey( 174 const matchedApikey = isValidApikey(
166 apikey, 175 apikey,
167 config.push_trx_server && config.push_trx_server.apikey, 176 config.push_trx_server && config.push_trx_server.apikey,
168 ); 177 );
169 178
170 if (!matchedApikey) { 179 if (!matchedApikey) {
171 rejectConnection(req, socket); 180 rejectConnection(req, socket);
172 return; 181 return;
173 } 182 }
174 183
175 wss.handleUpgrade(req, socket, head, (ws) => { 184 wss.handleUpgrade(req, socket, head, (ws) => {
176 const client = { 185 const client = {
177 xid, 186 xid,
178 remoteAddress: req.socket.remoteAddress, 187 remoteAddress: req.socket.remoteAddress,
179 apikeyName: matchedApikey.name, 188 apikeyName: matchedApikey.name,
180 apikey, 189 apikey,
181 }; 190 };
182 191
183 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, { 192 logger.verbose(`${MODULE_NAME} AB8BC2F4: Incoming HTTP connection`, {
184 xid, 193 xid,
185 headers: req.headers, 194 headers: req.headers,
186 client, 195 client,
187 }); 196 });
188 197
189 wss.emit('connection', ws, req, client); 198 wss.emit('connection', ws, req, client);
190 }); 199 });
191 }); 200 });
192 201
193 server.listen(wsListenPort, () => { 202 server.listen(wsListenPort, () => {
194 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort }); 203 logger.info(`${MODULE_NAME} 367D7DB6: Listening websocket`, { port: wsListenPort });
195 }); 204 });
196 205
197 // const heatbeatPingInterval = setInterval(() => { 206 // const heatbeatPingInterval = setInterval(() => {
198 // wss.clients.forEach((ws) => { 207 // wss.clients.forEach((ws) => {
199 // heartbeatPing(ws); 208 // heartbeatPing(ws);
200 // }); 209 // });
201 // }, PING_INTERVAL_MS); 210 // }, PING_INTERVAL_MS);
202 211
203 wss.on('close', () => { 212 wss.on('close', () => {
204 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`); 213 logger.verbose(`${MODULE_NAME} E973112F: WebSocketServer closed`);
205 // clearInterval(heatbeatPingInterval); 214 // clearInterval(heatbeatPingInterval);
206 }); 215 });
207 }; 216 };
208 217
209 if (!wsListenPort) { 218 if (!wsListenPort) {
210 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, { 219 logger.info(`${MODULE_NAME} 58ACCE20: Disabling PUSH_TRX_SERVER`, {
211 config: { 220 config: {
212 push_trx_server: !!config && !!config.push_trx_server && { 221 push_trx_server: !!config && !!config.push_trx_server && {
213 port: !!config.push_trx_server.port, 222 port: !!config.push_trx_server.port,
214 listen_port: !!config.push_trx_server.listen_port, 223 listen_port: !!config.push_trx_server.listen_port,
215 }, 224 },
216 }, 225 },
217 }); 226 });
218 } else { 227 } else {
219 initServer(); 228 initServer();
220 } 229 }
221 230