Merging changes from subproc branch.

This is a fundamental redesign of the architecture, separating the different components (UI, project state, audio processing) into different processes.
A lot of stuff is either broken or not yet implemented, making the current state completely unusable again.
But it's well enough developed to say that this is the way forward, so let's switch development to master again.
looper
Ben Niemann 2016-07-10 13:12:28 +02:00
commit 87f1ae3a5e
104 changed files with 6951 additions and 3798 deletions

View File

@ -0,0 +1,565 @@
#!/usr/bin/python3
import pyximport
pyximport.install()
import asyncio
import sys
import argparse
import logging
import random
import functools
import quamash
from PyQt5.QtCore import Qt
from PyQt5 import QtCore
from PyQt5 import QtWidgets
from PyQt5 import QtGui
from noisicaa.core import ipc
from noisicaa.audioproc import mutations
from noisicaa.audioproc import audioproc_client
from noisicaa.audioproc import audioproc_process
from noisicaa.ui import load_history
logger = logging.getLogger()
class AudioProcClientImpl(object):
def __init__(self, event_loop):
super().__init__()
self.event_loop = event_loop
self.server = ipc.Server(self.event_loop, 'client')
async def setup(self):
await self.server.setup()
async def cleanup(self):
await self.server.cleanup()
class AudioProcClient(
audioproc_client.AudioProcClientMixin, AudioProcClientImpl):
def __init__(self, event_loop, window):
super().__init__(event_loop)
self.window = window
def handle_pipeline_mutation(self, mutation):
self.window.handle_pipeline_mutation(mutation)
def handle_pipeline_status(self, status):
self.window.handle_pipeline_status(status)
class AudioProcProcessImpl(object):
def __init__(self, event_loop):
super().__init__()
self.event_loop = event_loop
self.server = ipc.Server(self.event_loop, 'process')
async def setup(self):
await self.server.setup()
async def cleanup(self):
await self.server.cleanup()
class AudioProcProcess(
audioproc_process.AudioProcProcessMixin, AudioProcProcessImpl):
pass
class AudioPlaygroundApp(QtWidgets.QApplication):
def __init__(self):
super().__init__(['noisipg'])
async def main(self, event_loop):
audioproc = AudioProcProcess(event_loop)
await audioproc.setup()
try:
window = AudioPlaygroundWindow(event_loop)
client = AudioProcClient(event_loop, window)
window.client = client
await client.setup()
try:
await client.connect(audioproc.server.address)
window.set_node_types(await client.list_node_types())
window.show()
await window.close_event.wait()
self.quit()
finally:
await client.cleanup()
finally:
await audioproc.cleanup()
class Port(QtWidgets.QGraphicsRectItem):
def __init__(self, parent, node_id, port_name, port_direction):
super().__init__(parent)
self.node_id = node_id
self.port_name = port_name
self.port_direction = port_direction
self.setRect(0, 0, 45, 15)
self.setBrush(Qt.white)
if self.port_direction == 'input':
self.dot_pos = QtCore.QPoint(7, 7)
else:
self.dot_pos = QtCore.QPoint(45-7, 7)
dot = QtWidgets.QGraphicsRectItem(self)
dot.setRect(-1, -1, 3, 3)
dot.setPos(self.dot_pos)
dot.setBrush(Qt.black)
self.selected = False
def set_selected(self, selected):
if selected:
self.setBrush(Qt.red)
else:
self.setBrush(Qt.white)
self.selected = selected
def mousePressEvent(self, evt):
if evt.buttons() & Qt.LeftButton:
if not self.selected:
self.set_selected(True)
self.scene().select_port(
self.node_id, self.port_name, self.port_direction)
else:
self.set_selected(False)
self.scene().unselect_port(
self.node_id, self.port_name)
return super().mousePressEvent(evt)
class Node(QtWidgets.QGraphicsRectItem):
def __init__(self, parent, node_id, desc):
super().__init__(parent)
self.node_id = node_id
self.desc = desc
self.setFlag(self.ItemIsMovable, True)
self.setFlag(self.ItemSendsGeometryChanges, True)
self.setFlag(self.ItemIsSelectable, True)
self.setRect(0, 0, 100, 60)
if self.desc.is_system:
self.setBrush(QtGui.QBrush(QtGui.QColor(200, 200, 255)))
else:
self.setBrush(Qt.white)
self.ports = {}
self.connections = set()
label = QtWidgets.QGraphicsTextItem(self)
label.setPos(2, 2)
label.setPlainText(self.desc.name)
in_y = 25
out_y = 25
for port_name, port_direction, port_type in self.desc.ports:
if port_direction == 'input':
x = -5
y = in_y
in_y += 20
elif port_direction == 'output':
x = 105-45
y = out_y
out_y += 20
port = Port(self, self.node_id, port_name, port_direction)
port.setPos(x, y)
self.ports[port_name] = port
def itemChange(self, change, value):
if change == self.ItemPositionHasChanged:
for connection in self.connections:
connection.update()
return super().itemChange(change, value)
def contextMenuEvent(self, evt):
menu = QtWidgets.QMenu()
if not self.desc.is_system:
remove = menu.addAction("Remove")
remove.triggered.connect(self.onRemove)
menu.exec_(evt.screenPos())
evt.accept()
def onRemove(self):
for connection in self.connections:
task = self.scene().window.event_loop.create_task(
self.scene().window.client.disconnect_ports(
connection.node1.node_id, connection.port1.port_name,
connection.node2.node_id, connection.port2.port_name))
task.add_done_callback(
functools.partial(
self.scene().window.command_done_callback,
command="Disconnect ports %s:%s-%s:%s" % (
connection.node1.desc.name,
connection.port1.port_name,
connection.node2.desc.name,
connection.port2.port_name)))
task = self.scene().window.event_loop.create_task(
self.scene().window.client.remove_node(self.node_id))
task.add_done_callback(
functools.partial(
self.scene().window.command_done_callback,
command="Remove node %s" % self.desc.name))
class Connection(QtWidgets.QGraphicsLineItem):
def __init__(self, parent, node1, port1, node2, port2):
super().__init__(parent)
self.node1 = node1
self.port1 = port1
self.node2 = node2
self.port2 = port2
self.update()
def update(self):
pos1 = self.port1.mapToScene(self.port1.dot_pos)
pos2 = self.port2.mapToScene(self.port2.dot_pos)
self.setLine(QtCore.QLineF(pos1, pos2))
class Scene(QtWidgets.QGraphicsScene):
def __init__(self, window):
super().__init__()
self.window = window
self.selected_port1 = None
self.selected_port2 = None
def select_port(self, node_id, port_name, port_type):
if port_type == 'output':
if self.selected_port1 is not None:
node = self.window.nodes[self.selected_port1[0]]
port = node.ports[self.selected_port1[1]]
port.set_selected(False)
self.selected_port1 = (node_id, port_name)
elif port_type == 'input':
if self.selected_port2 is not None:
node = self.window.nodes[self.selected_port2[0]]
port = node.ports[self.selected_port2[1]]
port.set_selected(False)
self.selected_port2 = (node_id, port_name)
if self.selected_port1 and self.selected_port2:
node1 = self.window.nodes[self.selected_port1[0]]
port1 = node1.ports[self.selected_port1[1]]
node2 = self.window.nodes[self.selected_port2[0]]
port2 = node2.ports[self.selected_port2[1]]
connection_id = '%s:%s-%s-%s' % (
*self.selected_port1, *self.selected_port2)
if connection_id in self.window.connections:
task = self.window.event_loop.create_task(
self.window.client.disconnect_ports(
*self.selected_port1, *self.selected_port2))
task.add_done_callback(
functools.partial(
self.window.command_done_callback,
command="Disconnect ports %s:%s-%s:%s" % (
node1.desc.name, port1.port_name,
node2.desc.name, port2.port_name)))
else:
task = self.window.event_loop.create_task(
self.window.client.connect_ports(
*self.selected_port1, *self.selected_port2))
task.add_done_callback(
functools.partial(
self.window.command_done_callback,
command="Connect ports %s:%s-%s:%s" % (
node1.desc.name, port1.port_name,
node2.desc.name, port2.port_name)))
node = self.window.nodes[self.selected_port1[0]]
port = node.ports[self.selected_port1[1]]
port.set_selected(False)
self.selected_port1 = None
node = self.window.nodes[self.selected_port2[0]]
port = node.ports[self.selected_port2[1]]
port.set_selected(False)
self.selected_port2 = None
def unselect_port(self, node_id, port_name):
if (node_id, port_name) == self.selected_port1:
self.selected_port1 = None
if (node_id, port_name) == self.selected_port2:
self.selected_port2 = None
class QPathLineEdit(QtWidgets.QLineEdit):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
action = self.addAction(
QtGui.QIcon.fromTheme('document-open'),
self.TrailingPosition)
action.triggered.connect(self._selectFile)
def _selectFile(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(
parent=self,
caption="Select path...",
directory=self.text())
if not path:
return
self.setText(path)
class CreateNodeWindow(QtWidgets.QDialog):
def __init__(self, window, node_type):
super().__init__(window)
self.window = window
self.node_type = node_type
self.setWindowTitle("Create {} node".format(self.node_type.name))
self.setModal(True)
playout = QtWidgets.QFormLayout()
self.widgets = {}
for pname, ptype in self.node_type.parameters:
if ptype == 'float':
widget = QtWidgets.QLineEdit(self)
widget.setText('0.0')
widget.setValidator(QtGui.QDoubleValidator())
playout.addRow(pname, widget)
elif ptype == 'int':
widget = QtWidgets.QLineEdit(self)
widget.setText('0')
widget.setValidator(QtGui.QIntValidator())
playout.addRow(pname, widget)
elif ptype == 'path':
widget = QPathLineEdit(self)
playout.addRow(pname, widget)
else:
raise ValueError("Unsupported parameter type %r" % ptype)
self.widgets[pname] = widget
create_button = QtWidgets.QPushButton("Create")
create_button.setDefault(True)
create_button.clicked.connect(self.create_node)
cancel_button = QtWidgets.QPushButton("Cancel")
cancel_button.clicked.connect(lambda: self.done(0))
layout = QtWidgets.QVBoxLayout()
layout.addLayout(playout, stretch=1)
blayout = QtWidgets.QHBoxLayout()
blayout.addStretch(1)
blayout.addWidget(create_button)
blayout.addWidget(cancel_button)
layout.addLayout(blayout)
self.setLayout(layout)
def create_node(self):
args = {}
for pname, ptype in self.node_type.parameters:
widget = self.widgets[pname]
if ptype == 'float':
value, _ = widget.locale().toDouble(widget.text())
elif ptype == 'int':
value, _ = widget.locale().toInt(widget.text())
elif ptype == 'path':
value = widget.text()
else:
raise ValueError("Unsupported parameter type %r" % ptype)
args[pname] = value
task = self.window.event_loop.create_task(
self.window.client.add_node(self.node_type.name, **args))
task.add_done_callback(
functools.partial(
self.window.command_done_callback,
command="Create node %s" % self.node_type.name))
self.done(0)
class AudioPlaygroundWindow(QtWidgets.QMainWindow):
def __init__(self, event_loop):
super().__init__()
self.event_loop = event_loop
self.client = None
self.close_event = asyncio.Event()
self.nodes = {}
self.connections = {}
self.setWindowTitle("noisicaä audio playground")
self.resize(800, 600)
menu_bar = self.menuBar()
project_menu = menu_bar.addMenu("Project")
project_menu.addAction(QtWidgets.QAction(
"Quit", self,
shortcut=QtGui.QKeySequence.Quit,
shortcutContext=Qt.ApplicationShortcut,
statusTip="Quit the application",
triggered=self.close_event.set))
statusbar = QtWidgets.QStatusBar()
self.pipeline_status = load_history.LoadHistoryWidget(100, 30)
self.pipeline_status.setToolTip("Load of the playback engine.")
statusbar.addPermanentWidget(self.pipeline_status)
self.setStatusBar(statusbar)
self.scene = Scene(self)
self.view = QtWidgets.QGraphicsView(self)
self.view.setScene(self.scene)
self.node_type_list = QtWidgets.QListWidget(self)
self.node_type_list.itemDoubleClicked.connect(self.doubleClicked)
layout = QtWidgets.QHBoxLayout()
layout.addWidget(self.view)
layout.addWidget(self.node_type_list)
central_widget = QtWidgets.QWidget(self)
central_widget.setLayout(layout)
self.setCentralWidget(central_widget)
def closeEvent(self, evt):
self.close_event.set()
return super().closeEvent(evt)
def command_done_callback(self, task, command):
exc = task.exception()
if exc is not None:
logger.error("Command %s failed: %s", command, exc)
msg = QtWidgets.QMessageBox(self)
msg.setWindowTitle("Command failed")
msg.setText(command)
msg.setInformativeText(str(exc))
msg.setStandardButtons(QtWidgets.QMessageBox.Ok)
msg.setDefaultButton(QtWidgets.QMessageBox.Ok)
msg.setIcon(QtWidgets.QMessageBox.Warning)
msg.setModal(True)
msg.show()
def set_node_types(self, node_types):
self.node_type_list.clear()
for node_type in node_types:
item = QtWidgets.QListWidgetItem()
item.setText(node_type.name)
item.setData(Qt.UserRole, node_type)
self.node_type_list.addItem(item)
def doubleClicked(self, item):
node_type = item.data(Qt.UserRole)
if len(node_type.parameters) > 0:
win = CreateNodeWindow(self, node_type)
win.show()
else:
task = self.event_loop.create_task(
self.client.add_node(node_type.name))
task.add_done_callback(
functools.partial(
self.command_done_callback,
command="Create node %s" % self.node_type.name))
def handle_pipeline_mutation(self, mutation):
if isinstance(mutation, mutations.AddNode):
node = Node(None, mutation.id, mutation.desc)
node.setPos(random.randint(-200, 200), random.randint(-200, 200))
self.scene.addItem(node)
self.nodes[mutation.id] = node
elif isinstance(mutation, mutations.RemoveNode):
node = self.nodes[mutation.id]
self.scene.removeItem(node)
del self.nodes[mutation.id]
elif isinstance(mutation, mutations.ConnectPorts):
connection_id = '%s:%s-%s-%s' % (
mutation.node1, mutation.port1,
mutation.node2, mutation.port2)
node1 = self.nodes[mutation.node1]
node2 = self.nodes[mutation.node2]
port1 = node1.ports[mutation.port1]
port2 = node2.ports[mutation.port2]
connection = Connection(None, node1, port1, node2, port2)
self.scene.addItem(connection)
self.connections[connection_id] = connection
node1.connections.add(connection)
node2.connections.add(connection)
elif isinstance(mutation, mutations.DisconnectPorts):
connection_id = '%s:%s-%s-%s' % (
mutation.node1, mutation.port1,
mutation.node2, mutation.port2)
connection = self.connections[connection_id]
self.scene.removeItem(connection)
del self.connections[connection_id]
connection.node1.connections.remove(connection)
connection.node2.connections.remove(connection)
else:
logger.warning("Unknown mutation received: %s", mutation)
def handle_pipeline_status(self, status):
if 'utilization' in status:
self.pipeline_status.addValue(status['utilization'])
def main(argv):
parser = argparse.ArgumentParser(
prog=argv[0])
parser.add_argument(
'--log-level',
choices=['debug', 'info', 'warning', 'error', 'critical'],
default='error',
help="Minimum level for log messages written to STDERR.")
args = parser.parse_args(args=argv[1:])
logging.basicConfig()
logging.getLogger().setLevel({
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL,
}[args.log_level])
app = AudioPlaygroundApp()
event_loop = quamash.QEventLoop(app)
asyncio.set_event_loop(event_loop)
def app_complete_callback(task):
exc = task.exception
if exc is not None:
logger.error("%s", exc)
event_loop.stop()
with event_loop:
task = event_loop.create_task(app.main(event_loop))
task.add_done_callback(app_complete_callback)
event_loop.run_forever()
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv))

View File

@ -0,0 +1 @@
from .audioproc_client import AudioProcClientMixin

View File

@ -0,0 +1,81 @@
#!/usr/bin/python3
import logging
from noisicaa import core
from noisicaa.core import ipc
logger = logging.getLogger(__name__)
class AudioProcClientMixin(object):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._stub = None
self._session_id = None
async def setup(self):
await super().setup()
self.server.add_command_handler(
'PIPELINE_MUTATION', self.handle_pipeline_mutation)
self.server.add_command_handler(
'PIPELINE_STATUS', self.handle_pipeline_status)
async def connect(self, address):
assert self._stub is None
self._stub = ipc.Stub(self.event_loop, address)
await self._stub.connect()
self._session_id = await self._stub.call(
'START_SESSION', self.server.address)
async def disconnect(self, shutdown=False):
if self._session_id is not None:
await self._stub.call('END_SESSION', self._session_id)
self._session_id = None
if self._stub is not None:
if shutdown:
await self.shutdown()
await self._stub.close()
self._stub = None
async def shutdown(self):
await self._stub.call('SHUTDOWN')
async def list_node_types(self):
return await self._stub.call('LIST_NODE_TYPES', self._session_id)
async def add_node(self, node_type, **args):
return await self._stub.call('ADD_NODE', self._session_id, node_type, args)
async def remove_node(self, node_id):
return await self._stub.call('REMOVE_NODE', self._session_id, node_id)
async def connect_ports(self, node1_id, port1_name, node2_id, port2_name):
return await self._stub.call(
'CONNECT_PORTS', self._session_id,
node1_id, port1_name, node2_id, port2_name)
async def disconnect_ports(
self, node1_id, port1_name, node2_id, port2_name):
return await self._stub.call(
'DISCONNECT_PORTS', self._session_id,
node1_id, port1_name, node2_id, port2_name)
async def set_backend(self, name, **args):
return await self._stub.call(
'SET_BACKEND', self._session_id, name, args)
async def play_file(self, path):
return await self._stub.call(
'PLAY_FILE', self._session_id, path)
async def dump(self):
return await self._stub.call('DUMP', self._session_id)
def handle_pipeline_mutation(self, mutation):
logger.info("Mutation received: %s" % mutation)
def handle_pipeline_status(self, status):
logger.info("Status update received: %s" % status)

View File

@ -0,0 +1,96 @@
#!/usr/bin/python3
import asyncio
import time
import unittest
from unittest import mock
import asynctest
from noisicaa import core
from noisicaa.core import ipc
from . import audioproc_process
from . import audioproc_client
from . import node_types
class TestClientImpl(object):
def __init__(self, event_loop):
super().__init__()
self.event_loop = event_loop
self.server = ipc.Server(self.event_loop, 'client')
async def setup(self):
await self.server.setup()
async def cleanup(self):
await self.server.cleanup()
class TestClient(audioproc_client.AudioProcClientMixin, TestClientImpl):
pass
class TestAudioProcProcessImpl(object):
def __init__(self, event_loop):
super().__init__()
self.event_loop = event_loop
self.server = ipc.Server(self.event_loop, 'audioproc')
async def setup(self):
await self.server.setup()
async def cleanup(self):
await self.server.cleanup()
class TestAudioProcProcess(
audioproc_process.AudioProcProcessMixin, TestAudioProcProcessImpl):
pass
class ProxyTest(asynctest.TestCase):
async def setUp(self):
self.audioproc_process = TestAudioProcProcess(self.loop)
await self.audioproc_process.setup()
self.audioproc_task = self.loop.create_task(
self.audioproc_process.run())
self.client = TestClient(self.loop)
await self.client.setup()
await self.client.connect(self.audioproc_process.server.address)
async def tearDown(self):
await self.client.disconnect(shutdown=True)
await self.client.cleanup()
await asyncio.wait_for(self.audioproc_task, None)
await self.audioproc_process.cleanup()
async def test_list_node_types(self):
result = await self.client.list_node_types()
self.assertTrue(
all(isinstance(nt, node_types.NodeType) for nt in result),
result)
async def test_add_node(self):
node_id = await self.client.add_node('whitenoise')
self.assertIsInstance(node_id, str)
async def test_remove_node(self):
node_id = await self.client.add_node('whitenoise')
await self.client.remove_node(node_id)
async def test_connect_ports(self):
node1_id = await self.client.add_node('whitenoise')
node2_id = await self.client.add_node('scale', factor=0.2)
await self.client.connect_ports(node1_id, 'out', node2_id, 'in')
async def test_disconnect_ports(self):
node1_id = await self.client.add_node('whitenoise')
node2_id = await self.client.add_node('scale', factor=0.2)
await self.client.connect_ports(node1_id, 'out', node2_id, 'in')
await self.client.disconnect_ports(node1_id, 'out', node2_id, 'in')
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,301 @@
#!/usr/bin/python3
import asyncio
import functools
import logging
import uuid
from noisicaa import core
from noisicaa.core import ipc
from . import backend
from . import pipeline
from . import mutations
from . import node_db
from . import nodes
from .filter import scale
from .source import whitenoise
from .source import silence
from .source import wavfile
from .source import fluidsynth
logger = logging.getLogger(__name__)
class InvalidSessionError(Exception): pass
class Session(object):
def __init__(self, event_loop, callback_stub):
self.event_loop = event_loop
self.callback_stub = callback_stub
self.id = uuid.uuid4().hex
self.pending_mutations = []
def cleanup(self):
pass
def publish_mutation(self, mutation):
if not self.callback_stub.connected:
self.pending_mutations.append(mutation)
return
callback_task = self.event_loop.create_task(
self.callback_stub.call('PIPELINE_MUTATION', mutation))
callback_task.add_done_callback(self.publish_mutation_done)
def publish_mutation_done(self, callback_task):
assert callback_task.done()
exc = callback_task.exception()
if exc is not None:
logger.error(
"PUBLISH_MUTATION failed with exception: %s", exc)
def publish_status(self, status):
callback_task = self.event_loop.create_task(
self.callback_stub.call('PIPELINE_STATUS', status))
callback_task.add_done_callback(self.publish_status_done)
def publish_status_done(self, callback_task):
assert callback_task.done()
exc = callback_task.exception()
if exc is not None:
logger.error("PUBLISH_STATUS failed with exception: %s", exc)
def callback_stub_connected(self):
assert self.callback_stub.connected
while self.pending_mutations:
self.publish_mutation(self.pending_mutations.pop(0))
class AudioProcProcessMixin(object):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.backend = None
async def setup(self):
await super().setup()
self._shutting_down = asyncio.Event()
self._shutdown_complete = asyncio.Event()
self.server.add_command_handler(
'START_SESSION', self.handle_start_session)
self.server.add_command_handler(
'END_SESSION', self.handle_end_session)
self.server.add_command_handler('SHUTDOWN', self.handle_shutdown)
self.server.add_command_handler(
'LIST_NODE_TYPES', self.handle_list_node_types)
self.server.add_command_handler(
'ADD_NODE', self.handle_add_node)
self.server.add_command_handler(
'REMOVE_NODE', self.handle_remove_node)
self.server.add_command_handler(
'CONNECT_PORTS', self.handle_connect_ports)
self.server.add_command_handler(
'DISCONNECT_PORTS', self.handle_disconnect_ports)
self.server.add_command_handler(
'SET_BACKEND', self.handle_set_backend)
self.server.add_command_handler(
'PLAY_FILE', self.handle_play_file)
self.server.add_command_handler(
'DUMP', self.handle_dump)
self.node_db = node_db.NodeDB()
self.node_db.add(scale.Scale)
self.node_db.add(silence.SilenceSource)
self.node_db.add(whitenoise.WhiteNoiseSource)
self.node_db.add(wavfile.WavFileSource)
self.node_db.add(fluidsynth.FluidSynthSource)
self.node_db.add(nodes.IPCNode)
self.node_db.add(nodes.PassThru)
self.node_db.add(nodes.TrackEventSource)
self.pipeline = pipeline.Pipeline()
self.pipeline.utilization_callback = self.utilization_callback
self.backend = None
self.audiosink = backend.AudioSinkNode(self.event_loop)
await self.audiosink.setup()
self.pipeline.add_node(self.audiosink)
self.midisource = backend.MidiSourceNode(self.event_loop)
await self.midisource.setup()
self.pipeline.add_node(self.midisource)
self.pipeline.start()
self.sessions = {}
async def cleanup(self):
self.pipeline.stop()
await super().cleanup()
async def run(self):
await self._shutting_down.wait()
logger.info("Shutting down...")
self.pipeline.stop()
self.pipeline.wait()
logger.info("Pipeline finished.")
self._shutdown_complete.set()
def utilization_callback(self, utilization):
self.event_loop.call_soon_threadsafe(
functools.partial(
self.publish_status, utilization=utilization))
def get_session(self, session_id):
try:
return self.sessions[session_id]
except KeyError:
raise InvalidSessionError
def publish_mutation(self, mutation):
for session in self.sessions.values():
session.publish_mutation(mutation)
def publish_status(self, **kwargs):
for session in self.sessions.values():
session.publish_status(kwargs)
def handle_start_session(self, client_address):
client_stub = ipc.Stub(self.event_loop, client_address)
connect_task = self.event_loop.create_task(client_stub.connect())
session = Session(self.event_loop, client_stub)
connect_task.add_done_callback(
functools.partial(self._client_connected, session))
self.sessions[session.id] = session
# Send initial mutations to build up the current pipeline
# state.
with self.pipeline.reader_lock():
for node in self.pipeline._nodes:
mutation = mutations.AddNode(node)
session.publish_mutation(mutation)
for node in self.pipeline._nodes:
for port in node.inputs.values():
for upstream_port in port.inputs:
mutation = mutations.ConnectPorts(
port, upstream_port)
session.publish_mutation(mutation)
return session.id
def _client_connected(self, session, connect_task):
assert connect_task.done()
exc = connect_task.exception()
if exc is not None:
logger.error("Failed to connect to callback client: %s", exc)
return
session.callback_stub_connected()
def handle_end_session(self, session_id):
session = self.get_session(session_id)
session.cleanup()
del self.sessions[session_id]
async def handle_shutdown(self):
logger.info("Shutdown received.")
self._shutting_down.set()
logger.info("Waiting for shutdown to complete...")
await self._shutdown_complete.wait()
logger.info("Shutdown complete.")
def handle_list_node_types(self, session_id):
self.get_session(session_id)
return self.node_db.node_types
async def handle_add_node(self, session_id, name, args):
session = self.get_session(session_id)
node = self.node_db.create(self.event_loop, name, args)
await node.setup()
with self.pipeline.writer_lock():
self.pipeline.add_node(node)
self.publish_mutation(mutations.AddNode(node))
return node.id
async def handle_remove_node(self, session_id, node_id):
session = self.get_session(session_id)
node = self.pipeline.find_node(node_id)
with self.pipeline.writer_lock():
self.pipeline.remove_node(node)
await node.cleanup()
self.publish_mutation(mutations.RemoveNode(node))
def handle_connect_ports(
self, session_id, node1_id, port1_name, node2_id, port2_name):
session = self.get_session(session_id)
node1 = self.pipeline.find_node(node1_id)
node2 = self.pipeline.find_node(node2_id)
with self.pipeline.writer_lock():
node2.inputs[port2_name].connect(node1.outputs[port1_name])
self.publish_mutation(
mutations.ConnectPorts(
node1.outputs[port1_name], node2.inputs[port2_name]))
def handle_disconnect_ports(
self, session_id, node1_id, port1_name, node2_id, port2_name):
session = self.get_session(session_id)
node1 = self.pipeline.find_node(node1_id)
node2 = self.pipeline.find_node(node2_id)
with self.pipeline.writer_lock():
node2.inputs[port2_name].disconnect(node1.outputs[port1_name])
self.publish_mutation(
mutations.DisconnectPorts(
node1.outputs[port1_name], node2.inputs[port2_name]))
def handle_set_backend(self, session_id, name, args):
self.get_session(session_id)
result = None
if name == 'pyaudio':
be = backend.PyAudioBackend(**args)
elif name == 'null':
be = backend.NullBackend(**args)
elif name == 'ipc':
be = backend.IPCBackend(**args)
result = be.address
elif name is None:
be = None
else:
raise ValueError("Invalid backend name %s" % name)
self.pipeline.set_backend(be)
return result
async def handle_play_file(self, session_id, path):
self.get_session(session_id)
node = wavfile.WavFileSource(
self.event_loop,
path=path, loop=False, end_notification='end')
await node.setup()
self.pipeline.notification_listener.add(
node.id,
functools.partial(self.play_file_done, node_id=node.id))
with self.pipeline.writer_lock():
sink = self.pipeline.find_node('sink')
self.pipeline.add_node(node)
sink.inputs['in'].connect(node.outputs['out'])
return node.id
def play_file_done(self, notification, node_id):
with self.pipeline.writer_lock():
node = self.pipeline.find_node(node_id)
sink = self.pipeline.find_node('sink')
sink.inputs['in'].disconnect(node.outputs['out'])
self.pipeline.remove_node(node)
self.event_loop.create_task(node.cleanup())
def handle_dump(self, session_id):
self.pipeline.dump()
class AudioProcProcess(AudioProcProcessMixin, core.ProcessImpl):
pass

View File

@ -0,0 +1,260 @@
#!/usr/bin/python3
import logging
import os
import os.path
import queue
import select
import tempfile
import threading
import time
import uuid
import pyaudio
from .resample import (Resampler,
AV_CH_LAYOUT_STEREO,
AV_SAMPLE_FMT_S16,
AV_SAMPLE_FMT_FLT)
from .node import Node
from .node_types import NodeType
from .ports import AudioInputPort, EventOutputPort
from .events import NoteOnEvent
from ..music.pitch import Pitch
from . import audio_format
from . import frame
logger = logging.getLogger(__name__)
class AudioSinkNode(Node):
desc = NodeType()
desc.name = 'audiosink'
desc.port('in', 'input', 'audio')
desc.is_system = True
def __init__(self, event_loop):
super().__init__(event_loop, id='sink')
self._input = AudioInputPort('in')
self.add_input(self._input)
def run(self, timepos):
self.pipeline.backend.write(self._input.frame)
class MidiSourceNode(Node):
desc = NodeType()
desc.name = 'midisource'
desc.port('out', 'output', 'events')
desc.is_system = True
def __init__(self, event_loop):
super().__init__(event_loop)
self._output = EventOutputPort('out')
self.add_output(self._output)
def run(self, timepos):
self._output.events.clear()
# TODO: real events from midi devices.
self._output.events.append(NoteOnEvent(timepos, Pitch('C4')))
class Backend(object):
def __init__(self):
self._stopped = threading.Event()
def setup(self):
pass
def cleanup(self):
pass
@property
def stopped(self):
return self._stopped.is_set()
def stop(self):
self._stopped.set()
def wait(self):
raise NotImplementedError
def write(self, frame):
raise NotImplementedError
class NullBackend(Backend):
def wait(self):
time.sleep(0.01)
def write(self, frame):
pass
class PyAudioBackend(Backend):
def __init__(self):
super().__init__()
self._audio = None
self._stream = None
self._resampler = None
self._buffer_lock = threading.Lock()
self._buffer = bytearray()
self._need_more = threading.Event()
self._bytes_per_sample = 2 * 2
self._buffer_threshold = 2048 * self._bytes_per_sample
def setup(self):
self._audio = pyaudio.PyAudio()
ch_layout = AV_CH_LAYOUT_STEREO
sample_fmt = AV_SAMPLE_FMT_S16
sample_rate = 44100
self._stream = self._audio.open(
format=pyaudio.paInt16,
channels=2,
rate=sample_rate,
output=True,
stream_callback=self._callback)
# use format of input buffer
self._resampler = Resampler(
AV_CH_LAYOUT_STEREO, AV_SAMPLE_FMT_FLT, 44100,
ch_layout, sample_fmt, sample_rate)
self._buffer.clear()
self._need_more.set()
def cleanup(self):
if self._stream is not None:
self._stream.close()
self._stream = None
if self._audio is not None:
self._audio.terminate()
self._audio = None
self._resampler = None
def _callback(self, in_data, frame_count, time_info, status):
num_bytes = frame_count * self._bytes_per_sample
with self._buffer_lock:
samples = self._buffer[:num_bytes]
del self._buffer[:num_bytes]
if len(self._buffer) < self._buffer_threshold:
self._need_more.set()
if len(samples) < num_bytes:
# buffer underrun, pad with silence
logger.warning(
"Buffer underrun, need %d samples, but only have %d",
frame_count, len(samples) / self._bytes_per_sample)
samples.extend([0] * (num_bytes - len(samples)))
return (bytes(samples), pyaudio.paContinue)
def stop(self):
super().stop()
self._need_more.set()
def wait(self):
if self.stopped:
return
self._need_more.wait()
def write(self, frame):
samples = self._resampler.convert(frame.as_bytes(), len(frame))
with self._buffer_lock:
self._buffer.extend(samples)
if len(self._buffer) >= self._buffer_threshold:
self._need_more.clear()
class IPCBackend(Backend):
def __init__(self, socket_dir=None):
super().__init__()
if socket_dir is None:
socket_dir = tempfile.gettempdir()
self.address = os.path.join(
socket_dir, 'audiostream.%s.pipe' % uuid.uuid4().hex)
self._pipe_in = None
self._pipe_out = None
self._poller = None
self._buffer = bytearray()
self._timepos = None
def setup(self):
super().setup()
os.mkfifo(self.address + '.send')
self._pipe_in = os.open(
self.address + '.send', os.O_RDONLY | os.O_NONBLOCK)
os.mkfifo(self.address + '.recv')
self._pipe_out = os.open(
self.address + '.recv', os.O_RDWR | os.O_NONBLOCK)
os.set_blocking(self._pipe_out, True)
self._poller = select.poll()
self._poller.register(self._pipe_in, select.POLLIN)
def cleanup(self):
if self._poller is not None:
self._poller.unregister(self._pipe_in)
self._poller = None
if self._pipe_in is not None:
os.close(self._pipe_in)
self._pipe_in = None
if self._pipe_out is not None:
os.close(self._pipe_out)
self._pipe_out = None
if os.path.exists(self.address + '.send'):
os.unlink(self.address + '.send')
if os.path.exists(self.address + '.recv'):
os.unlink(self.address + '.recv')
self._buffer.clear()
super().cleanup()
def wait(self):
while not self.stopped:
eol = self._buffer.find(b'\n')
if eol >= 0:
line = self._buffer[:eol]
del self._buffer[:eol+1]
assert line.startswith(b'#FR=')
self._timepos = int(line[4:])
return
if not self._poller.poll(0.5):
continue
dat = os.read(self._pipe_in, 1024)
self._buffer.extend(dat)
def write(self, frame):
samples = frame.as_bytes()
response = bytearray()
response.extend(b'#FR=%d\n' % self._timepos)
response.extend(b'SAMPLES=%d\n' % len(frame))
response.extend(b'LEN=%d\n' % len(samples))
response.extend(samples)
while response:
written = os.write(self._pipe_out, response)
del response[:written]

View File

@ -1,41 +0,0 @@
#!/usr/bin/python3
import logging
from ..exceptions import EndOfStreamError
from ..ports import AudioInputPort, AudioOutputPort
from ..node import Node
logger = logging.getLogger(__name__)
class Concat(Node):
def __init__(self):
super().__init__()
self._output = AudioOutputPort('out')
self.add_output(self._output)
self._timepos = 0
self._inputs = []
self._current_input = 0
def append_input(self, port):
p = AudioInputPort('in-%d' % (len(self.inputs) + 1))
self.add_input(p)
p.connect(port)
self._inputs.append(p)
def run(self):
while True:
if self._current_input >= len(self._inputs):
raise EndOfStreamError
try:
frame = self._inputs[self._current_input].get_frame(4096)
except EndOfStreamError:
self._current_input += 1
else:
self._timepos += len(frame)
self._output.add_frame(frame)
break

View File

@ -1,61 +0,0 @@
#!/usr/bin/python3
import unittest
from ..pipeline import Pipeline
from ..exceptions import EndOfStreamError
from ..ports import AudioOutputPort
from . import concat
class FakeOutputPort(AudioOutputPort):
def __init__(self, name, tracker, num):
super().__init__(name)
self.__tracker = tracker
self.__num = num
def get_frame(self, duration):
if self.__num == 0:
raise EndOfStreamError
else:
self.__num -= 1
self.__tracker.append((self.name, duration))
frame = self.create_frame(0)
frame.resize(duration)
return frame
def start(self):
pass
class ConcatTest(unittest.TestCase):
def testBasicRun(self):
pipeline = Pipeline()
tracker = []
source1 = FakeOutputPort('s1', tracker, 2)
source2 = FakeOutputPort('s2', tracker, 1)
source3 = FakeOutputPort('s3', tracker, 1)
node = concat.Concat()
pipeline.add_node(node)
node.append_input(source1)
node.append_input(source2)
node.append_input(source3)
node.setup()
try:
node.start()
while True:
try:
node.run()
except EndOfStreamError:
break
finally:
node.cleanup()
self.assertEqual(
tracker,
[('s1', 4096), ('s1', 4096), ('s2', 4096), ('s3', 4096)])
if __name__ == '__main__':
unittest.main()

View File

@ -1,70 +0,0 @@
#!/usr/bin/python3
import logging
import pprint
from noisicaa.core import callbacks
from ..ports import AudioInputPort, AudioOutputPort
from ..node import Node
from ..exceptions import EndOfStreamError
logger = logging.getLogger(__name__)
class Mix(Node):
_next_port = 1
def __init__(self, name=None, stop_on_end_of_stream=False):
super().__init__(name)
self._output = AudioOutputPort('out')
self.add_output(self._output)
self._timepos = 0
self._stop_on_end_of_stream = stop_on_end_of_stream
self._inputs = []
self.listeners = callbacks.CallbackRegistry()
def append_input(self, port):
with self.pipeline.writer_lock():
num = self._next_port
self.