From 0b5fe0a11afd6003d4f4adfaf462392cfdba5e89 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Sat, 6 Jun 2020 13:41:09 +0200 Subject: elusive-events script --- elusive-events.py | 238 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100755 elusive-events.py diff --git a/elusive-events.py b/elusive-events.py new file mode 100755 index 0000000..ea8a7b5 --- /dev/null +++ b/elusive-events.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 + +import re, sys, time +import pacmd, pa + + +DEFAULT_PRINT = ["properties/media.name"] + +config = { + "delay": 0.1, # type: float + "print": [], # type: List[str] + "printall": False, # type: bool + "filter": [], # type: List[Tuple[str, str]] + "filterregex": [], # type: List[Tuple[str, str]] + "do": [], # type: List[Tuple[str, Any]] +} + + +def usage(): + print(\ +"""Usage: {argv0} [OPTIONS] + +Program for watching and modifying elusive sound events. + +By default, this program will regularly poll PulseAudio (using pacmd) for +sink-inputs, and print their details on standard output. You can change the +information printed, filter events, and perform certain actions on matching +events. + +Options are as follows: + + -config option=value + Configuration options are as follows: + delay: Seconds to sleep between polling for sink-inputs. Default: {config[delay]} + + -print key + Prints the given key in the log. If this option is not used at all, by + default the following keys are printed: + {default_print} + The available keys can be scouted by running 'pacmd list-sink-inputs' + while a sound is playing, or by using -printall. + + -printall + Print all keys in the log. This is pretty verbose. + + -filter key=value + Process only events for which 'key' exists and matches 'value'; if the + key does not exists in an event, it is assumed not to match. + + -filter key~=regex + Same as 'filter key=value', except that 'regex' is a Python regex. + + -do volume=float + Sets the volume of the event to the given value between 0.0 and 1.0. The + maximum volume is heuristically determined. + + -do raw_volume=int + Sets the volume of the event to the given value; this is typically 0 to + 65536, but may be different.""" + .format(argv0=sys.argv[0], config=config, default_print=DEFAULT_PRINT), + file=sys.stderr) + +def parse_args(args): + global config + + i = 0 + + def report_error(fmtstr): + nonlocal i, args + print(fmtstr.format(arg=args[i], prev=args[i-1]), file=sys.stderr) + sys.exit(1) + + def next_arg(): + nonlocal i, args + if i + 1 < len(args): + i += 1 + return args[i] + else: + report_error("Expected argument after '{arg}'") + + def parse_kv(arg): + nonlocal i, args + m = re.fullmatch(r"([^=]*)=(.*)", arg) + if m: + return m[1], m[2] + else: + report_error("Expected 'key=value' argument after '{prev}'") + + def parse_int(value): + try: + return int(value, 10) + except e: + report_error("Expected integer value in '{arg}'") + + def parse_float(value): + try: + return float(value) + except e: + report_error("Expected float value in '{arg}'") + + while i < len(args): + if args[i] in ["-h", "--help", "-help"]: + usage() + sys.exit(0) + + elif args[i] == "-config": + key, value = parse_kv(next_arg()) + if key == "delay": + config["delay"] = parse_float(value) + else: + report_error("Unknown config key in '{arg}'") + + elif args[i] == "-print": + config["print"].append(next_arg()) + + elif args[i] == "-printall": + config["printall"] = True + + elif args[i] == "-filter": + key, value = parse_kv(next_arg()) + if key[-1] == "~": + config["filterregex"].append((key[:-1], value)) + else: + config["filter"].append((key, value)) + + elif args[i] == "-do": + key, value = parse_kv(next_arg()) + if key == "volume": + value = parse_float(value) + if value < 0 or value > 1: + report_error("Volume out of range [0.0, 1.0]: {arg}") + config["do"].append(("volume", parse_float(value))) + elif key == "raw_volume": + config["do"].append(("raw_volume", parse_int(value))) + else: + report_error("Unknown action key in '{arg}'") + + else: + report_error("Unrecognised argument '{arg}'") + + i += 1 + + if len(config["print"]) == 0: + config["print"] = DEFAULT_PRINT + +def get_key(inp, key): + key = key.split("/") + node = inp.properties() + for item in key: + if item not in node.ch: return None + node = node.ch[item] + return " ".join(node.value) if type(node.value) == list else node.value + +def all_keys(inp): + def all_keys_of_node(node, prefix): + res = [] + if type(node) == pacmd.Node and node.value is not None: + res.append(prefix[1:]) + for k, n in node.ch.items(): res += all_keys_of_node(n, prefix + "/" + k) + return res + return all_keys_of_node(inp.properties(), "") + +def filters_allow(inp): + for (key, wanted) in config["filter"]: + value = get_key(inp, key) + if value is None or value != wanted: + return False + for (key, regex) in config["filterregex"]: + value = get_key(inp, key) + if value is None or not re.fullmatch(regex, value): + return False + return True + +def format_event(inp): + to_print = config["print"] if not config["printall"] else all_keys(inp) + line = str(inp.index()) + ":" + for key in to_print: + value = get_key(inp, key) + line += " {}={}".format(key, "" if value is None else repr(value)) + return line + +def perform_event_actions(inp): + for action, arg in config["do"]: + if action == "volume": + assert type(arg) == float and 0 <= arg <= 1 + inp.set_volume(arg) + print(" Set volume to {} on {}".format(arg, inp.index())) + elif action == "raw_volume": + assert type(arg) == int + inp.set_raw_volume(arg) + print(" Set raw volume to {} on {}".format(arg, inp.index())) + else: + assert False + +class State: + def __init__(self): + self.ongoing = set() + + def poll(self): + newev = {inp.index(): inp for inp in pa.list_sink_inputs()} + + # Remove events that are not ongoing anymore + self.ongoing &= newev.keys() + + # Ignore ongoing events + for k in self.ongoing: + if k in newev: + newev.pop(k) + + # New events are now also ongoing + self.ongoing |= newev.keys() + + # Ignore any filtered-out events + filtered_out = [index for index, inp in newev.items() + if not filters_allow(inp)] + for k in filtered_out: + newev.pop(k) + + for inp in newev.values(): + print(format_event(inp)) + + for inp in newev.values(): + perform_event_actions(inp) + +def main(): + parse_args(sys.argv[1:]) + # print(config) + + state = State() + try: + while True: + state.poll() + time.sleep(config["delay"]) + except KeyboardInterrupt: + sys.exit(0) + +if __name__ == "__main__": + main() -- cgit v1.2.3