aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Smeding <tom.smeding@gmail.com>2018-08-10 23:39:23 +0200
committerTom Smeding <tom.smeding@gmail.com>2018-08-10 23:39:23 +0200
commite6f3ba4a683d5720408deb8e3df2c3fecf1ff367 (patch)
tree25783d3857343f91a3c37257e5a24e15bb6b7342
Initial
-rw-r--r--.gitignore1
-rw-r--r--README.txt1
-rw-r--r--inter.py47
-rwxr-xr-xmain.py21
-rw-r--r--pacmd.py146
5 files changed, 216 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c18dd8d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+__pycache__/
diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..120c264
--- /dev/null
+++ b/README.txt
@@ -0,0 +1 @@
+A manager for PulseAudio sinks and sink-inputs.
diff --git a/inter.py b/inter.py
new file mode 100644
index 0000000..109950d
--- /dev/null
+++ b/inter.py
@@ -0,0 +1,47 @@
+from collections import namedtuple
+import pacmd
+
+Sink = namedtuple("Sink", ["name", "description", "index", "state", "muted"])
+SinkInput = namedtuple("SinkInput", ["name", "driver", "sink"])
+
+def list_sinks():
+ res = pacmd.list_sinks()
+ assert len(res.sections) == 1
+ assert res.sections[0].name == "sink"
+
+ ret = []
+ for item in res.sections[0].items:
+ muted = item.children["muted"].value
+ if type(muted) == str:
+ if muted == "yes": muted = True
+ elif muted == "no": muted = False
+ else: assert False
+
+ sink = Sink(
+ item.children["name"].value,
+ item.children["properties"].children["device.description"].value,
+ item.index,
+ item.children["state"].value,
+ muted)
+ ret.append(sink)
+
+ return ret
+
+def list_sink_inputs():
+ res = pacmd.list_sink_inputs()
+ assert len(res.sections) == 1
+ assert res.sections[0].name == "input"
+
+ ret = []
+ for item in res.sections[0].items:
+ name = item.children["properties"].children["media.name"].value
+ if "application.process.binary" in item.children["properties"].children:
+ name += " (" + item.children["properties"].children["application.process.binary"].value + ")"
+
+ si = SinkInput(
+ name,
+ item.children["driver"].value,
+ int(item.children["sink"].value.split()[0]))
+ ret.append(si)
+
+ return ret
diff --git a/main.py b/main.py
new file mode 100755
index 0000000..92da31d
--- /dev/null
+++ b/main.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python3
+
+import pacmd, inter
+
+# res = pacmd.list_sink_inputs()
+# print("Infos:")
+# for info in res.infos:
+# print(info)
+# print("Sections:")
+# for sect in res.sections:
+# pacmd.dump_section(sect)
+
+res = inter.list_sinks()
+for sink in res:
+ print(sink)
+
+print()
+
+res = inter.list_sink_inputs()
+for si in res:
+ print(si)
diff --git a/pacmd.py b/pacmd.py
new file mode 100644
index 0000000..e9e019e
--- /dev/null
+++ b/pacmd.py
@@ -0,0 +1,146 @@
+import subprocess, re
+from collections import namedtuple
+
+class PacmdSection:
+ def __init__(self, name, items):
+ self.name = name # string
+ self.items = items # [PacmdItem]
+
+class PacmdItem:
+ def __init__(self, index, children):
+ self.index = index # int
+ self.children = children # {key: PacmdNode}
+
+class PacmdNode:
+ def __init__(self, value, children):
+ self.value = value # None | string | [string]
+ self.children = children # {key: PacmdNode}
+
+class Pacmd:
+ def __init__(self, full_output):
+ lines = full_output.decode("utf-8").split("\n")
+ self.infos = [] # [string]
+ self.sections = [] # [PacmdSection]
+
+ currsect = None
+ curritem = None
+ currpath = []
+
+ def close_curritem():
+ nonlocal curritem, currsect, currpath
+ if curritem:
+ currsect.items.append(curritem)
+ curritem = None
+ currpath = []
+
+ def close_currsect():
+ nonlocal self, currsect
+ if currsect:
+ close_curritem()
+ self.sections.append(currsect)
+
+ def q_is_key_value(ln):
+ return ln.find("=") != -1 or ln.find(": ") != -1
+
+ def q_get_key_value(ln):
+ if ln.find("=") != -1:
+ m = re.search(r"^\s*([^=]*?)\s*=\s*(.*)$", ln)
+ else:
+ m = re.search(r"^\s*([^:]*):\s*(.*)$", ln)
+ return m.group(1), m.group(2)
+
+ for line in lines:
+ line = line.rstrip()
+ if len(line) == 0:
+ continue
+
+ line = line.replace("\t", " ")
+ m = re.match(r"^( *(\* )?)([^ ].*)", line)
+ is_default = m.group(2) is not None # TODO use is_default
+ indent = len(m.group(1))
+ line = m.group(3)
+
+ # print(currpath)
+ # print(indent, line)
+
+ if indent == 0:
+ if line[0].isdigit():
+ close_currsect()
+
+ name = re.search(r" ([^ ]*)\(s\)", line).group(1)
+ currsect = PacmdSection(name, [])
+ else:
+ self.infos.append(line)
+
+ elif indent == 4:
+ close_curritem()
+
+ index = re.search(r"index: ([0-9]*)$", line).group(1)
+ curritem = PacmdItem(int(index), {})
+
+ elif indent >= 8:
+ assert curritem != None
+
+ is_aligned = indent % 8 == 0
+ indent_depth = (indent + 7) // 8 - 1
+
+ if is_aligned and indent_depth < len(currpath):
+ currpath = currpath[:indent_depth]
+
+ parentval, thisval = None, curritem
+ for k in currpath:
+ parentval, thisval = thisval, thisval.children[k]
+
+ if q_is_key_value(line):
+ key, value = q_get_key_value(line)
+ thisval.children[key] = PacmdNode(value, {})
+ currpath.append(key)
+
+ elif re.search(r":$", line) is not None:
+ key = line[:-1]
+ thisval.children[key] = PacmdNode(None, {})
+ currpath.append(key)
+
+ elif indent_depth >= len(currpath):
+ if type(thisval.value) != list:
+ thisval.value = [thisval.value]
+ thisval.value.append(line)
+
+ else:
+ print("line = " + line)
+ assert False
+
+ else:
+ print("indent = " + str(indent))
+ assert False
+
+ close_currsect()
+
+def pacmd(*args):
+ return Pacmd(subprocess.check_output(["pacmd"] + list(args)))
+
+def list_all():
+ return pacmd("list")
+
+def list_sinks():
+ return pacmd("list-sinks")
+
+def list_sink_inputs():
+ return pacmd("list-sink-inputs")
+
+
+def _make_indent(n):
+ return " " * n
+
+def dump_section(sect):
+ print("Section " + sect.name + ":")
+ for item in sect.items:
+ print(_make_indent(1) + "Item " + str(item.index) + ":")
+ for key, node in item.children.items():
+ dump_node(node, key, 2);
+
+def dump_node(node, key, indent):
+ print(_make_indent(indent) + key + ":" +
+ ("" if node.value is None else " " + str(node.value)))
+ for k2, n2 in node.children.items():
+ dump_node(n2, k2, indent + 1)