gui: first functional application
This commit is contained in:
parent
5934b7f2d4
commit
5364d92c1c
322
gui/app.py
Executable file
322
gui/app.py
Executable file
@ -0,0 +1,322 @@
|
||||
#!/bin/python
|
||||
import serial
|
||||
import wx
|
||||
import threading
|
||||
import serial_ports
|
||||
|
||||
class AudioMuxConnection(threading.Thread):
|
||||
def __init__(self, port=None):
|
||||
super().__init__()
|
||||
|
||||
self.transport = None
|
||||
self.buffer = ""
|
||||
self.running = 0
|
||||
|
||||
if port:
|
||||
self.open(port)
|
||||
|
||||
def open(self, portname):
|
||||
self.transport = serial.Serial(portname, timeout=1)
|
||||
self.buffer = ""
|
||||
self.onConnected()
|
||||
|
||||
def close(self):
|
||||
if self.transport:
|
||||
self.stop()
|
||||
self.transport.close()
|
||||
self.transport = None
|
||||
|
||||
def write(self, msg):
|
||||
if self.transport:
|
||||
self.transport.write((str(msg) + "\n").encode("ASCII"))
|
||||
|
||||
|
||||
def run(self):
|
||||
self.running = 1
|
||||
self.write("?") # query current state
|
||||
|
||||
while self.running:
|
||||
self.onData(self.transport.read_all())
|
||||
|
||||
self.transport.close()
|
||||
self.running = 0
|
||||
|
||||
def stop(self):
|
||||
if self.running:
|
||||
self.running = 0
|
||||
self.join()
|
||||
|
||||
|
||||
def onData(self, data):
|
||||
""" Decodes all messages that are received from the audiomux """
|
||||
self.buffer += data.decode("ASCII")
|
||||
|
||||
if "\n" in self.buffer:
|
||||
lines = self.buffer.split("\n")
|
||||
|
||||
self.buffer = lines[-1]
|
||||
for line in lines[:-1]:
|
||||
self.onMessage(line)
|
||||
|
||||
def onMessage(self, msg):
|
||||
""" The callback for a new message """
|
||||
pass
|
||||
|
||||
def onConnected(self):
|
||||
""" The Callback for a successfull connection """
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionPanel(wx.Panel):
|
||||
def __init__(self, parent, onAccepted=None):
|
||||
super().__init__(parent)
|
||||
self.choice = wx.Choice(self, choices=serial_ports.list_all())
|
||||
|
||||
self.button = wx.Button(self, label="Connect")
|
||||
self.button.Bind(wx.EVT_BUTTON, self.onSelected)
|
||||
|
||||
sizer = wx.BoxSizer(wx.HORIZONTAL)
|
||||
sizer.Add(self.choice, wx.EXPAND)
|
||||
sizer.Add(self.button)
|
||||
|
||||
self.SetSizerAndFit(sizer)
|
||||
self.onAccepted = onAccepted
|
||||
|
||||
def onSelected(self, evt):
|
||||
self.value = self.choice.GetSelection()
|
||||
|
||||
if self.value and self.onAccepted:
|
||||
self.onAccepted(self.value)
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
if hasattr(self, "_value") and self._value != wx.NOT_FOUND:
|
||||
return self.choice.GetString(self._value)
|
||||
|
||||
return None
|
||||
|
||||
@value.setter
|
||||
def value(self, v):
|
||||
self._value = v
|
||||
|
||||
def Success(self):
|
||||
self.choice.Enable(0)
|
||||
self.button.SetLabel("Disconnect")
|
||||
|
||||
def Clear(self):
|
||||
self.choice.Clear()
|
||||
self.choice.AppendItems(serial_ports.list_all())
|
||||
self.choice.Enable(1)
|
||||
self.button.SetLabel("Connect")
|
||||
|
||||
class InputButtons(wx.Panel):
|
||||
def __init__(self, parent, onChange=None):
|
||||
super().__init__(parent)
|
||||
sizer = wx.BoxSizer(wx.HORIZONTAL)
|
||||
self.onChange = onChange
|
||||
self.buttons = {}
|
||||
|
||||
for i in range(6):
|
||||
self.buttons[i] = wx.ToggleButton(self, size=(30,30), label="{}".format(i+1), id = i)
|
||||
self.buttons[i].Bind(wx.EVT_TOGGLEBUTTON, lambda evt: self.onClick(evt.GetId()))
|
||||
sizer.Add(self.buttons[i], wx.EXPAND | wx.SHAPED)
|
||||
|
||||
self.SetSizer(sizer)
|
||||
|
||||
def SetActive(self, num):
|
||||
for i in self.buttons:
|
||||
if num == i:
|
||||
self.buttons[i].SetValue(1)
|
||||
else:
|
||||
self.buttons[i].SetValue(0)
|
||||
def SetAll(self):
|
||||
for button in self.buttons.values():
|
||||
button.SetValue(1)
|
||||
|
||||
def onClick(self, num):
|
||||
self.SetActive(num)
|
||||
if self.onChange:
|
||||
self.onChange(num)
|
||||
|
||||
class MainPanel(wx.Panel):
|
||||
def __init__(self, parent):
|
||||
super().__init__(parent)
|
||||
self.parent = parent
|
||||
|
||||
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||
|
||||
sizer.Add(self.Inputs(), 0, wx.EXPAND)
|
||||
sizer.Add(wx.StaticLine(self), 0, wx.EXPAND)
|
||||
sizer.Add(self.Sliders(), 1, wx.EXPAND)
|
||||
sizer.Add(wx.StaticLine(self), 0, wx.EXPAND)
|
||||
sizer.Add(self.Buttons())
|
||||
|
||||
self.SetSizer(sizer)
|
||||
|
||||
def Inputs(self):
|
||||
self.inputs = InputButtons(self, lambda num: self.reqInputChange(num))
|
||||
return self.inputs
|
||||
|
||||
def Sliders(self):
|
||||
self.gain = wx.Slider(self, style=wx.SL_VERTICAL|wx.SL_INVERSE, minValue=0, maxValue=20, size=(30,100))
|
||||
self.latt = wx.Slider(self, style=wx.SL_VERTICAL , minValue=0, maxValue=90, size=(30,100))
|
||||
self.ratt = wx.Slider(self, style=wx.SL_VERTICAL , minValue=0, maxValue=90, size=(30,100))
|
||||
self.bboost = wx.Slider(self, style=wx.SL_VERTICAL|wx.SL_INVERSE, minValue=0, maxValue=6, size=(30,100))
|
||||
self.tboost = wx.Slider(self, style=wx.SL_VERTICAL|wx.SL_INVERSE, minValue=0, maxValue=6, size=(30,100))
|
||||
|
||||
self.SliderEvent(self.gain, "G")
|
||||
self.SliderEvent(self.latt, "L")
|
||||
self.SliderEvent(self.ratt, "R")
|
||||
self.SliderEvent(self.bboost, "B")
|
||||
self.SliderEvent(self.tboost, "T")
|
||||
|
||||
sizer = wx.BoxSizer(wx.HORIZONTAL)
|
||||
|
||||
self.SliderLabel(sizer, self.gain, "Gain")
|
||||
self.SliderLabel(sizer, self.latt, "L")
|
||||
self.SliderLabel(sizer, self.ratt, "R")
|
||||
self.SliderLabel(sizer, self.bboost, "Bass Boost")
|
||||
self.SliderLabel(sizer, self.tboost, "Treble Boost")
|
||||
|
||||
sizer.AddStretchSpacer()
|
||||
|
||||
return sizer
|
||||
|
||||
def SliderEvent(self, slider, command):
|
||||
slider.Bind(wx.EVT_SLIDER, lambda evt: self.parent.write("{}{}".format(command, evt.GetInt())))
|
||||
|
||||
|
||||
def SliderLabel(self, sizer, slider, label):
|
||||
sizer.AddStretchSpacer()
|
||||
temp = wx.BoxSizer(wx.VERTICAL)
|
||||
temp.Add(slider, 1, wx.EXPAND|wx.ALL)
|
||||
temp.Add(wx.StaticText(self, label=label, style=wx.ALIGN_CENTRE), 0, wx.ALL)
|
||||
sizer.Add(temp, 0, wx.EXPAND | wx.ALL)
|
||||
|
||||
def Buttons(self):
|
||||
self.muteBtn = wx.ToggleButton(self, label="Mute")
|
||||
self.muteBtn.Bind(wx.EVT_TOGGLEBUTTON, lambda evt: self.parent.write("M{}".format(int(evt.IsChecked()))))
|
||||
self.shortBtn = wx.ToggleButton(self, label="Short")
|
||||
self.shortBtn.Bind(wx.EVT_TOGGLEBUTTON, lambda evt: self.parent.write("S{}".format(int(evt.IsChecked()))))
|
||||
|
||||
sizer = wx.BoxSizer(wx.HORIZONTAL)
|
||||
sizer.Add(self.muteBtn)
|
||||
sizer.Add(self.shortBtn)
|
||||
sizer.AddStretchSpacer()
|
||||
|
||||
return sizer
|
||||
|
||||
def reqInputChange(self, num):
|
||||
self.parent.write("C{}".format(num))
|
||||
|
||||
def setInput(self, num):
|
||||
self.inputs.SetActive(num)
|
||||
|
||||
def setGain(self, num):
|
||||
self.gain.setValue(num)
|
||||
|
||||
def setTreble(self, num):
|
||||
self.tboost.SetValue(num)
|
||||
|
||||
def setBass(self, num):
|
||||
self.bboost.SetValue(num)
|
||||
|
||||
def setLeftAtt(self, num):
|
||||
self.latt.SetValue(num)
|
||||
|
||||
def setRightAtt(self, num):
|
||||
self.ratt.SetValue(num)
|
||||
|
||||
def mute(self, m):
|
||||
self.muteBtn.SetValue(m)
|
||||
self.shortBtn.Enable(not m)
|
||||
self.inputs.Enable(not m)
|
||||
|
||||
def short(self, m):
|
||||
self.shortBtn.SetValue(m)
|
||||
self.muteBtn.Enable(not m)
|
||||
self.inputs.Enable(not m)
|
||||
|
||||
if m:
|
||||
self.inputs.SetAll()
|
||||
|
||||
class Frame(wx.Frame, AudioMuxConnection):
|
||||
def __init__(self):
|
||||
# init parents
|
||||
wx.Frame.__init__(self,None, title="AudioMux Control")
|
||||
AudioMuxConnection.__init__(self)
|
||||
|
||||
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||
sizer.Add(self.newConnectionPanel(), 0, wx.EXPAND)
|
||||
sizer.Add(wx.StaticLine(self), 0, wx.EXPAND)
|
||||
sizer.Add(self.newMainPanel(), 1, wx.EXPAND)
|
||||
|
||||
self.SetSizerAndFit(sizer)
|
||||
|
||||
def newMainPanel(self):
|
||||
self.mainPanel = MainPanel(self)
|
||||
self.mainPanel.Enable(0)
|
||||
|
||||
return self.mainPanel
|
||||
|
||||
def newConnectionPanel(self):
|
||||
self.connectionPanel = ConnectionPanel(self, lambda choice: self.onConnectionRequest(choice))
|
||||
return self.connectionPanel
|
||||
|
||||
def onConnectionRequest(self, choice):
|
||||
if self.running:
|
||||
# stop the connection and update connection dialog
|
||||
self.close()
|
||||
self.connectionPanel.Clear()
|
||||
self.mainPanel.Enable(0)
|
||||
else:
|
||||
# try to start the serial connection
|
||||
AudioMuxConnection.__init__(self, choice)
|
||||
|
||||
def onConnected(self):
|
||||
# serial connection established
|
||||
self.connectionPanel.Success()
|
||||
self.start()
|
||||
self.mainPanel.Enable(1)
|
||||
|
||||
def onMessage(self, msg):
|
||||
msg = msg.strip("\r\n ")
|
||||
|
||||
if msg[0] == 'C':
|
||||
num = int(msg.strip("C"))
|
||||
self.mainPanel.setInput(num)
|
||||
elif msg[0] == 'G':
|
||||
num = int(msg.strip("G"))
|
||||
self.mainPanel.setGain(num)
|
||||
elif msg[0] == 'T':
|
||||
num = int(msg.strip("T"))
|
||||
self.mainPanel.setTreble(num)
|
||||
elif msg[0] == 'B':
|
||||
num = int(msg.strip("B"))
|
||||
self.mainPanel.setBass(num)
|
||||
elif msg[0] == 'M':
|
||||
num = int(msg.strip("M"))
|
||||
self.mainPanel.mute(num)
|
||||
elif msg[0] == 'S':
|
||||
num = int(msg.strip("S"))
|
||||
self.mainPanel.short(num)
|
||||
elif msg[0] == 'L':
|
||||
num = int(msg.strip("L"))
|
||||
self.mainPanel.setLeftAtt(num)
|
||||
elif msg[0] == 'R':
|
||||
num = int(msg.strip("R"))
|
||||
self.mainPanel.setRightAtt(num)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = wx.App()
|
||||
frame = Frame()
|
||||
|
||||
try:
|
||||
frame.Show()
|
||||
app.MainLoop()
|
||||
finally:
|
||||
frame.close()
|
||||
|
||||
|
||||
|
35
gui/serial_ports.py
Normal file
35
gui/serial_ports.py
Normal file
@ -0,0 +1,35 @@
|
||||
import sys
|
||||
import serial
|
||||
import glob
|
||||
|
||||
def list_all():
|
||||
""" Lists serial port names
|
||||
|
||||
:raises EnvironmentError:
|
||||
On unsupported or unknown platforms
|
||||
:returns:
|
||||
A list of the serial ports available on the system
|
||||
"""
|
||||
if sys.platform.startswith('win'):
|
||||
ports = ['COM%s' % (i + 1) for i in range(256)]
|
||||
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
|
||||
# this excludes your current terminal "/dev/tty"
|
||||
ports = glob.glob('/dev/tty[a-zA-Z]*')
|
||||
elif sys.platform.startswith('darwin'):
|
||||
ports = glob.glob('/dev/tty.*')
|
||||
else:
|
||||
raise EnvironmentError('Unsupported platform')
|
||||
|
||||
result = []
|
||||
for port in ports:
|
||||
try:
|
||||
s = serial.Serial(port)
|
||||
s.close()
|
||||
result.append(port)
|
||||
except (OSError, serial.SerialException):
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(list_all())
|
Loading…
Reference in New Issue
Block a user