message-archive.js
3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"use strict";
const fs = require('fs');
const strftime = require('strftime');
const sqlite3 = require('sqlite3').verbose();
const cron = require("cron");
const config = require("./config");
const logger = require("./logger");
// Garbage-collection interval in milliseconds: config.gc_interval_secs when it
// is set to a truthy value, otherwise 24 hours.
// NOTE(review): within this file the value is only mentioned by the
// commented-out setInterval() call near the bottom — confirm whether it is still needed.
const gc_interval_ms = (config.gc_interval_secs || 24 * 3600 ) * 1000;
// The archive database file is resolved against the process's current working directory.
const dbFile = process.cwd() + "/archive.sqlite";
// Single database handle shared by every function in this module.
const db = new sqlite3.Database(dbFile);
// Readiness flag reported by isReady(); create() switches it to true.
var ready = false;
/**
 * Report whether the archive database has been flagged as ready.
 *
 * @returns {boolean} Current value of the module-level `ready` flag
 *   (initially false; create() sets it to true).
 */
function isReady() {
return ready;
}
/**
 * Prepare the archive database: create the `trx` table and its indexes,
 * purge expired rows via gc(), compact the file with VACUUM and finally
 * flag the module as ready.
 *
 * The callback handed to db.serialize() only *queues* the statements; it
 * returns before any of them has run. The ready flag is therefore raised
 * from the completion callback of the last queued statement (VACUUM), so
 * isReady() never reports true before the schema actually exists.
 *
 * If any schema statement fails the error is logged and the module stays
 * "not ready". A failing VACUUM is logged but does not block readiness,
 * because it is only a space optimisation.
 */
function create() {
  db.serialize(function() {
    // Flipped to false by any failing schema statement below.
    let schema_ok = true;

    const schema_statements = [
      "CREATE TABLE IF NOT EXISTS trx (trx_date TEXT, user_id TEXT, ref_id TEXT)",
      "CREATE INDEX IF NOT EXISTS trx_date_idx ON trx(trx_date)",
      "CREATE UNIQUE INDEX IF NOT EXISTS trx_userid_refid_idx ON trx(user_id, ref_id)"
    ];

    schema_statements.forEach(function(sql) {
      db.run(sql, function(err) {
        if (err) {
          schema_ok = false;
          logger.warn("ERROR creating archive db schema", {err: err, sql: sql});
        }
      });
    });

    // Drop expired rows before compacting so VACUUM can reclaim their space.
    gc();

    // Last statement in the serialized queue: its callback fires after all
    // of the statements above have completed.
    db.run('VACUUM', function(err) {
      if (err) {
        logger.warn("ERROR on archive db vacuum", {err: err});
      }
      if (!schema_ok) {
        return;
      }
      ready = true;
      logger.info("Message archive db is ready");
    });
  });
}
/**
 * Archive a transaction, keyed by (lower-cased user_id, ref_id) and stamped
 * with today's date.
 *
 * Input is validated whether or not a callback is supplied. Previously the
 * checks were skipped when `cb` was omitted, so a command without `user_id`
 * crashed on `.toLowerCase()`.
 *
 * @param {Object} command - Command to archive.
 * @param {string} command.user_id - Owner of the transaction; stored lower-cased.
 * @param {*} command.ref_id - Reference id of the transaction.
 * @param {function(*)} [cb] - Optional callback. Receives 'ER_NO_USER_ID' or
 *   'ER_NO_REF_ID' on invalid input, otherwise the error value reported by
 *   the database for the INSERT (falsy on success).
 */
function insert(command, cb) {
  let invalid = null;
  if (!command || !command.user_id) {
    invalid = 'ER_NO_USER_ID';
  }
  else if (!command.ref_id) {
    invalid = 'ER_NO_REF_ID';
  }

  if (invalid) {
    if (cb) {
      cb(invalid);
    }
    else {
      // Nobody to report to: leave a trace instead of failing silently.
      logger.warn('Error archiving trx', {err: invalid, command: command});
    }
    return;
  }

  const params = {
    $trx_date: strftime('%Y-%m-%d'),
    $user_id: command.user_id.toLowerCase(),
    $ref_id: command.ref_id
  };

  db.run("INSERT INTO trx VALUES($trx_date, $user_id, $ref_id)", params, function(err) {
    if (err) {
      logger.warn('Error archiving trx', {err: err, params: params, command: command});
    }
    else {
      logger.verbose('Trx archived successfully', {command: command});
    }

    if (cb) {
      cb(err);
    }
  });
}
/**
 * Look up an archived transaction by (lower-cased user_id, ref_id).
 *
 * @param {Object} command - Command carrying `user_id` (string) and `ref_id`.
 * @param {function(*, Object=)} cb - Called with (err) when the query fails,
 *   otherwise with (null, row); `row` is whatever db.get() yielded for the
 *   query and is falsy when nothing matched.
 */
function find(command, cb) {
  const sql = "SELECT rowid FROM trx WHERE user_id = $user_id AND ref_id = $ref_id";
  const bindings = {
    $user_id: command.user_id.toLowerCase(),
    $ref_id: command.ref_id
  };

  db.get(sql, bindings, (err, row) => {
    if (err) {
      logger.warn("ERROR find on archive db", {err});
      cb(err);
      return;
    }

    if (!row) {
      logger.verbose("No match on archive db", {command});
    } else {
      logger.verbose("Got match on archive db", {command, row});
    }

    cb(null, row);
  });
}
/**
 * Fetch every archived transaction that carries the given reference id,
 * regardless of user.
 *
 * @param {*} ref_id - Reference id to search for.
 * @param {function(*, Array=)} cb - Called with (err) when the query fails,
 *   otherwise with (null, rows) as yielded by db.all().
 */
function findByRefId(ref_id, cb) {
  const sql = "SELECT * FROM trx WHERE ref_id = $ref_id";

  db.all(sql, {$ref_id: ref_id}, (err, rows) => {
    if (!err) {
      cb(null, rows);
      return;
    }

    logger.warn("ERROR find on archive db", {err});
    cb(err);
  });
}
/**
 * Delete the archived transaction identified by (lower-cased user_id, ref_id).
 * Fire-and-forget: a failing DELETE is only logged, nothing is reported
 * back to the caller.
 *
 * @param {string} user_id - Owner of the transaction; matched lower-cased.
 * @param {*} ref_id - Reference id of the transaction.
 */
function remove(user_id, ref_id) {
  // Normalise before logging so that an invalid user_id fails up front.
  const normalized_user = user_id.toLowerCase();

  logger.verbose("Deleting trx from archive", {user_id, ref_id});

  db.run(
    "DELETE FROM trx WHERE user_id = $user_id AND ref_id = $ref_id",
    {$user_id: normalized_user, $ref_id: ref_id},
    (err) => {
      if (!err) {
        return;
      }
      logger.warn("ERROR on delete archive trx", {err});
    }
  );
}
/**
 * Garbage-collect the archive: delete every transaction whose `trx_date`
 * lies more than `max_age_days` days before the database's current time.
 *
 * The DELETE is issued with an error callback so a failing purge (for
 * example from the scheduled job) is logged rather than left unhandled.
 *
 * @param {number} [max_age_days=3] - Retention window in days. Anything that
 *   is not a finite, non-negative number falls back to the default of 3.
 */
function gc(max_age_days) {
  const retention_days = (Number.isFinite(max_age_days) && max_age_days >= 0) ? max_age_days : 3;

  logger.verbose("Executing message archive garbage collection");

  db.run(
    "DELETE FROM trx WHERE julianday('now') - julianday(trx_date) > $max_age_days",
    {$max_age_days: retention_days},
    function(err) {
      if (err) {
        logger.warn("ERROR on archive garbage collection", {err: err});
      }
    }
  );
}
// Module bootstrap: initialise the database as soon as the module is loaded.
create();
// Earlier interval-based scheduling of gc(), left disabled; the cron job below is used instead.
//setInterval(gc, gc_interval_ms);
// Schedule gc() with the cron expression '0 0 * * *' (minute 0 of hour 0, every day).
// `start: true` is passed so the job does not need a separate start() call.
// NOTE(review): no time zone is configured here — presumably the cron
// package's default applies; confirm against its documentation.
var gcJob = new cron.CronJob({
cronTime: '0 0 * * *',
onTick: function() {
// Wrapped in a function so gc() is always invoked without arguments.
gc();
},
start: true
});
// Public API of the module. create() and gc() are not exported; they are
// only invoked from within this file.
exports.insert = insert;
exports.find = find;
exports.findByRefId = findByRefId;
exports.remove = remove;
exports.isReady = isReady;