Commit 402ba9dec54f2259e1d1d0121c67e9d13615723b

Authored by Adhidarma Hadiwinoto
1 parent 93303acb84
Exists in master

resync db-mysql dari komodo

Showing 2 changed files with 111 additions and 18 deletions Inline Diff

1 'use strict'; 1 const HEALTHY_CHECK_INTERVAL_MS = 10000;
2
3 const MODULE_NAME = require('path').basename(__filename);
2 4
3 const mysql = require('mysql'); 5 const mysql = require('mysql');
4 6
5 const config = require('komodo-sdk/config'); 7 const config = require('komodo-sdk/config');
8 const logger = require('komodo-sdk/logger');
9
10 const connectionLimit = config.mysql && config.mysql.pool_connection_limit
11 ? config.mysql.pool_connection_limit
12 : 0;
6 13
7 const connectionLimit = config.mysql && config.mysql.pool_connection_limit ? config.mysql.pool_connection_limit : 0; 14 const ERROR_POOL_NOT_READY = new Error(`${MODULE_NAME}: pool is not ready`);
15 exports.ERROR_POOL_NOT_READY = ERROR_POOL_NOT_READY;
8 16
9 const pool = config.mysql ? mysql.createPool({ 17 const pool = config.mysql ? mysql.createPool({
10 connectionLimit: connectionLimit, 18 connectionLimit,
11 host: config.mysql.host || 'localhost', 19 host: config.mysql.host || 'localhost',
12 database: config.mysql.database || 'komodo', 20 database: config.mysql.database || 'komodo',
13 user: config.mysql.user || 'komodo', 21 user: config.mysql.user || 'komodo',
14 password: config.mysql.password 22 password: config.mysql.password,
23 timezone: config.mysql.timezone,
15 }) : null; 24 }) : null;
16 25
17 exports.pool = pool; 26 exports.pool = pool;
18 27
19 exports.format = (sql, values) => { 28 exports.query = (query, values, cb) => {
20 return new Promise((resolve, reject) => { 29 // pool.query.apply(null, arguments);
21 if (!pool) { 30 if (!pool || !pool.query) {
22 reject('Missing DB config'); 31 logger.warn(`${MODULE_NAME}: ${ERROR_POOL_NOT_READY.toString()}`);
32 if (typeof cb === 'function') {
33 cb(ERROR_POOL_NOT_READY);
34 }
35
36 return;
37 }
38
39 pool.query(query, values, cb);
40 };
41
42 exports.format = (sql, values, cb) => new Promise((resolve, reject) => {
43 if (!pool) {
44 reject(ERROR_POOL_NOT_READY);
45 if (typeof cb === 'function') cb(ERROR_POOL_NOT_READY);
46 return;
47 }
48
49 pool.getConnection((err, connection) => {
50 if (err) {
51 reject(err);
52 if (typeof cb === 'function') cb(err);
53 return;
54 }
55
56 const formatted = connection.format(sql, values);
57 connection.release();
58
59 resolve(formatted);
60 if (typeof cb === 'function') cb(null, formatted);
61 });
62 });
63
64 exports.beginConnection = (cb) => new Promise((resolve) => {
65 pool.getConnection((errGetConnection, connection) => {
66 if (errGetConnection) {
67 resolve([errGetConnection]);
68 if (typeof cb === 'function') cb(errGetConnection);
23 return; 69 return;
24 } 70 }
25 71
26 pool.getConnection((err, connection) => { 72 connection.beginTransaction((errBeginTransaction) => {
27 if (err) { 73 if (errBeginTransaction) {
28 reject(err); 74 resolve([errBeginTransaction]);
75 if (typeof cb === 'function') cb(errBeginTransaction);
29 return; 76 return;
30 } 77 }
31 78
32 resolve(connection.format(sql, values)); 79 resolve([null, connection]);
33 connection.release(); 80 if (typeof cb === 'function') cb(null, connection);
34 }); 81 });
35 })
36 }
82 });
83 });
84
85 exports.getBy = (tableName, fieldName, value, cb) => new Promise((resolve) => {
86 const query = 'SELECT * FROM ?? WHERE ?? = ? LIMIT 1';
87 const values = [tableName, fieldName, value];
88 pool.query(query, values, (err, results) => {
89 const result = results && results[0];
90 resolve([err, result || null]);
91 if (typeof cb === 'function') cb(err, result || null);
92 });
93 });
94
95 function healthyCheck() {
96 const query = 'SELECT 1';
97 const values = [];
98
99 if (!pool) {
100 logger.warn(`${MODULE_NAME}: Skip healthy check on undefined pool (ERR-EB9E5C08)`);
101 return;
102 }
103
104 if (!pool.query) {
105 logger.warn(`${MODULE_NAME}: Skip healthy check on undefined pool.query (ERR-D10F70F3)`);
106 return;
107 }
108
109 pool.query(query, values, (err) => {
110 if (err) {
111 logger.warn(`${MODULE_NAME}: Error on healthy check (ERR-38EC9B78)`, { err });
112 }
113 });
114 }
115
116 setInterval(() => {
117 const randomMs = Math.floor(Math.random() * HEALTHY_CHECK_INTERVAL_MS * 0.3);
118 setTimeout(() => {
119 try {
120 healthyCheck();
121 } catch (err) {
122 logger.warn(`${MODULE_NAME}: Exception on periodic healthy check (ERR-2D137502)`, { err });
123 }
124 }, randomMs);
lib/messages-archive.js
1 'use strict'; 1 'use strict';
2 2
3 const redis = require('redis'); 3 const redis = require('redis');
4 4
5 const config = require('komodo-sdk/config'); 5 const config = require('komodo-sdk/config');
6 const logger = require('komodo-sdk/logger'); 6 const logger = require('komodo-sdk/logger');
7 const db = require('./db-mysql'); 7 const db = require('./db-mysql');
8 8
9 const DIRECTION_INCOMING = 0; 9 const DIRECTION_INCOMING = 0;
10 const DIRECTION_OUTGOING = 1; 10 const DIRECTION_OUTGOING = 1;
11 11
12 let redisClient; 12 let redisClient;
13 13
14 if (!config.redis) { 14 if (!config.redis) {
15 logger.warn('Undefined config.redis, messages counter will not work! #05A778E21D7E'); 15 logger.warn('Undefined config.redis, messages counter will not work! #05A778E21D7E');
16 } else { 16 } else {
17 redisClient = redis.createClient(config.redis); 17 redisClient = redis.createClient(config.redis);
18 } 18 }
19 19
20 if (!redisClient) { 20 if (!redisClient) {
21 logger.warn('Undefined redisClient, messages counter will not work! #1D3EC165E8D9'); 21 logger.warn('Undefined redisClient, messages counter will not work! #1D3EC165E8D9');
22 } 22 }
23 23
24 function composeRedisCounterKeyword(origin, direction) { 24 function composeRedisCounterKeyword(origin, direction) {
25 const directionLabel = direction == DIRECTION_OUTGOING ? 'OUT' : 'IN'; 25 const directionLabel = direction == DIRECTION_OUTGOING ? 'OUT' : 'IN';
26 return `CALMA_MESSAGE_COUNTER_${origin}_${directionLabel}`; 26 return `CALMA_MESSAGE_COUNTER_${origin}_${directionLabel}`;
27 } 27 }
28 28
29 function incrementCounter(origin, direction) { 29 function incrementCounter(origin, direction) {
30 if (!redisClient) { 30 if (!redisClient) {
31 logger.warn('Undefined redisClient, not incrementing messages counter! #FF8E765E12E2'); 31 logger.warn('Undefined redisClient, not incrementing messages counter! #FF8E765E12E2');
32 } else { 32 } else {
33 redisClient.INCR(composeRedisCounterKeyword(origin, direction), () => {}); 33 redisClient.INCR(composeRedisCounterKeyword(origin, direction), () => {});
34 } 34 }
35 } 35 }
36 36
37 /** 37 /**
38 * Menyimpan pesan ke dalam archive histori pesan di database 38 * Menyimpan pesan ke dalam archive histori pesan di database
39 * 39 *
40 * @param {object} params - objek pesan yang akan disimpan 40 * @param {object} params - objek pesan yang akan disimpan
41 * @param {string} [params.origin_label] - label origin 41 * @param {string} [params.origin_label] - label origin
42 * @param {string} [params.origin=UNKNOWN] - digunakan sebagai label origin jika tdk ditentukan 42 * @param {string} [params.origin=UNKNOWN] - digunakan sebagai label origin jika tdk ditentukan
43 * @param {string} [params.origin_transport=UNKNOWN] - transport, misal SMS, TELEGRAM 43 * @param {string} [params.origin_transport=UNKNOWN] - transport, misal SMS, TELEGRAM
44 * @param {string} params.partner - pengirim / penerima 44 * @param {string} params.partner - pengirim / penerima
45 * @param {string} [params.msg] - isi pesan 45 * @param {string} [params.msg] - isi pesan
46 * @param {string} [params.message] - isi pesan, jika params.msg tidak terdefinisi 46 * @param {string} [params.message] - isi pesan, jika params.msg tidak terdefinisi
47 * @param {number} direction - 0: incoming, 1: outgoing 47 * @param {number} direction - 0: incoming, 1: outgoing
48 */ 48 */
49 function insert(params, direction) { 49 function insert(params, direction) {
50 incrementCounter( 50 incrementCounter(
51 params.origin_label || params.origin, 51 params.origin_label || params.origin,
52 direction 52 direction
53 ); 53 );
54 54
55 if (!db.pool) return; 55 if (!db.pool) {
56 logger.warn('MESSAGE-ARCHIVE: DB POOL is not ready to insert message history');
57 return;
58 }
56 59
57 const query = `INSERT INTO messages SET ?`; 60 const query = `INSERT INTO messages SET ?`;
58 const values = [{ 61 const values = [{
59 origin_label: (params.origin_label || params.origin || 'UNKNOWN').trim(), 62 origin_label: (params.origin_label || params.origin || 'UNKNOWN').trim(),
60 origin_transport: (params.origin_transport || 'UNKNOWN').trim(), 63 origin_transport: (params.origin_transport || 'UNKNOWN').trim(),
61 direction, 64 direction,
62 partner: params.partner.trim(), 65 partner: params.partner.trim(),
63 message: (params.msg || params.message).trim(), 66 message: (params.msg || params.message).trim(),
64 }]; 67 }];
65 68
66 db.pool.query(query, values, (err) => { 69 db.pool.query(query, values, async (err) => {
67 if (err) { 70 if (err) {
68 logger.warn(`MESSAGES-ARCHIVE: DB ERROR on inserting message. ${err.toString()}`); 71 const fullQuery = await db.format(query, values);
72 logger.warn(`MESSAGES-ARCHIVE: DB ERROR on inserting message. ${err.toString()}`, { query: fullQuery });
69 } 73 }
70 }); 74 });
71 } 75 }
72 76
73 exports.insert = insert; 77 exports.insert = insert;
74 exports.DIRECTION_INCOMING = DIRECTION_INCOMING; 78 exports.DIRECTION_INCOMING = DIRECTION_INCOMING;
75 exports.DIRECTION_OUTGOING = DIRECTION_OUTGOING; 79 exports.DIRECTION_OUTGOING = DIRECTION_OUTGOING;