From bf4d2336f3083f22609dc28dfbc30bb21bed93e9 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Thu, 4 Jul 2024 09:25:38 +0200 Subject: statusbot: ratelimit --- modules/statusbot/statusbot.js | 140 ++++++++++++++++++++++++++++++----------- 1 file changed, 104 insertions(+), 36 deletions(-) diff --git a/modules/statusbot/statusbot.js b/modules/statusbot/statusbot.js index 0fd9ce5..da81d82 100644 --- a/modules/statusbot/statusbot.js +++ b/modules/statusbot/statusbot.js @@ -11,7 +11,7 @@ // POST /statusbot // { "sender": "who are you", "text": "content of the status message" } -// State in this file: the above matrix.json plus: +// State in this file (gstate): the above matrix.json plus: // { // config: the above matrix.json, // home_server: "tomsmeding.com", // nominal server in the user id @@ -46,6 +46,8 @@ if (!persistTokens) { persist.setItemSync("access_tokens", persistTokens); } +let gstate = null; + // cb: (status: int, body: string) => () function fetch(method, headers, hostname, path, data, cb) { const req = https.request({method, headers, hostname, path}, res => { @@ -58,6 +60,56 @@ function fetch(method, headers, hostname, path, data, cb) { req.end(); } +// ms: int (milliseconds) +// f: ((...Args) => ()) => () +// cb: (true, ...Args) => () | (false) => () +// Callback is invoked with 'false' if the timeout expired, or with 'true' if 'f' finished before the timeout. +function ptimeout(ms, f, cb) { + let cbinvoked = false; + + const tmout = setTimeout(() => { + if (cbinvoked) return; + cbinvoked = true; + cb(false); + }, ms); + + f((...args) => { + if (cbinvoked) return; + cbinvoked = true; + clearTimeout(tmout); + cb(true, ...args); + }); +} + +class PRatelimit { + constructor(mindelayms) { + this._mindelayms = mindelayms; + this._queue = []; // [(() => ()) => ()] + this._ready = true; // true: should run job immediately; false: should enqueue + } + + // f: (() => ()) => () + submit(f) { + if (this._ready) { + this._ready = false; + f(() => this.startWaiting()); + } else { + this._queue.push(f); + } + } + + startWaiting() { + setTimeout(() => { + if (this._queue.length > 0) { + const f = this._queue.shift(); + f(() => this.startWaiting()); + } else { + this._ready = true; + } + }, this._mindelayms); + } +} + // cb: (state) => () function augmentConfig(config, cb) { const m = config.user_id.match(/^@[^:]+:(.*)$/); @@ -73,40 +125,43 @@ function augmentConfig(config, cb) { [access_token, req_counter] = persistTokens[pkey]; } - fetch("GET", {}, home_server, "/.well-known/matrix/server", "", (status, body) => { - if (status != 200) { - throw new Error(`statusbot: Failed getting https://${home_server}/.well-known/matrix/server`); - } - const mserver = JSON.parse(body)["m.server"]; - const m = mserver.match(/^([^:]*)(:443)?$/); - if (!m) { - throw new Error(`statusbot: Matrix server port not 443 (sorry): <${mserver}>`); - } - const matrix_server = m[1]; - cb({config, home_server, matrix_server, access_token, req_counter}); - }); + // wait a bit with requesting this in case the homeserver in question is the same server as us + setTimeout(() => { + fetch("GET", {}, home_server, "/.well-known/matrix/server", "", (status, body) => { + if (status != 200) { + throw new Error(`statusbot: Failed getting https://${home_server}/.well-known/matrix/server`); + } + const mserver = JSON.parse(body)["m.server"]; + const m = mserver.match(/^([^:]*)(:443)?$/); + if (!m) { + throw new Error(`statusbot: Matrix server port not 443 (sorry): <${mserver}>`); + } + const matrix_server = m[1]; + cb({config, home_server, matrix_server, access_token, req_counter}); + }); + }, 100); } -function updatePersist(state) { - persistTokens[`${state.config.user_id}|${state.config.room_id}`] = [state.access_token, state.req_counter]; +function updatePersist() { + persistTokens[`${gstate.config.user_id}|${gstate.config.room_id}`] = [gstate.access_token, gstate.req_counter]; persist.setItemSync("access_tokens", persistTokens); } // Sets access token in state. // cb: (success: bool, body: string) => () -function matrixLogin(state, cb) { +function matrixLogin(cb) { const data = JSON.stringify({ type: "m.login.password", - identifier: {type: "m.id.user", user: state.config.user_id}, - password: state.config.password, + identifier: {type: "m.id.user", user: gstate.config.user_id}, + password: gstate.config.password, }); - fetch("POST", {}, state.matrix_server, "/_matrix/client/v3/login", data, (status, body) => { + fetch("POST", {}, gstate.matrix_server, "/_matrix/client/v3/login", data, (status, body) => { if (status != 200) { cb(false, body); return; } try { const response = JSON.parse(body); - state.access_token = response.access_token; - state.req_counter = 1; - updatePersist(state); + gstate.access_token = response.access_token; + gstate.req_counter = 1; + updatePersist(); cb(true, body); } catch (e) { cb(false, body); @@ -118,20 +173,20 @@ function matrixLogin(state, cb) { // Status 401: access token invalid // Status 200: success // Anything else: ? (see body?) -function matrixSendMsg(state, text, cb) { - if (state.access_token == undefined) { cb(401); return; } +function matrixSendMsg(text, cb) { + if (gstate.access_token == undefined) { cb(401); return; } - const headers = {Authorization: `Bearer ${state.access_token}`}; - const url = `/_matrix/client/v3/rooms/${state.config.room_id}/send/m.room.message/${state.req_counter}`; - state.req_counter++; - updatePersist(state); // req_counter changed + const headers = {Authorization: `Bearer ${gstate.access_token}`}; + const url = `/_matrix/client/v3/rooms/${gstate.config.room_id}/send/m.room.message/${gstate.req_counter}`; + gstate.req_counter++; + updatePersist(); // req_counter changed const data = JSON.stringify({ msgtype: "m.text", body: text, }); - fetch("PUT", headers, state.matrix_server, url, data, (status, body) => { + fetch("PUT", headers, gstate.matrix_server, url, data, (status, body) => { cb(status, body); }); } @@ -146,21 +201,21 @@ function logFailure(message, cb) { // Tries to send a message, trying login if the access token is invalid. // cb: (success: bool) => () -function matrixSendMsgLogin(state, text, cb) { - matrixSendMsg(state, text, (status, body) => { +function matrixSendMsgLogin(text, cb) { + matrixSendMsg(text, (status, body) => { switch (status) { case 200: cb(true); break; case 401: - matrixLogin(state, (success, body) => { + matrixLogin((success, body) => { if (!success) { logFailure(`Failed to log in: ${body}`, () => cb(false)); return; } - matrixSendMsg(state, text, (status, body) => { + matrixSendMsg(text, (status, body) => { switch (status) { case 200: cb(true); return; case 401: logFailure(`401 even after login: ${body}`, () => cb(false)); break; @@ -183,14 +238,27 @@ module.exports = function(app, io, _moddir) { const config = require("./matrix.json"); const accounts = require("./accounts.json"); + const ratelimit = new PRatelimit(1000); + augmentConfig(config, state => { + gstate = state; + app.post("/statusbot", bodyParser.json(), cmn.authgen(accounts), (req, res) => { if (typeof req.body.sender != "string" || typeof req.body.text != "string") { return res.sendStatus(400); } - matrixSendMsgLogin(state, `[${req.body.sender}] ${req.body.text}`, success => { - if (success) res.sendStatus(200); - else res.sendStatus(503); // service unavailable + ratelimit.submit(rlcb => { + ptimeout(5000, + cb => matrixSendMsgLogin(`[${req.body.sender}] ${req.body.text}`, cb), + (finished, success) => { + if (!finished) { + res.sendStatus(504); // gateway timeout + logFailure(`Timed out on message: [${req.body.sender}] ${req.body.text}`, () => {}); + } else if (!success) res.sendStatus(503); // service unavailable + else res.sendStatus(200); + rlcb(); + } + ); }); }); }); -- cgit v1.2.3-70-g09d2