aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Smeding <tom.smeding@gmail.com>2020-06-06 13:41:09 +0200
committerTom Smeding <tom.smeding@gmail.com>2020-06-06 13:41:09 +0200
commit0b5fe0a11afd6003d4f4adfaf462392cfdba5e89 (patch)
tree0cb64bb194111e509e4e7e1f0511f0bc662b3ffa
parent6f5974d2e670b854e6d75febeb489178cbcd4c15 (diff)
elusive-events script
-rwxr-xr-xelusive-events.py238
1 files changed, 238 insertions, 0 deletions
diff --git a/elusive-events.py b/elusive-events.py
new file mode 100755
index 0000000..ea8a7b5
--- /dev/null
+++ b/elusive-events.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python3
+
+import re, sys, time
+import pacmd, pa
+
+
+DEFAULT_PRINT = ["properties/media.name"]
+
+config = {
+ "delay": 0.1, # type: float
+ "print": [], # type: List[str]
+ "printall": False, # type: bool
+ "filter": [], # type: List[Tuple[str, str]]
+ "filterregex": [], # type: List[Tuple[str, str]]
+ "do": [], # type: List[Tuple[str, Any]]
+}
+
+
+def usage():
+ print(\
+"""Usage: {argv0} [OPTIONS]
+
+Program for watching and modifying elusive sound events.
+
+By default, this program will regularly poll PulseAudio (using pacmd) for
+sink-inputs, and print their details on standard output. You can change the
+information printed, filter events, and perform certain actions on matching
+events.
+
+Options are as follows:
+
+ -config option=value
+ Configuration options are as follows:
+ delay: Seconds to sleep between polling for sink-inputs. Default: {config[delay]}
+
+ -print key
+ Prints the given key in the log. If this option is not used at all, by
+ default the following keys are printed:
+ {default_print}
+ The available keys can be scouted by running 'pacmd list-sink-inputs'
+ while a sound is playing, or by using -printall.
+
+ -printall
+ Print all keys in the log. This is pretty verbose.
+
+ -filter key=value
+ Process only events for which 'key' exists and matches 'value'; if the
+ key does not exists in an event, it is assumed not to match.
+
+ -filter key~=regex
+ Same as 'filter key=value', except that 'regex' is a Python regex.
+
+ -do volume=float
+ Sets the volume of the event to the given value between 0.0 and 1.0. The
+ maximum volume is heuristically determined.
+
+ -do raw_volume=int
+ Sets the volume of the event to the given value; this is typically 0 to
+ 65536, but may be different."""
+ .format(argv0=sys.argv[0], config=config, default_print=DEFAULT_PRINT),
+ file=sys.stderr)
+
+def parse_args(args):
+ global config
+
+ i = 0
+
+ def report_error(fmtstr):
+ nonlocal i, args
+ print(fmtstr.format(arg=args[i], prev=args[i-1]), file=sys.stderr)
+ sys.exit(1)
+
+ def next_arg():
+ nonlocal i, args
+ if i + 1 < len(args):
+ i += 1
+ return args[i]
+ else:
+ report_error("Expected argument after '{arg}'")
+
+ def parse_kv(arg):
+ nonlocal i, args
+ m = re.fullmatch(r"([^=]*)=(.*)", arg)
+ if m:
+ return m[1], m[2]
+ else:
+ report_error("Expected 'key=value' argument after '{prev}'")
+
+ def parse_int(value):
+ try:
+ return int(value, 10)
+ except e:
+ report_error("Expected integer value in '{arg}'")
+
+ def parse_float(value):
+ try:
+ return float(value)
+ except e:
+ report_error("Expected float value in '{arg}'")
+
+ while i < len(args):
+ if args[i] in ["-h", "--help", "-help"]:
+ usage()
+ sys.exit(0)
+
+ elif args[i] == "-config":
+ key, value = parse_kv(next_arg())
+ if key == "delay":
+ config["delay"] = parse_float(value)
+ else:
+ report_error("Unknown config key in '{arg}'")
+
+ elif args[i] == "-print":
+ config["print"].append(next_arg())
+
+ elif args[i] == "-printall":
+ config["printall"] = True
+
+ elif args[i] == "-filter":
+ key, value = parse_kv(next_arg())
+ if key[-1] == "~":
+ config["filterregex"].append((key[:-1], value))
+ else:
+ config["filter"].append((key, value))
+
+ elif args[i] == "-do":
+ key, value = parse_kv(next_arg())
+ if key == "volume":
+ value = parse_float(value)
+ if value < 0 or value > 1:
+ report_error("Volume out of range [0.0, 1.0]: {arg}")
+ config["do"].append(("volume", parse_float(value)))
+ elif key == "raw_volume":
+ config["do"].append(("raw_volume", parse_int(value)))
+ else:
+ report_error("Unknown action key in '{arg}'")
+
+ else:
+ report_error("Unrecognised argument '{arg}'")
+
+ i += 1
+
+ if len(config["print"]) == 0:
+ config["print"] = DEFAULT_PRINT
+
+def get_key(inp, key):
+ key = key.split("/")
+ node = inp.properties()
+ for item in key:
+ if item not in node.ch: return None
+ node = node.ch[item]
+ return " ".join(node.value) if type(node.value) == list else node.value
+
+def all_keys(inp):
+ def all_keys_of_node(node, prefix):
+ res = []
+ if type(node) == pacmd.Node and node.value is not None:
+ res.append(prefix[1:])
+ for k, n in node.ch.items(): res += all_keys_of_node(n, prefix + "/" + k)
+ return res
+ return all_keys_of_node(inp.properties(), "")
+
+def filters_allow(inp):
+ for (key, wanted) in config["filter"]:
+ value = get_key(inp, key)
+ if value is None or value != wanted:
+ return False
+ for (key, regex) in config["filterregex"]:
+ value = get_key(inp, key)
+ if value is None or not re.fullmatch(regex, value):
+ return False
+ return True
+
+def format_event(inp):
+ to_print = config["print"] if not config["printall"] else all_keys(inp)
+ line = str(inp.index()) + ":"
+ for key in to_print:
+ value = get_key(inp, key)
+ line += " {}={}".format(key, "<none>" if value is None else repr(value))
+ return line
+
+def perform_event_actions(inp):
+ for action, arg in config["do"]:
+ if action == "volume":
+ assert type(arg) == float and 0 <= arg <= 1
+ inp.set_volume(arg)
+ print(" Set volume to {} on {}".format(arg, inp.index()))
+ elif action == "raw_volume":
+ assert type(arg) == int
+ inp.set_raw_volume(arg)
+ print(" Set raw volume to {} on {}".format(arg, inp.index()))
+ else:
+ assert False
+
+class State:
+ def __init__(self):
+ self.ongoing = set()
+
+ def poll(self):
+ newev = {inp.index(): inp for inp in pa.list_sink_inputs()}
+
+ # Remove events that are not ongoing anymore
+ self.ongoing &= newev.keys()
+
+ # Ignore ongoing events
+ for k in self.ongoing:
+ if k in newev:
+ newev.pop(k)
+
+ # New events are now also ongoing
+ self.ongoing |= newev.keys()
+
+ # Ignore any filtered-out events
+ filtered_out = [index for index, inp in newev.items()
+ if not filters_allow(inp)]
+ for k in filtered_out:
+ newev.pop(k)
+
+ for inp in newev.values():
+ print(format_event(inp))
+
+ for inp in newev.values():
+ perform_event_actions(inp)
+
+def main():
+ parse_args(sys.argv[1:])
+ # print(config)
+
+ state = State()
+ try:
+ while True:
+ state.poll()
+ time.sleep(config["delay"])
+ except KeyboardInterrupt:
+ sys.exit(0)
+
+if __name__ == "__main__":
+ main()