message-archive.js 3.32 KB
"use strict";

const fs = require('fs');
const strftime = require('strftime');
const sqlite3 = require('sqlite3').verbose();
const cron = require("cron");

const config = require("./config");
const logger = require("./logger");

const gc_interval_ms = (config.gc_interval_secs || 24 * 3600 ) * 1000;

const dbFile = process.cwd() + "/archive.sqlite";
const db = new sqlite3.Database(dbFile);

var ready = false;

function isReady() {
    return ready;
}

function create() {
    db.serialize(function() {
        db.run("CREATE TABLE IF NOT EXISTS trx (trx_date TEXT, user_id TEXT, ref_id TEXT)");
        db.run("CREATE INDEX IF NOT EXISTS trx_date_idx ON trx(trx_date)");
        db.run("CREATE UNIQUE INDEX IF NOT EXISTS trx_userid_refid_idx ON trx(user_id, ref_id)");
        gc();
        db.run('VACUUM');
        ready = true;
        logger.info("Message archive db is ready");
    })
}


function insert(command, cb) {
    if (cb && !command.user_id) {
        cb('ER_NO_USER_ID');
        return;
    }

    if (cb && !command.ref_id) {
        cb('ER_NO_REF_ID');
        return;
    }

    const params = {$trx_date: strftime('%Y-%m-%d'), $user_id: command.user_id.toLowerCase(), $ref_id: command.ref_id};
    db.run("INSERT INTO trx VALUES($trx_date, $user_id, $ref_id)", params, function(err) {
        if (err) {
            logger.warn('Error archiving trx', {err: err, params: params, command: command});
        }
        else {
            logger.verbose('Trx archived successfully', {command: command});
        }

        if (cb) {
            cb(err);
        }
    })
}

function find(command, cb) {
    const params = {$user_id: command.user_id.toLowerCase(), $ref_id: command.ref_id};
    db.get("SELECT rowid FROM trx WHERE user_id = $user_id AND ref_id = $ref_id", params, function(err, row) {
        if (err) {
            logger.warn("ERROR find on archive db", {err: err});
            cb(err);
            return;
        }

        if (row) {
            logger.verbose("Got match on archive db", {command: command, row: row});
        } else {
            logger.verbose("No match on archive db", {command: command});
        }

        cb(null, row);
    });
}

function findByRefId(ref_id, cb) {
    const params = {$ref_id: ref_id};
    db.all("SELECT * FROM trx WHERE ref_id = $ref_id", params, function(err, rows) {
        if (err) {
            logger.warn("ERROR find on archive db", {err: err});
            cb(err);
            return;
        }

        cb(null, rows);
    });
}

function remove(user_id, ref_id) {
    const params = {$user_id: user_id.toLowerCase(), $ref_id: ref_id};

    logger.verbose("Deleting trx from archive", {user_id: user_id, ref_id: ref_id});
    db.run("DELETE FROM trx WHERE user_id = $user_id AND ref_id = $ref_id", params, function(err) {
        if (err) {
            logger.warn("ERROR on delete archive trx", {err: err});
        }
    });
}

function gc() {
    logger.verbose("Executing message archive garbage collection");
    db.run("DELETE FROM trx WHERE julianday('now') - julianday(trx_date) > 3");
}

create();

//setInterval(gc, gc_interval_ms);

var gcJob = new cron.CronJob({
    cronTime: '0 0 * * *',
    onTick: function() {
        gc();
    },
    start: true
});

exports.insert = insert;
exports.find = find;
exports.findByRefId = findByRefId;
exports.remove = remove;
exports.isReady = isReady;