aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Smeding <tom.smeding@gmail.com>2018-08-12 11:32:46 +0200
committerTom Smeding <tom.smeding@gmail.com>2018-08-12 11:32:46 +0200
commit37091fd7cf98ca7c14d6b4122f6b74ad7079d7b0 (patch)
treec862c534f2356924f0312e119b52dc7645707101
parentcd3aaa9e9fb1cd661e76638f7cb78e47a88e8cde (diff)
Working version
-rw-r--r--.gitignore2
-rw-r--r--inter.py369
-rwxr-xr-xmain.py37
-rw-r--r--pa.py104
-rw-r--r--pacmd.py52
5 files changed, 470 insertions, 94 deletions
diff --git a/.gitignore b/.gitignore
index c18dd8d..27cfc34 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
__pycache__/
+termio.py
+libtermio.so
diff --git a/inter.py b/inter.py
index 4a4cd5b..c316663 100644
--- a/inter.py
+++ b/inter.py
@@ -1,56 +1,313 @@
-from collections import namedtuple
-import pacmd
-
-Sink = namedtuple("Sink",
- ["name", # string
- "description", # string
- "index", # int
- "state", # string
- "muted", # bool
- "volume"]) # string
-SinkInput = namedtuple("SinkInput",
- ["name", # string
- "driver", # string
- "sink"]) # int
-
-def list_sinks():
- res = pacmd.list_sinks()
- assert len(res.sections) == 1
- assert res.sections[0].name == "sink"
-
- ret = []
- for item in res.sections[0].items:
- muted = item.children["muted"].value
- if muted == "yes": muted = True
- elif muted == "no": muted = False
- else: assert False
-
- sink = Sink(
- item.children["name"].value,
- item.children["properties"].children["device.description"].value,
- item.index,
- item.children["state"].value,
- muted,
- item.children["volume"].value)
- ret.append(sink)
-
- return ret
-
-def list_sink_inputs():
- res = pacmd.list_sink_inputs()
- assert len(res.sections) == 1
- assert res.sections[0].name == "input"
-
- ret = []
- for item in res.sections[0].items:
- name = item.children["properties"].children["media.name"].value
- if "application.process.binary" in item.children["properties"].children:
- name += " (" + item.children["properties"].children["application.process.binary"].value + ")"
-
- si = SinkInput(
- name,
- item.children["driver"].value,
- int(item.children["sink"].value.split()[0]))
- ret.append(si)
-
- return ret
+import sys, os, atexit
+import pacmd, pa
+try:
+ sys.path.append(os.path.dirname(os.path.realpath(__file__)))
+ import termio as T
+except Exception as e:
+ print("Place termio.py and libtermio.so from the github.com/tomsmeding/termio project in this directory")
+ print(e)
+ sys.exit(1)
+
+__all__ = ["start", "end", "mainloop", "update"]
+
+inps = None
+sinks = None
+sel = (0, 0)
+prompt_y = 0 # updated by redraw()
+
+MENU_MAIN, MENU_VOLUME, NUM_MENUS = range(3)
+menu = MENU_MAIN
+
+menutext = [0] * NUM_MENUS
+menuopts = [0] * NUM_MENUS
+
+menutext[MENU_MAIN] = ""
+menuopts[MENU_MAIN] = [
+ ("(q)uit", [0,1]),
+ ("(v)olume", [0,1]),
+ ("(d)efault", [1]),
+ ("(m)ove", [0])
+]
+
+menutext[MENU_VOLUME] = "Change volume: <- / -> (alt: 1%) (m)ute [q/esc: return]"
+menuopts[MENU_VOLUME] = []
+
+
+def start():
+ T.initscreen()
+ atexit.register(T.endscreen)
+ T.initkeyboard(False)
+ atexit.register(T.endkeyboard)
+
+ redraw()
+
+def end():
+ T.endkeyboard()
+ atexit.unregister(T.endkeyboard)
+ T.endscreen()
+ atexit.unregister(T.endscreen)
+
+def mainloop():
+ global sel, menu
+ while True:
+ update()
+ if len([inps, sinks][sel[0]]) == 0 and len([sinks, inps][sel[0]]) != 0:
+ sel = (1 - sel[0], sel[1])
+ redraw()
+
+ key = T.tgetkey()
+
+ if menu != MENU_MAIN and (key == T.KEY_ESC or key == ord('q')):
+ menu = MENU_MAIN
+
+ elif menu == MENU_MAIN:
+ if key == T.KEY_DOWN:
+ if sel[0] == 0 and sel[1] >= len(inps) - 1:
+ if len(sinks) > 0:
+ sel = (1, 0)
+ elif sel[0] == 1 and sel[1] >= len(sinks) - 1:
+ pass
+ else:
+ sel = (sel[0], sel[1] + 1)
+
+ elif key == T.KEY_UP:
+ if sel[0] == 0 and sel[1] == 0:
+ pass
+ elif sel[0] == 1 and sel[1] == 0:
+ if len(inps) > 0:
+ sel = (0, len(inps) - 1)
+ else:
+ sel = (sel[0], sel[1] - 1)
+
+ elif key == ord('q'):
+ break
+
+ elif key == ord('v'):
+ thing = get_selected()
+ kind = "sink" if type(thing) == pa.Sink else "input"
+ vol = thing.volume()
+ if vol[0] != vol[1]:
+ show_message(
+ "Warning: current volume of this " + kind + " is asymmetric!")
+
+ menu = MENU_VOLUME
+
+ elif sel[0] == 1 and key == ord('d'):
+ wrap_pacmd(lambda: sinks[sel[1]].set_default())
+
+ elif sel[0] == 0 and key == ord('m'):
+ idx = show_prompt(
+ "Enter sink number to link input {} to"
+ .format(inps[sel[1]].index()))
+
+ if idx is None:
+ continue
+
+ try:
+ idx = int(idx)
+ except:
+ show_message("Invalid number!")
+ continue
+
+ for i in range(len(sinks)):
+ if sinks[i].index() == idx:
+ wrap_pacmd(inps[sel[1]].move_to_sink(idx))
+ break
+ else:
+ show_message("No sink found with that index!")
+
+ elif menu == MENU_VOLUME:
+ thing = [inps, sinks][sel[0]][sel[1]]
+
+ incr = 0
+ if key == T.KEY_RIGHT:
+ incr = 0.05
+ elif key == T.KEY_LEFT:
+ incr = -0.05
+ elif key == T.KEY_ALT + T.KEY_RIGHT:
+ incr = 0.01
+ elif key == T.KEY_ALT + T.KEY_LEFT:
+ incr = -0.01
+ elif key == ord('m'):
+ thing.set_muted(not thing.muted())
+ continue
+ else:
+ T.bel()
+ continue
+
+ vol = sum(thing.volume()) / 2
+ vol = min(1, max(0, vol + incr))
+ wrap_pacmd(lambda: thing.set_volume(vol))
+
+ else:
+ assert False
+
+def get_selected():
+ return [inps, sinks][sel[0]][sel[1]]
+
+def wrap_pacmd(lam):
+ try:
+ lam()
+ except pacmd.PacmdError as e:
+ show_message("An error occurred:\n" + str(e))
+
+def show_message(msg):
+ sz = T.gettermsize()
+ T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ')
+ T.moveto(0, prompt_y)
+ T.tprint("! " + msg + "\n[press return]")
+ T.redraw()
+
+ while True:
+ key = T.tgetkey()
+ if key == T.KEY_CR or key == T.KEY_LF:
+ break
+
+ T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ')
+
+def show_prompt(msg):
+ sz = T.gettermsize()
+ T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ')
+ T.moveto(0, prompt_y)
+ T.tprint(msg + "\n> ")
+ T.redraw()
+
+ line = T.tgetline()
+
+ T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ')
+ return line
+
+def redraw():
+ global prompt_y
+ if inps is None:
+ update()
+
+ sz = T.gettermsize()
+ T.fillrect(0, 0, sz.w, sz.h, ' ')
+
+ T.setstyle(T.Style(9, 9, False, False))
+
+ y = 0
+ T.moveto(0, y)
+ T.setbold(True)
+ T.tprint("Sink Inputs")
+ T.setbold(False)
+ y += 1
+
+ for i, inp in enumerate(inps):
+ T.moveto(4, y)
+ print_input(inp, sel[0] == 0 and sel[1] == i)
+ y += 1
+
+ y += 1
+ T.moveto(0, y)
+ T.setbold(True)
+ T.tprint("Sinks")
+ T.setbold(False)
+ y += 1
+
+ for i, sink in enumerate(sinks):
+ T.moveto(4, y)
+ print_sink(sink, sel[0] == 1 and sel[1] == i)
+ y += 1
+
+ y += 2
+
+ prompt_y = y
+
+ T.moveto(0, y)
+ T.tprint(menutext[menu])
+ first = True
+ for (text, groups) in menuopts[menu]:
+ if sel[0] in groups:
+ if first: first = False
+ else: T.tprint(" ")
+ T.tprint(text)
+ # if sel[0] not in groups:
+ # T.setfg(6)
+ # T.tprint(text)
+ # T.setfg(9)
+
+ y += 1
+
+ T.moveto(0, y)
+ T.tprint("> ")
+
+ T.redraw()
+
+def print_prefix(thing, selected):
+ if selected:
+ T.setbold(True)
+ T.tprint("> ")
+ T.setbold(False)
+ else:
+ T.tprint(" ")
+
+def fmt_volume(vol):
+ res = ["", ""]
+ for i in range(2):
+ res[i] = str(round(100 * vol[i])) + "%"
+ if vol[0] == vol[1]:
+ return res[0]
+ else:
+ return res[0] + "/" + res[1]
+
+def print_volume(thing):
+ T.setfg(6)
+ T.tprint("(vol: {}{})".format(
+ fmt_volume(thing.volume()),
+ " (MUTED)" if thing.muted() else ""))
+ T.setfg(9)
+
+def print_input(inp, selected):
+ print_prefix(inp, selected)
+
+ T.setfg(3)
+ T.tprint("[{}]".format(inp.index()))
+ T.setfg(9)
+
+ T.tprint(" ")
+ T.tprint(inp.name())
+
+ T.tprint(" ")
+ print_volume(inp)
+
+ T.setfg(3)
+ T.tprint(" -> ")
+ T.tprint("[{}]".format(inp.sink()))
+ T.setfg(9)
+
+def print_sink(sink, selected):
+ print_prefix(sink, selected)
+
+ T.setfg(3)
+ T.tprint("[{}]".format(sink.index()))
+ T.setfg(9)
+
+ T.tprint(" ")
+ T.tprint(sink.name())
+
+ T.tprint(" ")
+ print_volume(sink)
+
+ if sink.default():
+ T.tprint(" ")
+ T.setbold(True)
+ T.tprint("[default]")
+ T.setbold(False)
+
+def update():
+ global inps, sinks
+
+ inps = pa.list_sink_inputs()
+ sinks = pa.list_sinks()
+ redraw()
+
+
+orig_excepthook = sys.excepthook
+def excepthook(exctype, value, traceback):
+ T.endkeyboard()
+ T.endscreen()
+ orig_excepthook(exctype, value, traceback)
+
+sys.excepthook = excepthook
diff --git a/main.py b/main.py
index 8be5f9c..8fea1d2 100755
--- a/main.py
+++ b/main.py
@@ -1,24 +1,27 @@
#!/usr/bin/env python3
-import pacmd, inter
+import pacmd, pa, inter
-def low_query(func):
- res = func()
- print("Infos:")
- for info in res.infos:
- print(info)
- print("Sections:")
- for sect in res.sections:
- pacmd.dump_section(sect)
-
-def high_query(func):
- res = func()
- for x in res:
- print(x)
+# def low_query(func):
+# res = func()
+# print("Infos:")
+# for info in res.infos:
+# print(info)
+# print("Sections:")
+# for sect in res.sections:
+# pacmd.dump_section(sect)
+# def high_query(func):
+# res = func()
+# for x in res:
+# print(x)
# low_query(pacmd.list_sinks)
-high_query(inter.list_sinks)
-print()
-high_query(inter.list_sink_inputs)
+# high_query(pa.list_sinks)
+# print()
+# high_query(pa.list_sink_inputs)
+
+inter.start()
+inter.mainloop()
+inter.end()
diff --git a/pa.py b/pa.py
new file mode 100644
index 0000000..f8a2674
--- /dev/null
+++ b/pa.py
@@ -0,0 +1,104 @@
+from collections import namedtuple
+import pacmd
+
+class Sink:
+ def __init__(self, pitem):
+ assert type(pitem) == pacmd.Item
+ self._i = pitem
+
+ def name(self):
+ return self._i.ch["name"].value
+
+ def description(self):
+ return self._i.ch["properties"].ch["device.description"].value
+
+ def default(self):
+ return self._i.default
+
+ def index(self):
+ return self._i.index
+
+ def state(self):
+ return self._i.ch["state"].value
+
+ def muted(self):
+ return _parse_muted(self._i)
+
+ def volume(self):
+ return _parse_volume(self._i)
+
+ def set_volume(self, vol):
+ assert type(vol) == float or type(vol) == int
+ vol = round(vol * _get_maxvol(self._i))
+ pacmd.pacmd("set-sink-volume", str(self.index()), str(vol))
+
+ def set_default(self):
+ pacmd.pacmd("set-default-sink", str(self.index()))
+
+class SinkInput:
+ def __init__(self, pitem):
+ assert type(pitem) == pacmd.Item
+ self._i = pitem
+
+ def name(self):
+ name = self._i.ch["properties"].ch["media.name"].value
+ if "application.process.binary" in self._i.ch["properties"].ch:
+ name += " (" + self._i.ch["properties"].ch["application.process.binary"].value + ")"
+ return name
+
+ def default(self):
+ return self._i.default
+
+ def index(self):
+ return self._i.index
+
+ def driver(self):
+ return self._i.ch["driver"].value
+
+ def sink(self):
+ return int(self._i.ch["sink"].value.split()[0])
+
+ def muted(self):
+ return _parse_muted(self._i)
+
+ def volume(self):
+ return _parse_volume(self._i)
+
+ def set_volume(self, vol):
+ assert type(vol) == float or type(vol) == int
+ vol = round(vol * _get_maxvol(self._i))
+ pacmd.pacmd("set-sink-input-volume", str(self.index()), str(vol))
+
+ def move_to_sink(self, idx):
+ assert type(idx) == int
+ pacmd.pacmd("move-sink-input", str(self.index()), str(idx))
+
+
+def _get_maxvol(item):
+ if "base volume" in item.ch:
+ return int(item.ch["base volume"].value.split("/")[0].strip())
+ else:
+ return 65536
+
+def _parse_volume(item):
+ vol = [int(x.split(":")[1].split("/")[0].strip())
+ for x in item.ch["volume"].value[0].split(",")]
+ maxvol = _get_maxvol(item)
+ return [v / maxvol for v in vol]
+
+def _parse_muted(item):
+ if item.ch["muted"].value == "yes": return True
+ elif item.ch["muted"].value == "no": return False
+ else: assert False
+
+def list_sinks():
+ res = pacmd.list_sinks()
+ assert len(res.sections) == 1
+ assert res.sections[0].name == "sink"
+ return [Sink(item) for item in res.sections[0].items]
+
+def list_sink_inputs():
+ res = pacmd.list_sink_inputs()
+ assert len(res.sections) == 1
+ assert res.sections[0].name == "input"
+ return [SinkInput(item) for item in res.sections[0].items]
diff --git a/pacmd.py b/pacmd.py
index 42d3ce8..c1071f6 100644
--- a/pacmd.py
+++ b/pacmd.py
@@ -1,26 +1,27 @@
import subprocess, re
# These are not namedtuples because we want them mutable
-class PacmdSection:
+class Section:
def __init__(self, name, items):
self.name = name # string
- self.items = items # [PacmdItem]
+ self.items = items # [Item]
-class PacmdItem:
- def __init__(self, index, children):
+class Item:
+ def __init__(self, index, default, ch):
self.index = index # int
- self.children = children # {key: PacmdNode}
+ self.default = default # bool
+ self.ch = ch # {key: Node}
-class PacmdNode:
- def __init__(self, value, children):
+class Node:
+ def __init__(self, value, ch):
self.value = value # None | string | [string]
- self.children = children # {key: PacmdNode}
+ self.ch = ch # {key: Node}
class Pacmd:
def __init__(self, full_output):
lines = full_output.decode("utf-8").split("\n")
self.infos = [] # [string]
- self.sections = [] # [PacmdSection]
+ self.sections = [] # [Section]
currsect = None
curritem = None
@@ -56,7 +57,7 @@ class Pacmd:
line = line.replace("\t", " ")
m = re.match(r"^( *(\* )?)([^ ].*)", line)
- is_default = m.group(2) is not None # TODO use is_default
+ is_default = m.group(2) is not None
indent = len(m.group(1))
line = m.group(3)
@@ -68,7 +69,7 @@ class Pacmd:
close_currsect()
name = re.search(r" ([^ ]*)\(s\)", line).group(1)
- currsect = PacmdSection(name, [])
+ currsect = Section(name, [])
else:
self.infos.append(line)
@@ -76,7 +77,7 @@ class Pacmd:
close_curritem()
index = re.search(r"index: ([0-9]*)$", line).group(1)
- curritem = PacmdItem(int(index), {})
+ curritem = Item(int(index), is_default, {})
elif indent >= 8:
assert curritem != None
@@ -89,16 +90,16 @@ class Pacmd:
parentval, thisval = None, curritem
for k in currpath:
- parentval, thisval = thisval, thisval.children[k]
+ parentval, thisval = thisval, thisval.ch[k]
if q_is_key_value(line):
key, value = q_get_key_value(line)
- thisval.children[key] = PacmdNode(value, {})
+ thisval.ch[key] = Node(value, {})
currpath.append(key)
elif re.search(r":$", line) is not None:
key = line[:-1]
- thisval.children[key] = PacmdNode(None, {})
+ thisval.ch[key] = Node(None, {})
currpath.append(key)
elif indent_depth >= len(currpath):
@@ -116,17 +117,26 @@ class Pacmd:
close_currsect()
+class PacmdError(Exception):
+ pass
+
def pacmd(*args):
- return Pacmd(subprocess.check_output(["pacmd"] + list(args)))
+ try:
+ return subprocess.check_output(["pacmd"] + list(args), stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ raise PacmdError(e.output)
+
+def _pacmd_parsed(*args):
+ return Pacmd(pacmd(*args))
def list_all():
- return pacmd("list")
+ return _pacmd_parsed("list")
def list_sinks():
- return pacmd("list-sinks")
+ return _pacmd_parsed("list-sinks")
def list_sink_inputs():
- return pacmd("list-sink-inputs")
+ return _pacmd_parsed("list-sink-inputs")
def _make_indent(n):
@@ -136,11 +146,11 @@ def dump_section(sect):
print("Section " + sect.name + ":")
for item in sect.items:
print(_make_indent(1) + "Item " + str(item.index) + ":")
- for key, node in item.children.items():
+ for key, node in item.ch.items():
dump_node(node, key, 2);
def dump_node(node, key, indent):
print(_make_indent(indent) + key + ":" +
("" if node.value is None else " " + str(node.value)))
- for k2, n2 in node.children.items():
+ for k2, n2 in node.ch.items():
dump_node(n2, k2, indent + 1)