summaryrefslogtreecommitdiff
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/statusbot/statusbot.js140
1 files changed, 104 insertions, 36 deletions
diff --git a/modules/statusbot/statusbot.js b/modules/statusbot/statusbot.js
index 0fd9ce5..da81d82 100644
--- a/modules/statusbot/statusbot.js
+++ b/modules/statusbot/statusbot.js
@@ -11,7 +11,7 @@
// POST /statusbot
// { "sender": "who are you", "text": "content of the status message" }
-// State in this file: the above matrix.json plus:
+// State in this file (gstate): the above matrix.json plus:
// {
// config: the above matrix.json,
// home_server: "tomsmeding.com", // nominal server in the user id
@@ -46,6 +46,8 @@ if (!persistTokens) {
persist.setItemSync("access_tokens", persistTokens);
}
+let gstate = null;
+
// cb: (status: int, body: string) => ()
function fetch(method, headers, hostname, path, data, cb) {
const req = https.request({method, headers, hostname, path}, res => {
@@ -58,6 +60,56 @@ function fetch(method, headers, hostname, path, data, cb) {
req.end();
}
+// ms: int (milliseconds)
+// f: ((...Args) => ()) => ()
+// cb: (true, ...Args) => () | (false) => ()
+// Callback is invoked with 'false' if the timeout expired, or with 'true' if 'f' finished before the timeout.
+function ptimeout(ms, f, cb) {
+ let cbinvoked = false;
+
+ const tmout = setTimeout(() => {
+ if (cbinvoked) return;
+ cbinvoked = true;
+ cb(false);
+ }, ms);
+
+ f((...args) => {
+ if (cbinvoked) return;
+ cbinvoked = true;
+ clearTimeout(tmout);
+ cb(true, ...args);
+ });
+}
+
+class PRatelimit {
+ constructor(mindelayms) {
+ this._mindelayms = mindelayms;
+ this._queue = []; // [(() => ()) => ()]
+ this._ready = true; // true: should run job immediately; false: should enqueue
+ }
+
+ // f: (() => ()) => ()
+ submit(f) {
+ if (this._ready) {
+ this._ready = false;
+ f(() => this.startWaiting());
+ } else {
+ this._queue.push(f);
+ }
+ }
+
+ startWaiting() {
+ setTimeout(() => {
+ if (this._queue.length > 0) {
+ const f = this._queue.shift();
+ f(() => this.startWaiting());
+ } else {
+ this._ready = true;
+ }
+ }, this._mindelayms);
+ }
+}
+
// cb: (state) => ()
function augmentConfig(config, cb) {
const m = config.user_id.match(/^@[^:]+:(.*)$/);
@@ -73,40 +125,43 @@ function augmentConfig(config, cb) {
[access_token, req_counter] = persistTokens[pkey];
}
- fetch("GET", {}, home_server, "/.well-known/matrix/server", "", (status, body) => {
- if (status != 200) {
- throw new Error(`statusbot: Failed getting https://${home_server}/.well-known/matrix/server`);
- }
- const mserver = JSON.parse(body)["m.server"];
- const m = mserver.match(/^([^:]*)(:443)?$/);
- if (!m) {
- throw new Error(`statusbot: Matrix server port not 443 (sorry): <${mserver}>`);
- }
- const matrix_server = m[1];
- cb({config, home_server, matrix_server, access_token, req_counter});
- });
+ // wait a bit with requesting this in case the homeserver in question is the same server as us
+ setTimeout(() => {
+ fetch("GET", {}, home_server, "/.well-known/matrix/server", "", (status, body) => {
+ if (status != 200) {
+ throw new Error(`statusbot: Failed getting https://${home_server}/.well-known/matrix/server`);
+ }
+ const mserver = JSON.parse(body)["m.server"];
+ const m = mserver.match(/^([^:]*)(:443)?$/);
+ if (!m) {
+ throw new Error(`statusbot: Matrix server port not 443 (sorry): <${mserver}>`);
+ }
+ const matrix_server = m[1];
+ cb({config, home_server, matrix_server, access_token, req_counter});
+ });
+ }, 100);
}
-function updatePersist(state) {
- persistTokens[`${state.config.user_id}|${state.config.room_id}`] = [state.access_token, state.req_counter];
+function updatePersist() {
+ persistTokens[`${gstate.config.user_id}|${gstate.config.room_id}`] = [gstate.access_token, gstate.req_counter];
persist.setItemSync("access_tokens", persistTokens);
}
// Sets access token in state.
// cb: (success: bool, body: string) => ()
-function matrixLogin(state, cb) {
+function matrixLogin(cb) {
const data = JSON.stringify({
type: "m.login.password",
- identifier: {type: "m.id.user", user: state.config.user_id},
- password: state.config.password,
+ identifier: {type: "m.id.user", user: gstate.config.user_id},
+ password: gstate.config.password,
});
- fetch("POST", {}, state.matrix_server, "/_matrix/client/v3/login", data, (status, body) => {
+ fetch("POST", {}, gstate.matrix_server, "/_matrix/client/v3/login", data, (status, body) => {
if (status != 200) { cb(false, body); return; }
try {
const response = JSON.parse(body);
- state.access_token = response.access_token;
- state.req_counter = 1;
- updatePersist(state);
+ gstate.access_token = response.access_token;
+ gstate.req_counter = 1;
+ updatePersist();
cb(true, body);
} catch (e) {
cb(false, body);
@@ -118,20 +173,20 @@ function matrixLogin(state, cb) {
// Status 401: access token invalid
// Status 200: success
// Anything else: ? (see body?)
-function matrixSendMsg(state, text, cb) {
- if (state.access_token == undefined) { cb(401); return; }
+function matrixSendMsg(text, cb) {
+ if (gstate.access_token == undefined) { cb(401); return; }
- const headers = {Authorization: `Bearer ${state.access_token}`};
- const url = `/_matrix/client/v3/rooms/${state.config.room_id}/send/m.room.message/${state.req_counter}`;
- state.req_counter++;
- updatePersist(state); // req_counter changed
+ const headers = {Authorization: `Bearer ${gstate.access_token}`};
+ const url = `/_matrix/client/v3/rooms/${gstate.config.room_id}/send/m.room.message/${gstate.req_counter}`;
+ gstate.req_counter++;
+ updatePersist(); // req_counter changed
const data = JSON.stringify({
msgtype: "m.text",
body: text,
});
- fetch("PUT", headers, state.matrix_server, url, data, (status, body) => {
+ fetch("PUT", headers, gstate.matrix_server, url, data, (status, body) => {
cb(status, body);
});
}
@@ -146,21 +201,21 @@ function logFailure(message, cb) {
// Tries to send a message, trying login if the access token is invalid.
// cb: (success: bool) => ()
-function matrixSendMsgLogin(state, text, cb) {
- matrixSendMsg(state, text, (status, body) => {
+function matrixSendMsgLogin(text, cb) {
+ matrixSendMsg(text, (status, body) => {
switch (status) {
case 200:
cb(true);
break;
case 401:
- matrixLogin(state, (success, body) => {
+ matrixLogin((success, body) => {
if (!success) {
logFailure(`Failed to log in: ${body}`, () => cb(false));
return;
}
- matrixSendMsg(state, text, (status, body) => {
+ matrixSendMsg(text, (status, body) => {
switch (status) {
case 200: cb(true); return;
case 401: logFailure(`401 even after login: ${body}`, () => cb(false)); break;
@@ -183,14 +238,27 @@ module.exports = function(app, io, _moddir) {
const config = require("./matrix.json");
const accounts = require("./accounts.json");
+ const ratelimit = new PRatelimit(1000);
+
augmentConfig(config, state => {
+ gstate = state;
+
app.post("/statusbot", bodyParser.json(), cmn.authgen(accounts), (req, res) => {
if (typeof req.body.sender != "string" || typeof req.body.text != "string") {
return res.sendStatus(400);
}
- matrixSendMsgLogin(state, `[${req.body.sender}] ${req.body.text}`, success => {
- if (success) res.sendStatus(200);
- else res.sendStatus(503); // service unavailable
+ ratelimit.submit(rlcb => {
+ ptimeout(5000,
+ cb => matrixSendMsgLogin(`[${req.body.sender}] ${req.body.text}`, cb),
+ (finished, success) => {
+ if (!finished) {
+ res.sendStatus(504); // gateway timeout
+ logFailure(`Timed out on message: [${req.body.sender}] ${req.body.text}`, () => {});
+ } else if (!success) res.sendStatus(503); // service unavailable
+ else res.sendStatus(200);
+ rlcb();
+ }
+ );
});
});
});