From 37091fd7cf98ca7c14d6b4122f6b74ad7079d7b0 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Sun, 12 Aug 2018 11:32:46 +0200 Subject: Working version --- .gitignore | 2 + inter.py | 369 +++++++++++++++++++++++++++++++++++++++++++++++++++---------- main.py | 37 ++++--- pa.py | 104 +++++++++++++++++ pacmd.py | 52 +++++---- 5 files changed, 470 insertions(+), 94 deletions(-) create mode 100644 pa.py diff --git a/.gitignore b/.gitignore index c18dd8d..27cfc34 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ __pycache__/ +termio.py +libtermio.so diff --git a/inter.py b/inter.py index 4a4cd5b..c316663 100644 --- a/inter.py +++ b/inter.py @@ -1,56 +1,313 @@ -from collections import namedtuple -import pacmd - -Sink = namedtuple("Sink", - ["name", # string - "description", # string - "index", # int - "state", # string - "muted", # bool - "volume"]) # string -SinkInput = namedtuple("SinkInput", - ["name", # string - "driver", # string - "sink"]) # int - -def list_sinks(): - res = pacmd.list_sinks() - assert len(res.sections) == 1 - assert res.sections[0].name == "sink" - - ret = [] - for item in res.sections[0].items: - muted = item.children["muted"].value - if muted == "yes": muted = True - elif muted == "no": muted = False - else: assert False - - sink = Sink( - item.children["name"].value, - item.children["properties"].children["device.description"].value, - item.index, - item.children["state"].value, - muted, - item.children["volume"].value) - ret.append(sink) - - return ret - -def list_sink_inputs(): - res = pacmd.list_sink_inputs() - assert len(res.sections) == 1 - assert res.sections[0].name == "input" - - ret = [] - for item in res.sections[0].items: - name = item.children["properties"].children["media.name"].value - if "application.process.binary" in item.children["properties"].children: - name += " (" + item.children["properties"].children["application.process.binary"].value + ")" - - si = SinkInput( - name, - item.children["driver"].value, - int(item.children["sink"].value.split()[0])) - ret.append(si) - - return ret +import sys, os, atexit +import pacmd, pa +try: + sys.path.append(os.path.dirname(os.path.realpath(__file__))) + import termio as T +except Exception as e: + print("Place termio.py and libtermio.so from the github.com/tomsmeding/termio project in this directory") + print(e) + sys.exit(1) + +__all__ = ["start", "end", "mainloop", "update"] + +inps = None +sinks = None +sel = (0, 0) +prompt_y = 0 # updated by redraw() + +MENU_MAIN, MENU_VOLUME, NUM_MENUS = range(3) +menu = MENU_MAIN + +menutext = [0] * NUM_MENUS +menuopts = [0] * NUM_MENUS + +menutext[MENU_MAIN] = "" +menuopts[MENU_MAIN] = [ + ("(q)uit", [0,1]), + ("(v)olume", [0,1]), + ("(d)efault", [1]), + ("(m)ove", [0]) +] + +menutext[MENU_VOLUME] = "Change volume: <- / -> (alt: 1%) (m)ute [q/esc: return]" +menuopts[MENU_VOLUME] = [] + + +def start(): + T.initscreen() + atexit.register(T.endscreen) + T.initkeyboard(False) + atexit.register(T.endkeyboard) + + redraw() + +def end(): + T.endkeyboard() + atexit.unregister(T.endkeyboard) + T.endscreen() + atexit.unregister(T.endscreen) + +def mainloop(): + global sel, menu + while True: + update() + if len([inps, sinks][sel[0]]) == 0 and len([sinks, inps][sel[0]]) != 0: + sel = (1 - sel[0], sel[1]) + redraw() + + key = T.tgetkey() + + if menu != MENU_MAIN and (key == T.KEY_ESC or key == ord('q')): + menu = MENU_MAIN + + elif menu == MENU_MAIN: + if key == T.KEY_DOWN: + if sel[0] == 0 and sel[1] >= len(inps) - 1: + if len(sinks) > 0: + sel = (1, 0) + elif sel[0] == 1 and sel[1] >= len(sinks) - 1: + pass + else: + sel = (sel[0], sel[1] + 1) + + elif key == T.KEY_UP: + if sel[0] == 0 and sel[1] == 0: + pass + elif sel[0] == 1 and sel[1] == 0: + if len(inps) > 0: + sel = (0, len(inps) - 1) + else: + sel = (sel[0], sel[1] - 1) + + elif key == ord('q'): + break + + elif key == ord('v'): + thing = get_selected() + kind = "sink" if type(thing) == pa.Sink else "input" + vol = thing.volume() + if vol[0] != vol[1]: + show_message( + "Warning: current volume of this " + kind + " is asymmetric!") + + menu = MENU_VOLUME + + elif sel[0] == 1 and key == ord('d'): + wrap_pacmd(lambda: sinks[sel[1]].set_default()) + + elif sel[0] == 0 and key == ord('m'): + idx = show_prompt( + "Enter sink number to link input {} to" + .format(inps[sel[1]].index())) + + if idx is None: + continue + + try: + idx = int(idx) + except: + show_message("Invalid number!") + continue + + for i in range(len(sinks)): + if sinks[i].index() == idx: + wrap_pacmd(inps[sel[1]].move_to_sink(idx)) + break + else: + show_message("No sink found with that index!") + + elif menu == MENU_VOLUME: + thing = [inps, sinks][sel[0]][sel[1]] + + incr = 0 + if key == T.KEY_RIGHT: + incr = 0.05 + elif key == T.KEY_LEFT: + incr = -0.05 + elif key == T.KEY_ALT + T.KEY_RIGHT: + incr = 0.01 + elif key == T.KEY_ALT + T.KEY_LEFT: + incr = -0.01 + elif key == ord('m'): + thing.set_muted(not thing.muted()) + continue + else: + T.bel() + continue + + vol = sum(thing.volume()) / 2 + vol = min(1, max(0, vol + incr)) + wrap_pacmd(lambda: thing.set_volume(vol)) + + else: + assert False + +def get_selected(): + return [inps, sinks][sel[0]][sel[1]] + +def wrap_pacmd(lam): + try: + lam() + except pacmd.PacmdError as e: + show_message("An error occurred:\n" + str(e)) + +def show_message(msg): + sz = T.gettermsize() + T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ') + T.moveto(0, prompt_y) + T.tprint("! " + msg + "\n[press return]") + T.redraw() + + while True: + key = T.tgetkey() + if key == T.KEY_CR or key == T.KEY_LF: + break + + T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ') + +def show_prompt(msg): + sz = T.gettermsize() + T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ') + T.moveto(0, prompt_y) + T.tprint(msg + "\n> ") + T.redraw() + + line = T.tgetline() + + T.fillrect(0, prompt_y, sz.w, sz.h - prompt_y, ' ') + return line + +def redraw(): + global prompt_y + if inps is None: + update() + + sz = T.gettermsize() + T.fillrect(0, 0, sz.w, sz.h, ' ') + + T.setstyle(T.Style(9, 9, False, False)) + + y = 0 + T.moveto(0, y) + T.setbold(True) + T.tprint("Sink Inputs") + T.setbold(False) + y += 1 + + for i, inp in enumerate(inps): + T.moveto(4, y) + print_input(inp, sel[0] == 0 and sel[1] == i) + y += 1 + + y += 1 + T.moveto(0, y) + T.setbold(True) + T.tprint("Sinks") + T.setbold(False) + y += 1 + + for i, sink in enumerate(sinks): + T.moveto(4, y) + print_sink(sink, sel[0] == 1 and sel[1] == i) + y += 1 + + y += 2 + + prompt_y = y + + T.moveto(0, y) + T.tprint(menutext[menu]) + first = True + for (text, groups) in menuopts[menu]: + if sel[0] in groups: + if first: first = False + else: T.tprint(" ") + T.tprint(text) + # if sel[0] not in groups: + # T.setfg(6) + # T.tprint(text) + # T.setfg(9) + + y += 1 + + T.moveto(0, y) + T.tprint("> ") + + T.redraw() + +def print_prefix(thing, selected): + if selected: + T.setbold(True) + T.tprint("> ") + T.setbold(False) + else: + T.tprint(" ") + +def fmt_volume(vol): + res = ["", ""] + for i in range(2): + res[i] = str(round(100 * vol[i])) + "%" + if vol[0] == vol[1]: + return res[0] + else: + return res[0] + "/" + res[1] + +def print_volume(thing): + T.setfg(6) + T.tprint("(vol: {}{})".format( + fmt_volume(thing.volume()), + " (MUTED)" if thing.muted() else "")) + T.setfg(9) + +def print_input(inp, selected): + print_prefix(inp, selected) + + T.setfg(3) + T.tprint("[{}]".format(inp.index())) + T.setfg(9) + + T.tprint(" ") + T.tprint(inp.name()) + + T.tprint(" ") + print_volume(inp) + + T.setfg(3) + T.tprint(" -> ") + T.tprint("[{}]".format(inp.sink())) + T.setfg(9) + +def print_sink(sink, selected): + print_prefix(sink, selected) + + T.setfg(3) + T.tprint("[{}]".format(sink.index())) + T.setfg(9) + + T.tprint(" ") + T.tprint(sink.name()) + + T.tprint(" ") + print_volume(sink) + + if sink.default(): + T.tprint(" ") + T.setbold(True) + T.tprint("[default]") + T.setbold(False) + +def update(): + global inps, sinks + + inps = pa.list_sink_inputs() + sinks = pa.list_sinks() + redraw() + + +orig_excepthook = sys.excepthook +def excepthook(exctype, value, traceback): + T.endkeyboard() + T.endscreen() + orig_excepthook(exctype, value, traceback) + +sys.excepthook = excepthook diff --git a/main.py b/main.py index 8be5f9c..8fea1d2 100755 --- a/main.py +++ b/main.py @@ -1,24 +1,27 @@ #!/usr/bin/env python3 -import pacmd, inter +import pacmd, pa, inter -def low_query(func): - res = func() - print("Infos:") - for info in res.infos: - print(info) - print("Sections:") - for sect in res.sections: - pacmd.dump_section(sect) - -def high_query(func): - res = func() - for x in res: - print(x) +# def low_query(func): +# res = func() +# print("Infos:") +# for info in res.infos: +# print(info) +# print("Sections:") +# for sect in res.sections: +# pacmd.dump_section(sect) +# def high_query(func): +# res = func() +# for x in res: +# print(x) # low_query(pacmd.list_sinks) -high_query(inter.list_sinks) -print() -high_query(inter.list_sink_inputs) +# high_query(pa.list_sinks) +# print() +# high_query(pa.list_sink_inputs) + +inter.start() +inter.mainloop() +inter.end() diff --git a/pa.py b/pa.py new file mode 100644 index 0000000..f8a2674 --- /dev/null +++ b/pa.py @@ -0,0 +1,104 @@ +from collections import namedtuple +import pacmd + +class Sink: + def __init__(self, pitem): + assert type(pitem) == pacmd.Item + self._i = pitem + + def name(self): + return self._i.ch["name"].value + + def description(self): + return self._i.ch["properties"].ch["device.description"].value + + def default(self): + return self._i.default + + def index(self): + return self._i.index + + def state(self): + return self._i.ch["state"].value + + def muted(self): + return _parse_muted(self._i) + + def volume(self): + return _parse_volume(self._i) + + def set_volume(self, vol): + assert type(vol) == float or type(vol) == int + vol = round(vol * _get_maxvol(self._i)) + pacmd.pacmd("set-sink-volume", str(self.index()), str(vol)) + + def set_default(self): + pacmd.pacmd("set-default-sink", str(self.index())) + +class SinkInput: + def __init__(self, pitem): + assert type(pitem) == pacmd.Item + self._i = pitem + + def name(self): + name = self._i.ch["properties"].ch["media.name"].value + if "application.process.binary" in self._i.ch["properties"].ch: + name += " (" + self._i.ch["properties"].ch["application.process.binary"].value + ")" + return name + + def default(self): + return self._i.default + + def index(self): + return self._i.index + + def driver(self): + return self._i.ch["driver"].value + + def sink(self): + return int(self._i.ch["sink"].value.split()[0]) + + def muted(self): + return _parse_muted(self._i) + + def volume(self): + return _parse_volume(self._i) + + def set_volume(self, vol): + assert type(vol) == float or type(vol) == int + vol = round(vol * _get_maxvol(self._i)) + pacmd.pacmd("set-sink-input-volume", str(self.index()), str(vol)) + + def move_to_sink(self, idx): + assert type(idx) == int + pacmd.pacmd("move-sink-input", str(self.index()), str(idx)) + + +def _get_maxvol(item): + if "base volume" in item.ch: + return int(item.ch["base volume"].value.split("/")[0].strip()) + else: + return 65536 + +def _parse_volume(item): + vol = [int(x.split(":")[1].split("/")[0].strip()) + for x in item.ch["volume"].value[0].split(",")] + maxvol = _get_maxvol(item) + return [v / maxvol for v in vol] + +def _parse_muted(item): + if item.ch["muted"].value == "yes": return True + elif item.ch["muted"].value == "no": return False + else: assert False + +def list_sinks(): + res = pacmd.list_sinks() + assert len(res.sections) == 1 + assert res.sections[0].name == "sink" + return [Sink(item) for item in res.sections[0].items] + +def list_sink_inputs(): + res = pacmd.list_sink_inputs() + assert len(res.sections) == 1 + assert res.sections[0].name == "input" + return [SinkInput(item) for item in res.sections[0].items] diff --git a/pacmd.py b/pacmd.py index 42d3ce8..c1071f6 100644 --- a/pacmd.py +++ b/pacmd.py @@ -1,26 +1,27 @@ import subprocess, re # These are not namedtuples because we want them mutable -class PacmdSection: +class Section: def __init__(self, name, items): self.name = name # string - self.items = items # [PacmdItem] + self.items = items # [Item] -class PacmdItem: - def __init__(self, index, children): +class Item: + def __init__(self, index, default, ch): self.index = index # int - self.children = children # {key: PacmdNode} + self.default = default # bool + self.ch = ch # {key: Node} -class PacmdNode: - def __init__(self, value, children): +class Node: + def __init__(self, value, ch): self.value = value # None | string | [string] - self.children = children # {key: PacmdNode} + self.ch = ch # {key: Node} class Pacmd: def __init__(self, full_output): lines = full_output.decode("utf-8").split("\n") self.infos = [] # [string] - self.sections = [] # [PacmdSection] + self.sections = [] # [Section] currsect = None curritem = None @@ -56,7 +57,7 @@ class Pacmd: line = line.replace("\t", " ") m = re.match(r"^( *(\* )?)([^ ].*)", line) - is_default = m.group(2) is not None # TODO use is_default + is_default = m.group(2) is not None indent = len(m.group(1)) line = m.group(3) @@ -68,7 +69,7 @@ class Pacmd: close_currsect() name = re.search(r" ([^ ]*)\(s\)", line).group(1) - currsect = PacmdSection(name, []) + currsect = Section(name, []) else: self.infos.append(line) @@ -76,7 +77,7 @@ class Pacmd: close_curritem() index = re.search(r"index: ([0-9]*)$", line).group(1) - curritem = PacmdItem(int(index), {}) + curritem = Item(int(index), is_default, {}) elif indent >= 8: assert curritem != None @@ -89,16 +90,16 @@ class Pacmd: parentval, thisval = None, curritem for k in currpath: - parentval, thisval = thisval, thisval.children[k] + parentval, thisval = thisval, thisval.ch[k] if q_is_key_value(line): key, value = q_get_key_value(line) - thisval.children[key] = PacmdNode(value, {}) + thisval.ch[key] = Node(value, {}) currpath.append(key) elif re.search(r":$", line) is not None: key = line[:-1] - thisval.children[key] = PacmdNode(None, {}) + thisval.ch[key] = Node(None, {}) currpath.append(key) elif indent_depth >= len(currpath): @@ -116,17 +117,26 @@ class Pacmd: close_currsect() +class PacmdError(Exception): + pass + def pacmd(*args): - return Pacmd(subprocess.check_output(["pacmd"] + list(args))) + try: + return subprocess.check_output(["pacmd"] + list(args), stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + raise PacmdError(e.output) + +def _pacmd_parsed(*args): + return Pacmd(pacmd(*args)) def list_all(): - return pacmd("list") + return _pacmd_parsed("list") def list_sinks(): - return pacmd("list-sinks") + return _pacmd_parsed("list-sinks") def list_sink_inputs(): - return pacmd("list-sink-inputs") + return _pacmd_parsed("list-sink-inputs") def _make_indent(n): @@ -136,11 +146,11 @@ def dump_section(sect): print("Section " + sect.name + ":") for item in sect.items: print(_make_indent(1) + "Item " + str(item.index) + ":") - for key, node in item.children.items(): + for key, node in item.ch.items(): dump_node(node, key, 2); def dump_node(node, key, indent): print(_make_indent(indent) + key + ":" + ("" if node.value is None else " " + str(node.value))) - for k2, n2 in node.children.items(): + for k2, n2 in node.ch.items(): dump_node(n2, k2, indent + 1) -- cgit v1.2.3