// Format of the required matrix.json file: // { // "user_id": "@statusbot:tomsmeding.com", // "password": "PASSWORD OF THE USER", // "room_id": "!ROOMID:tomsmeding.com" // } // Furthermore, accounts.json is required in the standard format ([[user, pass]]) // API: // POST /statusbot // { "sender": "who are you", "text": "content of the status message" } // State in this file (gstate): the above matrix.json plus: // { // config: the above matrix.json, // home_server: "tomsmeding.com", // nominal server in the user id // matrix_server: "matrix.tomsmeding.com", // actual matrix server address, without port // access_token: "some access token", // req_counter: 1 // counter for transaction requests to the matrix server // } // Saves persist state under key "access_tokens": // { // "@statusbot:tomsmeding.com|!ROOMID:tomsmeding.com": ["access token", req_counter: int] // } const cmn = require("../$common.js"); const fs = require("fs"); const https = require("https"); const URL = require("url"); const bodyParser = require("body-parser"); let moddir = null; const persist = require("node-persist").create({ dir: cmn.persistdir + "/statusbot", continuous: false, interval: false }); persist.initSync(); let persistTokens = persist.getItemSync("access_tokens"); if (!persistTokens) { persistTokens = {}; persist.setItemSync("access_tokens", persistTokens); } let gstate = null; // cb: (status: int, body: string) => () function fetch(method, headers, hostname, path, data, cb) { const req = https.request({method, headers, hostname, path}, res => { let body = ""; res.on("data", data => { body += data; }); res.on("end", () => cb(res.statusCode, body)); }); req.on("error", () => cb(null, null)) req.write(data); req.end(); } // ms: int (milliseconds) // f: ((...Args) => ()) => () // cb: (true, ...Args) => () | (false) => () // Callback is invoked with 'false' if the timeout expired, or with 'true' if 'f' finished before the timeout. function ptimeout(ms, f, cb) { let cbinvoked = false; const tmout = setTimeout(() => { if (cbinvoked) return; cbinvoked = true; cb(false); }, ms); f((...args) => { if (cbinvoked) return; cbinvoked = true; clearTimeout(tmout); cb(true, ...args); }); } class PRatelimit { constructor(mindelayms) { this._mindelayms = mindelayms; this._queue = []; // [(() => ()) => ()] this._ready = true; // true: should run job immediately; false: should enqueue } // f: (() => ()) => () submit(f) { if (this._ready) { this._ready = false; f(() => this.startWaiting()); } else { this._queue.push(f); } } startWaiting() { setTimeout(() => { if (this._queue.length > 0) { const f = this._queue.shift(); f(() => this.startWaiting()); } else { this._ready = true; } }, this._mindelayms); } } // f: ((true, ...Args) => () | (false) => ()) => () // cb: (true, ...Args) => () | (false) => () // The callback to f should be invoked with 'true' plus further arguments if it // succeeded, and 'false' if it did not. // Based on this info, if f failed, f is retried a few times. Once ntimes has // been exhausted or a call succeeds, cb is invoked with respectively either // (false) or (true, ...args). function pretry(interval, multiplier, ntimes, f, cb) { let tm = interval; let ncalled = 0; function schedule() { setTimeout(() => { f((success, ...args) => { if (success) { cb(true, ...args); return; } ncalled++; tm *= multiplier; if (ncalled >= ntimes) cb(false); else schedule(); }); }, tm); } schedule(); } // cb: (state) => () function augmentConfig(config, cb) { const m = config.user_id.match(/^@[^:]+:(.*)$/); if (!m) { throw new Error("statusbot: Cannot parse matrix ID (must be in @user:domain format)"); } const home_server = m[1]; const pkey = `${config.user_id}|${config.room_id}` let access_token = undefined; let req_counter = undefined; if (pkey in persistTokens) { [access_token, req_counter] = persistTokens[pkey]; } // wait a bit with requesting this in case the homeserver in question is the same server as us pretry(500, 1.4, 10, cb => { console.log("statusbot: Attempting to get matrix server info"); fetch("GET", {}, home_server, "/.well-known/matrix/server", "", (status, body) => { if (status != 200) { cb(false); // attempt failed (perhaps a later attempt will succeed) return; } let mserver; try { mserver = JSON.parse(body)["m.server"]; } catch (e) { cb(false); // invalid JSON response, try again later return; } const m = mserver.match(/^([^:]*)(:443)?$/); if (!m) { throw new Error(`statusbot: Matrix server port not 443 (sorry): <${mserver}>`); } const matrix_server = m[1]; // attempt succeeded! cb(true, {config, home_server, matrix_server, access_token, req_counter}); }); }, (success, state) => { if (success) { console.log("statusbot: Successfully got matrix server info."); cb(state); } else throw new Error(`statusbot: Failed getting https://${home_server}/.well-known/matrix/server`); } ); } function updatePersist() { persistTokens[`${gstate.config.user_id}|${gstate.config.room_id}`] = [gstate.access_token, gstate.req_counter]; persist.setItemSync("access_tokens", persistTokens); } // Sets access token in state. // cb: (success: bool, body: string) => () function matrixLogin(cb) { const data = JSON.stringify({ type: "m.login.password", identifier: {type: "m.id.user", user: gstate.config.user_id}, password: gstate.config.password, }); fetch("POST", {}, gstate.matrix_server, "/_matrix/client/v3/login", data, (status, body) => { if (status != 200) { cb(false, body); return; } try { const response = JSON.parse(body); gstate.access_token = response.access_token; gstate.req_counter = 1; updatePersist(); cb(true, body); } catch (e) { cb(false, body); } }); } // cb: (status: int, body: string) => () // Status 401: access token invalid // Status 200: success // Anything else: ? (see body?) function matrixSendMsg(text, cb) { if (gstate.access_token == undefined) { cb(401); return; } const headers = {Authorization: `Bearer ${gstate.access_token}`}; const url = `/_matrix/client/v3/rooms/${gstate.config.room_id}/send/m.room.message/${gstate.req_counter}`; gstate.req_counter++; updatePersist(); // req_counter changed const data = JSON.stringify({ msgtype: "m.text", body: text, }); fetch("PUT", headers, gstate.matrix_server, url, data, (status, body) => { cb(status, body); }); } // cb: () => () function logFailure(message, cb) { fs.appendFile(moddir + "/faillog.txt", `[${new Date().toISOString()}] ${message}\n`, err => { if (err) console.error(err); cb(); }); } // Tries to send a message, trying login if the access token is invalid. // cb: (success: bool) => () function matrixSendMsgLogin(text, cb) { matrixSendMsg(text, (status, body) => { switch (status) { case 200: cb(true); break; case 401: matrixLogin((success, body) => { if (!success) { logFailure(`Failed to log in: ${body}`, () => cb(false)); return; } matrixSendMsg(text, (status, body) => { switch (status) { case 200: cb(true); return; case 401: logFailure(`401 even after login: ${body}`, () => cb(false)); break; default: logFailure(`Failed to send message: ${body}`, () => cb(false)); break; } }); }); break; default: logFailure(`Failed to send message: ${body}`, () => cb(false)); break; } }) } module.exports = function(app, io, _moddir) { moddir = _moddir; let config, accounts; try { config = require("./matrix.json"); accounts = require("./accounts.json"); } catch (e) { console.error(e); return false; } const ratelimit = new PRatelimit(1000); augmentConfig(config, state => { gstate = state; app.post("/statusbot", bodyParser.json(), cmn.authgen(accounts), (req, res) => { if (typeof req.body.sender != "string" || typeof req.body.text != "string") { return res.sendStatus(400); } ratelimit.submit(rlcb => { ptimeout(5000, cb => matrixSendMsgLogin(`[${req.body.sender}] ${req.body.text}`, cb), (finished, success) => { if (!finished) { res.sendStatus(504); // gateway timeout logFailure(`Timed out on message: [${req.body.sender}] ${req.body.text}`, () => {}); } else if (!success) res.sendStatus(503); // service unavailable else res.sendStatus(200); rlcb(); } ); }); }); }); };