From e6f3ba4a683d5720408deb8e3df2c3fecf1ff367 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Fri, 10 Aug 2018 23:39:23 +0200 Subject: Initial --- .gitignore | 1 + README.txt | 1 + inter.py | 47 ++++++++++++++++++++ main.py | 21 +++++++++ pacmd.py | 146 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 216 insertions(+) create mode 100644 .gitignore create mode 100644 README.txt create mode 100644 inter.py create mode 100755 main.py create mode 100644 pacmd.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..120c264 --- /dev/null +++ b/README.txt @@ -0,0 +1 @@ +A manager for PulseAudio sinks and sink-inputs. diff --git a/inter.py b/inter.py new file mode 100644 index 0000000..109950d --- /dev/null +++ b/inter.py @@ -0,0 +1,47 @@ +from collections import namedtuple +import pacmd + +Sink = namedtuple("Sink", ["name", "description", "index", "state", "muted"]) +SinkInput = namedtuple("SinkInput", ["name", "driver", "sink"]) + +def list_sinks(): + res = pacmd.list_sinks() + assert len(res.sections) == 1 + assert res.sections[0].name == "sink" + + ret = [] + for item in res.sections[0].items: + muted = item.children["muted"].value + if type(muted) == str: + if muted == "yes": muted = True + elif muted == "no": muted = False + else: assert False + + sink = Sink( + item.children["name"].value, + item.children["properties"].children["device.description"].value, + item.index, + item.children["state"].value, + muted) + ret.append(sink) + + return ret + +def list_sink_inputs(): + res = pacmd.list_sink_inputs() + assert len(res.sections) == 1 + assert res.sections[0].name == "input" + + ret = [] + for item in res.sections[0].items: + name = item.children["properties"].children["media.name"].value + if "application.process.binary" in item.children["properties"].children: + name += " (" + item.children["properties"].children["application.process.binary"].value + ")" + + si = SinkInput( + name, + item.children["driver"].value, + int(item.children["sink"].value.split()[0])) + ret.append(si) + + return ret diff --git a/main.py b/main.py new file mode 100755 index 0000000..92da31d --- /dev/null +++ b/main.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +import pacmd, inter + +# res = pacmd.list_sink_inputs() +# print("Infos:") +# for info in res.infos: +# print(info) +# print("Sections:") +# for sect in res.sections: +# pacmd.dump_section(sect) + +res = inter.list_sinks() +for sink in res: + print(sink) + +print() + +res = inter.list_sink_inputs() +for si in res: + print(si) diff --git a/pacmd.py b/pacmd.py new file mode 100644 index 0000000..e9e019e --- /dev/null +++ b/pacmd.py @@ -0,0 +1,146 @@ +import subprocess, re +from collections import namedtuple + +class PacmdSection: + def __init__(self, name, items): + self.name = name # string + self.items = items # [PacmdItem] + +class PacmdItem: + def __init__(self, index, children): + self.index = index # int + self.children = children # {key: PacmdNode} + +class PacmdNode: + def __init__(self, value, children): + self.value = value # None | string | [string] + self.children = children # {key: PacmdNode} + +class Pacmd: + def __init__(self, full_output): + lines = full_output.decode("utf-8").split("\n") + self.infos = [] # [string] + self.sections = [] # [PacmdSection] + + currsect = None + curritem = None + currpath = [] + + def close_curritem(): + nonlocal curritem, currsect, currpath + if curritem: + currsect.items.append(curritem) + curritem = None + currpath = [] + + def close_currsect(): + nonlocal self, currsect + if currsect: + close_curritem() + self.sections.append(currsect) + + def q_is_key_value(ln): + return ln.find("=") != -1 or ln.find(": ") != -1 + + def q_get_key_value(ln): + if ln.find("=") != -1: + m = re.search(r"^\s*([^=]*?)\s*=\s*(.*)$", ln) + else: + m = re.search(r"^\s*([^:]*):\s*(.*)$", ln) + return m.group(1), m.group(2) + + for line in lines: + line = line.rstrip() + if len(line) == 0: + continue + + line = line.replace("\t", " ") + m = re.match(r"^( *(\* )?)([^ ].*)", line) + is_default = m.group(2) is not None # TODO use is_default + indent = len(m.group(1)) + line = m.group(3) + + # print(currpath) + # print(indent, line) + + if indent == 0: + if line[0].isdigit(): + close_currsect() + + name = re.search(r" ([^ ]*)\(s\)", line).group(1) + currsect = PacmdSection(name, []) + else: + self.infos.append(line) + + elif indent == 4: + close_curritem() + + index = re.search(r"index: ([0-9]*)$", line).group(1) + curritem = PacmdItem(int(index), {}) + + elif indent >= 8: + assert curritem != None + + is_aligned = indent % 8 == 0 + indent_depth = (indent + 7) // 8 - 1 + + if is_aligned and indent_depth < len(currpath): + currpath = currpath[:indent_depth] + + parentval, thisval = None, curritem + for k in currpath: + parentval, thisval = thisval, thisval.children[k] + + if q_is_key_value(line): + key, value = q_get_key_value(line) + thisval.children[key] = PacmdNode(value, {}) + currpath.append(key) + + elif re.search(r":$", line) is not None: + key = line[:-1] + thisval.children[key] = PacmdNode(None, {}) + currpath.append(key) + + elif indent_depth >= len(currpath): + if type(thisval.value) != list: + thisval.value = [thisval.value] + thisval.value.append(line) + + else: + print("line = " + line) + assert False + + else: + print("indent = " + str(indent)) + assert False + + close_currsect() + +def pacmd(*args): + return Pacmd(subprocess.check_output(["pacmd"] + list(args))) + +def list_all(): + return pacmd("list") + +def list_sinks(): + return pacmd("list-sinks") + +def list_sink_inputs(): + return pacmd("list-sink-inputs") + + +def _make_indent(n): + return " " * n + +def dump_section(sect): + print("Section " + sect.name + ":") + for item in sect.items: + print(_make_indent(1) + "Item " + str(item.index) + ":") + for key, node in item.children.items(): + dump_node(node, key, 2); + +def dump_node(node, key, indent): + print(_make_indent(indent) + key + ":" + + ("" if node.value is None else " " + str(node.value))) + for k2, n2 in node.children.items(): + dump_node(n2, k2, indent + 1) -- cgit v1.2.3