diff options
Diffstat (limited to 'modules/statusbot/statusbot.js')
-rw-r--r-- | modules/statusbot/statusbot.js | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/modules/statusbot/statusbot.js b/modules/statusbot/statusbot.js new file mode 100644 index 0000000..a518b17 --- /dev/null +++ b/modules/statusbot/statusbot.js @@ -0,0 +1,308 @@ +// Format of the required matrix.json file: +// { +// "user_id": "@statusbot:tomsmeding.com", +// "password": "PASSWORD OF THE USER", +// "room_id": "!ROOMID:tomsmeding.com" +// } + +// Furthermore, accounts.json is required in the standard format ([[user, pass]]) + +// API: +// POST /statusbot +// { "sender": "who are you", "text": "content of the status message" } + +// State in this file (gstate): the above matrix.json plus: +// { +// config: the above matrix.json, +// home_server: "tomsmeding.com", // nominal server in the user id +// matrix_server: "matrix.tomsmeding.com", // actual matrix server address, without port +// access_token: "some access token", +// req_counter: 1 // counter for transaction requests to the matrix server +// } + +// Saves persist state under key "access_tokens": +// { +// "@statusbot:tomsmeding.com|!ROOMID:tomsmeding.com": ["access token", req_counter: int] +// } + +const cmn = require("../$common.js"); +const fs = require("fs"); +const https = require("https"); +const URL = require("url"); +const bodyParser = require("body-parser"); + +let moddir = null; + +const persist = require("node-persist").create({ + dir: cmn.persistdir + "/statusbot", + continuous: false, + interval: false +}); +persist.initSync(); + +let persistTokens = persist.getItemSync("access_tokens"); +if (!persistTokens) { + persistTokens = {}; + persist.setItemSync("access_tokens", persistTokens); +} + +let gstate = null; + +// cb: (status: int, body: string) => () +function fetch(method, headers, hostname, path, data, cb) { + const req = https.request({method, headers, hostname, path}, res => { + let body = ""; + res.on("data", data => { body += data; }); + res.on("end", () => cb(res.statusCode, body)); + }); + req.on("error", () => cb(null, null)) + req.write(data); + req.end(); +} + +// ms: int (milliseconds) +// f: ((...Args) => ()) => () +// cb: (true, ...Args) => () | (false) => () +// Callback is invoked with 'false' if the timeout expired, or with 'true' if 'f' finished before the timeout. +function ptimeout(ms, f, cb) { + let cbinvoked = false; + + const tmout = setTimeout(() => { + if (cbinvoked) return; + cbinvoked = true; + cb(false); + }, ms); + + f((...args) => { + if (cbinvoked) return; + cbinvoked = true; + clearTimeout(tmout); + cb(true, ...args); + }); +} + +class PRatelimit { + constructor(mindelayms) { + this._mindelayms = mindelayms; + this._queue = []; // [(() => ()) => ()] + this._ready = true; // true: should run job immediately; false: should enqueue + } + + // f: (() => ()) => () + submit(f) { + if (this._ready) { + this._ready = false; + f(() => this.startWaiting()); + } else { + this._queue.push(f); + } + } + + startWaiting() { + setTimeout(() => { + if (this._queue.length > 0) { + const f = this._queue.shift(); + f(() => this.startWaiting()); + } else { + this._ready = true; + } + }, this._mindelayms); + } +} + +// f: ((true, ...Args) => () | (false) => ()) => () +// cb: (true, ...Args) => () | (false) => () +// The callback to f should be invoked with 'true' plus further arguments if it +// succeeded, and 'false' if it did not. +// Based on this info, if f failed, f is retried a few times. Once ntimes has +// been exhausted or a call succeeds, cb is invoked with respectively either +// (false) or (true, ...args). +function pretry(interval, multiplier, ntimes, f, cb) { + let tm = interval; + let ncalled = 0; + + function schedule() { + setTimeout(() => { + f((success, ...args) => { + if (success) { + cb(true, ...args); + return; + } + + ncalled++; + tm *= multiplier; + if (ncalled >= ntimes) cb(false); + else schedule(); + }); + }, tm); + } + + schedule(); +} + +// cb: (state) => () +function augmentConfig(config, cb) { + const m = config.user_id.match(/^@[^:]+:(.*)$/); + if (!m) { + throw new Error("statusbot: Cannot parse matrix ID (must be in @user:domain format)"); + } + const home_server = m[1]; + + const pkey = `${config.user_id}|${config.room_id}` + let access_token = undefined; + let req_counter = undefined; + if (pkey in persistTokens) { + [access_token, req_counter] = persistTokens[pkey]; + } + + // wait a bit with requesting this in case the homeserver in question is the same server as us + pretry(500, 1.2, 10, + cb => { + fetch("GET", {}, home_server, "/.well-known/matrix/server", "", (status, body) => { + if (status != 200) { + cb(false); // attempt failed (perhaps a later attempt will succeed) + } + const mserver = JSON.parse(body)["m.server"]; + const m = mserver.match(/^([^:]*)(:443)?$/); + if (!m) { + throw new Error(`statusbot: Matrix server port not 443 (sorry): <${mserver}>`); + } + const matrix_server = m[1]; + // attempt succeeded! + cb(true, {config, home_server, matrix_server, access_token, req_counter}); + }); + }, + (success, state) => { + if (success) cb(state); + else throw new Error(`statusbot: Failed getting https://${home_server}/.well-known/matrix/server`); + } + ); +} + +function updatePersist() { + persistTokens[`${gstate.config.user_id}|${gstate.config.room_id}`] = [gstate.access_token, gstate.req_counter]; + persist.setItemSync("access_tokens", persistTokens); +} + +// Sets access token in state. +// cb: (success: bool, body: string) => () +function matrixLogin(cb) { + const data = JSON.stringify({ + type: "m.login.password", + identifier: {type: "m.id.user", user: gstate.config.user_id}, + password: gstate.config.password, + }); + fetch("POST", {}, gstate.matrix_server, "/_matrix/client/v3/login", data, (status, body) => { + if (status != 200) { cb(false, body); return; } + try { + const response = JSON.parse(body); + gstate.access_token = response.access_token; + gstate.req_counter = 1; + updatePersist(); + cb(true, body); + } catch (e) { + cb(false, body); + } + }); +} + +// cb: (status: int, body: string) => () +// Status 401: access token invalid +// Status 200: success +// Anything else: ? (see body?) +function matrixSendMsg(text, cb) { + if (gstate.access_token == undefined) { cb(401); return; } + + const headers = {Authorization: `Bearer ${gstate.access_token}`}; + const url = `/_matrix/client/v3/rooms/${gstate.config.room_id}/send/m.room.message/${gstate.req_counter}`; + gstate.req_counter++; + updatePersist(); // req_counter changed + + const data = JSON.stringify({ + msgtype: "m.text", + body: text, + }); + + fetch("PUT", headers, gstate.matrix_server, url, data, (status, body) => { + cb(status, body); + }); +} + +// cb: () => () +function logFailure(message, cb) { + fs.appendFile(moddir + "/faillog.txt", `[${new Date().toISOString()}] ${message}\n`, err => { + if (err) console.error(err); + cb(); + }); +} + +// Tries to send a message, trying login if the access token is invalid. +// cb: (success: bool) => () +function matrixSendMsgLogin(text, cb) { + matrixSendMsg(text, (status, body) => { + switch (status) { + case 200: + cb(true); + break; + + case 401: + matrixLogin((success, body) => { + if (!success) { + logFailure(`Failed to log in: ${body}`, () => cb(false)); + return; + } + + matrixSendMsg(text, (status, body) => { + switch (status) { + case 200: cb(true); return; + case 401: logFailure(`401 even after login: ${body}`, () => cb(false)); break; + default: logFailure(`Failed to send message: ${body}`, () => cb(false)); break; + } + }); + }); + break; + + default: + logFailure(`Failed to send message: ${body}`, () => cb(false)); + break; + } + }) +} + +module.exports = function(app, io, _moddir) { + moddir = _moddir; + + let config, accounts; + try { + config = require("./matrix.json"); + accounts = require("./accounts.json"); + } catch (e) { + console.error(e); + return false; + } + + const ratelimit = new PRatelimit(1000); + + augmentConfig(config, state => { + gstate = state; + + app.post("/statusbot", bodyParser.json(), cmn.authgen(accounts), (req, res) => { + if (typeof req.body.sender != "string" || typeof req.body.text != "string") { + return res.sendStatus(400); + } + ratelimit.submit(rlcb => { + ptimeout(5000, + cb => matrixSendMsgLogin(`[${req.body.sender}] ${req.body.text}`, cb), + (finished, success) => { + if (!finished) { + res.sendStatus(504); // gateway timeout + logFailure(`Timed out on message: [${req.body.sender}] ${req.body.text}`, () => {}); + } else if (!success) res.sendStatus(503); // service unavailable + else res.sendStatus(200); + rlcb(); + } + ); + }); + }); + }); +}; |