From 5364d92c1c57d67daa09abea5f76810df6dbd911 Mon Sep 17 00:00:00 2001 From: Julian Daube Date: Tue, 22 Oct 2019 16:05:25 +0200 Subject: [PATCH] gui: first functional application --- gui/app.py | 322 ++++++++++++++++++++++++++++++++++++++++++++ gui/serial_ports.py | 35 +++++ 2 files changed, 357 insertions(+) create mode 100755 gui/app.py create mode 100644 gui/serial_ports.py diff --git a/gui/app.py b/gui/app.py new file mode 100755 index 0000000..6738b23 --- /dev/null +++ b/gui/app.py @@ -0,0 +1,322 @@ +#!/bin/python +import serial +import wx +import threading +import serial_ports + +class AudioMuxConnection(threading.Thread): + def __init__(self, port=None): + super().__init__() + + self.transport = None + self.buffer = "" + self.running = 0 + + if port: + self.open(port) + + def open(self, portname): + self.transport = serial.Serial(portname, timeout=1) + self.buffer = "" + self.onConnected() + + def close(self): + if self.transport: + self.stop() + self.transport.close() + self.transport = None + + def write(self, msg): + if self.transport: + self.transport.write((str(msg) + "\n").encode("ASCII")) + + + def run(self): + self.running = 1 + self.write("?") # query current state + + while self.running: + self.onData(self.transport.read_all()) + + self.transport.close() + self.running = 0 + + def stop(self): + if self.running: + self.running = 0 + self.join() + + + def onData(self, data): + """ Decodes all messages that are received from the audiomux """ + self.buffer += data.decode("ASCII") + + if "\n" in self.buffer: + lines = self.buffer.split("\n") + + self.buffer = lines[-1] + for line in lines[:-1]: + self.onMessage(line) + + def onMessage(self, msg): + """ The callback for a new message """ + pass + + def onConnected(self): + """ The Callback for a successfull connection """ + pass + + +class ConnectionPanel(wx.Panel): + def __init__(self, parent, onAccepted=None): + super().__init__(parent) + self.choice = wx.Choice(self, choices=serial_ports.list_all()) + + self.button = wx.Button(self, label="Connect") + self.button.Bind(wx.EVT_BUTTON, self.onSelected) + + sizer = wx.BoxSizer(wx.HORIZONTAL) + sizer.Add(self.choice, wx.EXPAND) + sizer.Add(self.button) + + self.SetSizerAndFit(sizer) + self.onAccepted = onAccepted + + def onSelected(self, evt): + self.value = self.choice.GetSelection() + + if self.value and self.onAccepted: + self.onAccepted(self.value) + + @property + def value(self): + if hasattr(self, "_value") and self._value != wx.NOT_FOUND: + return self.choice.GetString(self._value) + + return None + + @value.setter + def value(self, v): + self._value = v + + def Success(self): + self.choice.Enable(0) + self.button.SetLabel("Disconnect") + + def Clear(self): + self.choice.Clear() + self.choice.AppendItems(serial_ports.list_all()) + self.choice.Enable(1) + self.button.SetLabel("Connect") + +class InputButtons(wx.Panel): + def __init__(self, parent, onChange=None): + super().__init__(parent) + sizer = wx.BoxSizer(wx.HORIZONTAL) + self.onChange = onChange + self.buttons = {} + + for i in range(6): + self.buttons[i] = wx.ToggleButton(self, size=(30,30), label="{}".format(i+1), id = i) + self.buttons[i].Bind(wx.EVT_TOGGLEBUTTON, lambda evt: self.onClick(evt.GetId())) + sizer.Add(self.buttons[i], wx.EXPAND | wx.SHAPED) + + self.SetSizer(sizer) + + def SetActive(self, num): + for i in self.buttons: + if num == i: + self.buttons[i].SetValue(1) + else: + self.buttons[i].SetValue(0) + def SetAll(self): + for button in self.buttons.values(): + button.SetValue(1) + + def onClick(self, num): + self.SetActive(num) + if self.onChange: + self.onChange(num) + +class MainPanel(wx.Panel): + def __init__(self, parent): + super().__init__(parent) + self.parent = parent + + sizer = wx.BoxSizer(wx.VERTICAL) + + sizer.Add(self.Inputs(), 0, wx.EXPAND) + sizer.Add(wx.StaticLine(self), 0, wx.EXPAND) + sizer.Add(self.Sliders(), 1, wx.EXPAND) + sizer.Add(wx.StaticLine(self), 0, wx.EXPAND) + sizer.Add(self.Buttons()) + + self.SetSizer(sizer) + + def Inputs(self): + self.inputs = InputButtons(self, lambda num: self.reqInputChange(num)) + return self.inputs + + def Sliders(self): + self.gain = wx.Slider(self, style=wx.SL_VERTICAL|wx.SL_INVERSE, minValue=0, maxValue=20, size=(30,100)) + self.latt = wx.Slider(self, style=wx.SL_VERTICAL , minValue=0, maxValue=90, size=(30,100)) + self.ratt = wx.Slider(self, style=wx.SL_VERTICAL , minValue=0, maxValue=90, size=(30,100)) + self.bboost = wx.Slider(self, style=wx.SL_VERTICAL|wx.SL_INVERSE, minValue=0, maxValue=6, size=(30,100)) + self.tboost = wx.Slider(self, style=wx.SL_VERTICAL|wx.SL_INVERSE, minValue=0, maxValue=6, size=(30,100)) + + self.SliderEvent(self.gain, "G") + self.SliderEvent(self.latt, "L") + self.SliderEvent(self.ratt, "R") + self.SliderEvent(self.bboost, "B") + self.SliderEvent(self.tboost, "T") + + sizer = wx.BoxSizer(wx.HORIZONTAL) + + self.SliderLabel(sizer, self.gain, "Gain") + self.SliderLabel(sizer, self.latt, "L") + self.SliderLabel(sizer, self.ratt, "R") + self.SliderLabel(sizer, self.bboost, "Bass Boost") + self.SliderLabel(sizer, self.tboost, "Treble Boost") + + sizer.AddStretchSpacer() + + return sizer + + def SliderEvent(self, slider, command): + slider.Bind(wx.EVT_SLIDER, lambda evt: self.parent.write("{}{}".format(command, evt.GetInt()))) + + + def SliderLabel(self, sizer, slider, label): + sizer.AddStretchSpacer() + temp = wx.BoxSizer(wx.VERTICAL) + temp.Add(slider, 1, wx.EXPAND|wx.ALL) + temp.Add(wx.StaticText(self, label=label, style=wx.ALIGN_CENTRE), 0, wx.ALL) + sizer.Add(temp, 0, wx.EXPAND | wx.ALL) + + def Buttons(self): + self.muteBtn = wx.ToggleButton(self, label="Mute") + self.muteBtn.Bind(wx.EVT_TOGGLEBUTTON, lambda evt: self.parent.write("M{}".format(int(evt.IsChecked())))) + self.shortBtn = wx.ToggleButton(self, label="Short") + self.shortBtn.Bind(wx.EVT_TOGGLEBUTTON, lambda evt: self.parent.write("S{}".format(int(evt.IsChecked())))) + + sizer = wx.BoxSizer(wx.HORIZONTAL) + sizer.Add(self.muteBtn) + sizer.Add(self.shortBtn) + sizer.AddStretchSpacer() + + return sizer + + def reqInputChange(self, num): + self.parent.write("C{}".format(num)) + + def setInput(self, num): + self.inputs.SetActive(num) + + def setGain(self, num): + self.gain.setValue(num) + + def setTreble(self, num): + self.tboost.SetValue(num) + + def setBass(self, num): + self.bboost.SetValue(num) + + def setLeftAtt(self, num): + self.latt.SetValue(num) + + def setRightAtt(self, num): + self.ratt.SetValue(num) + + def mute(self, m): + self.muteBtn.SetValue(m) + self.shortBtn.Enable(not m) + self.inputs.Enable(not m) + + def short(self, m): + self.shortBtn.SetValue(m) + self.muteBtn.Enable(not m) + self.inputs.Enable(not m) + + if m: + self.inputs.SetAll() + +class Frame(wx.Frame, AudioMuxConnection): + def __init__(self): + # init parents + wx.Frame.__init__(self,None, title="AudioMux Control") + AudioMuxConnection.__init__(self) + + sizer = wx.BoxSizer(wx.VERTICAL) + sizer.Add(self.newConnectionPanel(), 0, wx.EXPAND) + sizer.Add(wx.StaticLine(self), 0, wx.EXPAND) + sizer.Add(self.newMainPanel(), 1, wx.EXPAND) + + self.SetSizerAndFit(sizer) + + def newMainPanel(self): + self.mainPanel = MainPanel(self) + self.mainPanel.Enable(0) + + return self.mainPanel + + def newConnectionPanel(self): + self.connectionPanel = ConnectionPanel(self, lambda choice: self.onConnectionRequest(choice)) + return self.connectionPanel + + def onConnectionRequest(self, choice): + if self.running: + # stop the connection and update connection dialog + self.close() + self.connectionPanel.Clear() + self.mainPanel.Enable(0) + else: + # try to start the serial connection + AudioMuxConnection.__init__(self, choice) + + def onConnected(self): + # serial connection established + self.connectionPanel.Success() + self.start() + self.mainPanel.Enable(1) + + def onMessage(self, msg): + msg = msg.strip("\r\n ") + + if msg[0] == 'C': + num = int(msg.strip("C")) + self.mainPanel.setInput(num) + elif msg[0] == 'G': + num = int(msg.strip("G")) + self.mainPanel.setGain(num) + elif msg[0] == 'T': + num = int(msg.strip("T")) + self.mainPanel.setTreble(num) + elif msg[0] == 'B': + num = int(msg.strip("B")) + self.mainPanel.setBass(num) + elif msg[0] == 'M': + num = int(msg.strip("M")) + self.mainPanel.mute(num) + elif msg[0] == 'S': + num = int(msg.strip("S")) + self.mainPanel.short(num) + elif msg[0] == 'L': + num = int(msg.strip("L")) + self.mainPanel.setLeftAtt(num) + elif msg[0] == 'R': + num = int(msg.strip("R")) + self.mainPanel.setRightAtt(num) + + +if __name__ == "__main__": + app = wx.App() + frame = Frame() + + try: + frame.Show() + app.MainLoop() + finally: + frame.close() + + + diff --git a/gui/serial_ports.py b/gui/serial_ports.py new file mode 100644 index 0000000..4dd56d9 --- /dev/null +++ b/gui/serial_ports.py @@ -0,0 +1,35 @@ +import sys +import serial +import glob + +def list_all(): + """ Lists serial port names + + :raises EnvironmentError: + On unsupported or unknown platforms + :returns: + A list of the serial ports available on the system + """ + if sys.platform.startswith('win'): + ports = ['COM%s' % (i + 1) for i in range(256)] + elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): + # this excludes your current terminal "/dev/tty" + ports = glob.glob('/dev/tty[a-zA-Z]*') + elif sys.platform.startswith('darwin'): + ports = glob.glob('/dev/tty.*') + else: + raise EnvironmentError('Unsupported platform') + + result = [] + for port in ports: + try: + s = serial.Serial(port) + s.close() + result.append(port) + except (OSError, serial.SerialException): + pass + return result + + +if __name__ == "__main__": + print(list_all()) \ No newline at end of file