Some parts of the infrastructure to stream audio from the project's audioproc process to the UI's backend.

looper
Ben Niemann 2016-07-03 04:15:01 +02:00
parent a0788010c0
commit 17ddba8b18
22 changed files with 175 additions and 176 deletions

View File

@ -54,14 +54,16 @@ class ProxyTest(asynctest.TestCase):
async def setUp(self):
self.audioproc_process = TestAudioProcProcess(self.loop)
await self.audioproc_process.setup()
self.audioproc_task = self.loop.create_task(
self.audioproc_process.run())
self.client = TestClient(self.loop)
await self.client.setup()
await self.client.connect(self.audioproc_process.server.address)
async def tearDown(self):
await self.client.shutdown()
await self.client.disconnect()
await self.client.disconnect(shutdown=True)
await self.client.cleanup()
await asyncio.wait_for(self.audioproc_task, None)
await self.audioproc_process.cleanup()
async def test_list_node_types(self):

View File

@ -12,6 +12,7 @@ from . import backend
from . import pipeline
from . import mutations
from . import node_db
from . import nodes
from .filter import scale
from .source import whitenoise
from .source import silence
@ -97,6 +98,8 @@ class AudioProcProcessMixin(object):
'SET_BACKEND', self.handle_set_backend)
self.server.add_command_handler(
'PLAY_FILE', self.handle_play_file)
self.server.add_command_handler(
'PROCESS_FRAME', self.handle_process_frame)
self.node_db = node_db.NodeDB()
self.node_db.add(scale.Scale)
@ -104,18 +107,19 @@ class AudioProcProcessMixin(object):
self.node_db.add(whitenoise.WhiteNoiseSource)
self.node_db.add(wavfile.WavFileSource)
self.node_db.add(fluidsynth.FluidSynthSource)
self.node_db.add(nodes.IPCNode)
self.pipeline = pipeline.Pipeline()
self.pipeline.utilization_callback = self.utilization_callback
self.backend = None
self.audiosink = backend.AudioSinkNode()
self.audiosink.setup()
self.audiosink = backend.AudioSinkNode(self.event_loop)
await self.audiosink.setup()
self.pipeline.add_node(self.audiosink)
self.midisource = backend.MidiSourceNode()
self.midisource.setup()
self.midisource = backend.MidiSourceNode(self.event_loop)
await self.midisource.setup()
self.pipeline.add_node(self.midisource)
self.pipeline.start()
@ -128,8 +132,10 @@ class AudioProcProcessMixin(object):
async def run(self):
await self._shutting_down.wait()
logger.info("Shutting down...")
self.pipeline.stop()
self.pipeline.wait()
logger.info("Pipeline finished.")
self._shutdown_complete.set()
def utilization_callback(self, utilization):
@ -189,28 +195,31 @@ class AudioProcProcessMixin(object):
del self.sessions[session_id]
async def handle_shutdown(self):
logger.info("Shutdown received.")
self._shutting_down.set()
logger.info("Waiting for shutdown to complete...")
await self._shutdown_complete.wait()
logger.info("Shutdown complete.")
def handle_list_node_types(self, session_id):
self.get_session(session_id)
return self.node_db.node_types
def handle_add_node(self, session_id, name, args):
async def handle_add_node(self, session_id, name, args):
session = self.get_session(session_id)
node = self.node_db.create(name, args)
node.setup()
node = self.node_db.create(self.event_loop, name, args)
await node.setup()
with self.pipeline.writer_lock():
self.pipeline.add_node(node)
self.publish_mutation(mutations.AddNode(node))
return node.id
def handle_remove_node(self, session_id, node_id):
async def handle_remove_node(self, session_id, node_id):
session = self.get_session(session_id)
node = self.pipeline.find_node(node_id)
with self.pipeline.writer_lock():
self.pipeline.remove_node(node)
node.cleanup()
await node.cleanup()
self.publish_mutation(mutations.RemoveNode(node))
def handle_connect_ports(
@ -242,6 +251,8 @@ class AudioProcProcessMixin(object):
be = backend.PyAudioBackend(**args)
elif name == 'null':
be = backend.NullBackend(**args)
elif name == 'ipc':
be = backend.IPCBackend(**args)
elif name is None:
be = None
else:
@ -249,12 +260,13 @@ class AudioProcProcessMixin(object):
self.pipeline.set_backend(be)
def handle_play_file(self, session_id, path):
async def handle_play_file(self, session_id, path):
self.get_session(session_id)
node = wavfile.WavFileSource(
self.event_loop,
path=path, loop=False, end_notification='end')
node.setup()
await node.setup()
self.pipeline.notification_listener.add(
node.id,
@ -273,7 +285,10 @@ class AudioProcProcessMixin(object):
sink = self.pipeline.find_node('sink')
sink.inputs['in'].disconnect(node.outputs['out'])
self.pipeline.remove_node(node)
node.cleanup()
self.event_loop.create_task(node.cleanup())
def handle_process_frame(self):
logger.info("process_frame received.")
class AudioProcProcess(AudioProcProcessMixin, core.ProcessImpl):

View File

@ -1,6 +1,7 @@
#!/usr/bin/python3
import logging
import queue
import threading
import time
@ -15,6 +16,8 @@ from .node_types import NodeType
from .ports import AudioInputPort, EventOutputPort
from .events import NoteOnEvent
from ..music.pitch import Pitch
from . import audio_format
from . import frame
logger = logging.getLogger(__name__)
@ -25,8 +28,8 @@ class AudioSinkNode(Node):
desc.port('in', 'input', 'audio')
desc.is_system = True
def __init__(self):
super().__init__(id='sink')
def __init__(self, event_loop):
super().__init__(event_loop, id='sink')
self._input = AudioInputPort('in')
self.add_input(self._input)
@ -41,8 +44,8 @@ class MidiSourceNode(Node):
desc.port('out', 'output', 'events')
desc.is_system = True
def __init__(self):
super().__init__()
def __init__(self, event_loop):
super().__init__(event_loop)
self._output = EventOutputPort('out')
self.add_output(self._output)
@ -56,7 +59,7 @@ class MidiSourceNode(Node):
class Backend(object):
def __init__(self):
pass
self._stopped = threading.Event()
def setup(self):
pass
@ -64,6 +67,13 @@ class Backend(object):
def cleanup(self):
pass
@property
def stopped(self):
return self._stopped.is_set()
def stop(self):
self._stopped.set()
def wait(self):
raise NotImplementedError
@ -144,7 +154,13 @@ class PyAudioBackend(Backend):
return (bytes(samples), pyaudio.paContinue)
def stop(self):
super().stop()
self._need_more.set()
def wait(self):
if self.stopped:
return
self._need_more.wait()
def write(self, frame):
@ -153,3 +169,37 @@ class PyAudioBackend(Backend):
self._buffer.extend(samples)
if len(self._buffer) >= self._buffer_threshold:
self._need_more.clear()
class IPCBackend(Backend):
def __init__(self):
super().__init__()
self._input_ready = threading.Event()
self._output_ready = threading.Event()
af = audio_format.AudioFormat(
audio_format.CHANNELS_STEREO,
audio_format.SAMPLE_FMT_FLT,
44100)
self._output = frame.Frame(af, 0, set())
def process_frame(self):
self._input_ready.set()
self._output_ready.wait()
self._output_ready.clear()
def stop(self):
super().stop()
self._input_ready.set()
def wait(self):
if self.stopped:
return
self._input_ready.wait()
self._input_ready.clear()
def write(self, frame):
self._output.resize(0)
self._output.append(frame)
self._output_ready.set()

View File

@ -2,13 +2,15 @@
import unittest
import asynctest
from ..pipeline import Pipeline
from ..source.whitenoise import WhiteNoiseSource
from . import scale
class ScaleTest(unittest.TestCase):
def testBasicRun(self):
class ScaleTest(asynctest.TestCase):
async def testBasicRun(self):
pipeline = Pipeline()
source = WhiteNoiseSource()
@ -17,12 +19,12 @@ class ScaleTest(unittest.TestCase):
node = scale.Scale(0.5)
pipeline.add_node(node)
node.inputs['in'].connect(source.outputs['out'])
node.setup()
await node.setup()
try:
node.collect_inputs()
node.run(0)
finally:
node.cleanup()
await node.cleanup()
if __name__ == '__main__':

View File

@ -12,7 +12,8 @@ logger = logging.getLogger(__name__)
class Node(object):
desc = None
def __init__(self, name=None, id=None):
def __init__(self, event_loop, name=None, id=None):
self.event_loop = event_loop
self.id = id or uuid.uuid4().hex
self.pipeline = None
self._name = name or type(self).__name__
@ -46,14 +47,14 @@ class Node(object):
parents.append(upstream_port.owner)
return parents
def setup(self):
async def setup(self):
"""Set up the node.
Any expensive initialization should go here.
"""
logger.info("%s: setup()", self.name)
def cleanup(self):
async def cleanup(self):
"""Clean up the node.
The counterpart of setup().

View File

@ -15,7 +15,7 @@ class NodeDB(object):
(cls.desc for cls in self._db.values()),
key=lambda desc: desc.name)
def create(self, name, args):
def create(self, event_loop, name, args):
cls = self._db[name]
return cls(**args)
return cls(event_loop, **args)

View File

@ -0,0 +1 @@
from .ipc import IPCNode

View File

@ -62,10 +62,17 @@ class Pipeline(object):
logger.info("Pipeline running.")
def stop(self):
if self._backend is not None:
logger.info("Stopping backend...")
self._backend.stop()
if self._thread is not None:
logger.info("Stopping pipeline thread...")
self._stopping.set()
self.wait()
logger.info("Pipeline thread stopped.")
self._thread = None
self._running = False
def wait(self):
@ -119,6 +126,8 @@ class Pipeline(object):
t0 = time.time()
self._backend.wait()
if self._backend.stopped:
break
t1 = time.time()
logger.debug("Processing frame @%d", timepos)
@ -148,11 +157,6 @@ class Pipeline(object):
except: # pylint: disable=bare-except
sys.excepthook(*sys.exc_info())
finally:
logger.info("Cleaning up nodes...")
for node in reversed(self.sorted_nodes):
node.cleanup()
def add_notification(self, node_id, notification):
self._notifications.append((node_id, notification))
@ -200,91 +204,3 @@ class Pipeline(object):
@property
def backend(self):
return self._backend
def demo(): # pragma: no cover
import pyximport
pyximport.install()
from .source.notes import NoteSource
from .source.fluidsynth import FluidSynthSource
from .source.whitenoise import WhiteNoiseSource
from .source.wavfile import WavFileSource
from .filter.scale import Scale
from .compose.timeslice import TimeSlice
from .compose.mix import Mix
from .sink.pyaudio import PyAudioSink
from .sink.encode import EncoderSink
from noisicaa import music
logger.setLevel(logging.DEBUG)
pipeline = Pipeline()
# project = music.BaseProject.make_demo()
# sheet = project.sheets[0]
# sheet_mixer = sheet.create_playback_source(
# pipeline, setup=False, recursive=True)
# noise_boost = Scale(0.1)
# pipeline.add_node(noise_boost)
# noise_boost.inputs['in'].connect(noise.outputs['out'])
# slice_noise = TimeSlice(200000)
# pipeline.add_node(slice_noise)
# slice_noise.inputs['in'].connect(noise_boost.outputs['out'])
# concat = Mix()
# pipeline.add_node(concat)
# #concat.append_input(slice_noise.outputs['out'])
# concat.append_input(sheet_mixer.outputs['out'])
noise = WhiteNoiseSource()
noise.setup()
pipeline.add_node(noise)
smpl = WavFileSource('/home/pink/Samples/fireworks.wav')
smpl.setup()
pipeline.add_node(smpl)
#sink = EncoderSink('flac', '/tmp/foo.flac')
sink = PyAudioSink()
sink.setup()
pipeline.add_node(sink)
#sink.inputs['in'].connect(noise.outputs['out'])
sink.inputs['in'].connect(smpl.outputs['out'])
pipeline.start()
try:
pipeline.wait()
except KeyboardInterrupt:
pipeline.stop()
pipeline.wait()
# source = Mix(
# MetronomeSource(22050),
# Concat(
# Mix(
# FluidsynthSource(),
# Concat(
# ,
# WavFileSource('/storage/home/share/sounds/new/2STEREO2.wav'),
# WavFileSource('/storage/home/share/sounds/new/2STEREO2.wav'),
# ),
# ),
# TimeSlice(SilenceSource(), 10000),
# WavFileSource(os.path.join(DATA_DIR, 'sounds', 'metronome.wav')),
# TimeSlice(SilenceSource(), 10000),
# WavFileSource(os.path.join(DATA_DIR, 'sounds', 'metronome.wav')),
# TimeSlice(SilenceSource(), 10000),
# WavFileSource(os.path.join(DATA_DIR, 'sounds', 'metronome.wav')),
# TimeSlice(SilenceSource(), 10000),
# ),
# )
# sink.run()
if __name__ == '__main__': # pragma: no cover
demo()

View File

@ -2,25 +2,27 @@
import unittest
import asynctest
from .exceptions import Error
from .node import Node
from .ports import InputPort, OutputPort
from . import pipeline
class PipelineTest(unittest.TestCase):
def testSortedNodes(self):
class PipelineTest(asynctest.TestCase):
async def testSortedNodes(self):
p = pipeline.Pipeline()
n1 = Node()
n1 = Node(self.loop)
p.add_node(n1)
n1.add_output(OutputPort('p'))
n2 = Node()
n2 = Node(self.loop)
p.add_node(n2)
n2.add_output(OutputPort('p'))
n3 = Node()
n3 = Node(self.loop)
p.add_node(n3)
n3.add_input(InputPort('p1'))
n3.add_input(InputPort('p2'))
@ -28,11 +30,11 @@ class PipelineTest(unittest.TestCase):
n3.inputs['p2'].connect(n2.outputs['p'])
n3.add_output(OutputPort('p'))
n4 = Node()
n4 = Node(self.loop)
p.add_node(n4)
n4.add_output(OutputPort('p'))
n5 = Node()
n5 = Node(self.loop)
p.add_node(n5)
n5.add_input(InputPort('p1'))
n5.add_input(InputPort('p2'))
@ -45,26 +47,26 @@ class PipelineTest(unittest.TestCase):
self.assertIn(pn, visited)
visited.add(n)
def testCyclicGraph(self):
async def testCyclicGraph(self):
p = pipeline.Pipeline()
n1 = Node()
n1 = Node(self.loop)
p.add_node(n1)
n1.add_output(OutputPort('p'))
n2 = Node()
n2 = Node(self.loop)
p.add_node(n2)
n2.add_input(InputPort('p1'))
n2.add_input(InputPort('p2'))
n2.add_output(OutputPort('p'))
n3 = Node()
n3 = Node(self.loop)
p.add_node(n3)
n3.add_input(InputPort('p'))
n3.add_output(OutputPort('p1'))
n3.add_output(OutputPort('p2'))
n4 = Node()
n4 = Node(self.loop)
p.add_node(n4)
n4.add_input(InputPort('p'))
n4.add_output(OutputPort('p'))

View File

@ -104,8 +104,8 @@ class EncoderSink(Node):
'flac': FlacEncoder,
}
def __init__(self, output_format, path):
super().__init__()
def __init__(self, event_loop, output_format, path):
super().__init__(event_loop)
self.output_format = output_format
self.path = path
@ -116,15 +116,15 @@ class EncoderSink(Node):
self._input = AudioInputPort('in')
self.add_input(self._input)
def setup(self):
super().setup()
async def setup(self):
await super().setup()
self._encoder.setup()
self._encoder.start()
def cleanup(self):
async def cleanup(self):
self._encoder.stop()
self._encoder.cleanup()
super().cleanup()
await super().cleanup()
def run(self, framesize=4096):
self._encoder.consume(self._input.frame)

View File

@ -2,28 +2,30 @@
import unittest
import asynctest
from ..pipeline import Pipeline
from ..source.whitenoise import WhiteNoiseSource
from . import encode
class PyAudioTest(unittest.TestCase):
def test_flac(self):
class EncodeTest(asynctest.TestCase):
async def test_flac(self):
pipeline = Pipeline()
source = WhiteNoiseSource()
source.setup()
source = WhiteNoiseSource(self.loop)
await source.setup()
pipeline.add_node(source)
node = encode.EncoderSink('flac', '/tmp/foo.flac')
node.setup()
node = encode.EncoderSink(self.loop, 'flac', '/tmp/foo.flac')
await node.setup()
pipeline.add_node(node)
node.inputs['in'].connect(source.outputs['out'])
try:
node.collect_inputs()
node.run(0)
finally:
node.cleanup()
await node.cleanup()
if __name__ == '__main__':

View File

@ -49,8 +49,8 @@ class FluidSynthSource(Node):
self._sfid = None
self._resampler = None
def setup(self):
super().setup()
async def setup(self):
await super().setup()
assert self._synth is None
@ -84,8 +84,8 @@ class FluidSynthSource(Node):
AV_CH_LAYOUT_STEREO, AV_SAMPLE_FMT_S16, 44100,
AV_CH_LAYOUT_STEREO, AV_SAMPLE_FMT_FLT, 44100)
def cleanup(self):
super().cleanup()
async def cleanup(self):
await super().cleanup()
self._resampler = None
if self._synth is not None:

View File

@ -29,7 +29,7 @@ class MetronomeSource(Node):
self._timepos = 0
self._buffer = None
def setup(self):
async def setup(self):
fp = wave.open(
os.path.join(DATA_DIR, 'sounds', 'metronome.wav'), 'rb')

View File

@ -20,8 +20,8 @@ class NoteSource(Node):
self._end_of_stream = None
self._event_source = None
def setup(self):
super().setup()
async def setup(self):
await super().setup()
self._end_of_stream = False
self._event_source = self._track.create_event_source()

View File

@ -14,8 +14,8 @@ class SilenceSource(Node):
desc.name = 'silence'
desc.port('out', 'output', 'audio')
def __init__(self):
super().__init__()
def __init__(self, event_loop):
super().__init__(event_loop)
self._output = AudioOutputPort('out')
self.add_output(self._output)

View File

@ -8,7 +8,7 @@ from . import silence
class SilenceTest(sourcetest.SourceTest):
def make_node(self):
return silence.SilenceSource()
return silence.SilenceSource(self.loop)
if __name__ == '__main__':

View File

@ -1,17 +1,17 @@
#!/usr/bin/python3
import unittest
import asynctest
class SourceTest(unittest.TestCase):
class SourceTest(asynctest.TestCase):
def make_node(self):
raise NotImplementedError
def testBasicRun(self):
async def testBasicRun(self):
node = self.make_node()
node.setup()
await node.setup()
try:
node.collect_inputs()
node.run(0)
finally:
node.cleanup()
await node.cleanup()

View File

@ -26,8 +26,8 @@ class WavFileSource(Node):
desc.parameter('loop', 'bool')
desc.parameter('end_notification', 'string')
def __init__(self, path, loop=False, end_notification=None):
super().__init__()
def __init__(self, event_loop, path, loop=False, end_notification=None):
super().__init__(event_loop)
self._output = AudioOutputPort('out')
self.add_output(self._output)
@ -41,8 +41,8 @@ class WavFileSource(Node):
self._pos = None
self._samples = None
def setup(self):
super().setup()
async def setup(self):
await super().setup()
fp = wave.open(self._path, 'rb')

View File

@ -10,7 +10,8 @@ TESTDATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'testdata')
class WavFileTest(sourcetest.SourceTest):
def make_node(self):
return wavfile.WavFileSource(os.path.join(TESTDATA_DIR, 'ping.wav'))
return wavfile.WavFileSource(
self.loop, os.path.join(TESTDATA_DIR, 'ping.wav'))
if __name__ == '__main__':

View File

@ -59,6 +59,7 @@ class ProjectClientMixin(object):
self._session_id = None
self._object_map = {}
self.project = None
self.audioproc_address = None
self.cls_map = {}
def __set_project(self, root_id):
@ -77,7 +78,7 @@ class ProjectClientMixin(object):
assert self._stub is None
self._stub = ipc.Stub(self.event_loop, address)
await self._stub.connect()
self._session_id, root_id = await self._stub.call(
self._session_id, self.audioproc_address, root_id = await self._stub.call(
'START_SESSION', self.server.address)
if root_id is not None:
# Connected to a loaded project.

View File

@ -82,18 +82,20 @@ class ProjectProcessMixin(object):
self.sessions = {}
self.pending_mutations = []
audioproc_process = await self.manager.call(
self.audioproc_address = await self.manager.call(
'CREATE_AUDIOPROC_PROCESS', 'project-%s' % id(self))
self.audioproc_client = AudioProcClient(
self.event_loop, self.server)
await self.audioproc_client.setup()
await self.audioproc_client.connect(audioproc_process)
await self.audioproc_client.connect(self.audioproc_address)
await self.audioproc_client.set_backend('ipc')
async def cleanup(self):
if self.audioproc_client is not None:
await self.audioproc_client.disconnect(shutdown=True)
await self.audioproc_client.cleanup()
self.audioproc_client = None
self.audioproc_address = None
async def run(self):
await self._shutting_down.wait()
@ -157,8 +159,8 @@ class ProjectProcessMixin(object):
if self.project is not None:
for mutation in self.add_object_mutations(self.project):
await session.publish_mutation(mutation)
return session.id, self.project.id
return session.id, None
return session.id, self.audioproc_address, self.project.id
return session.id, self.audioproc_address, None
def handle_end_session(self, session_id):
session = self.get_session(session_id)

View File

@ -180,12 +180,12 @@ class BaseEditorApp(QApplication):
async def createProject(self, path):
project = await self.project_registry.create_project(path)
self.addProject(project)
await self.addProject(project)
return project
async def openProject(self, path):
project = await self.project_registry.open_project(path)
self.addProject(project)
await self.addProject(project)
return project
def _updateOpenedProjects(self):
@ -197,8 +197,12 @@ class BaseEditorApp(QApplication):
in self.project_registry.projects.values()
if project.path))
def addProject(self, project_connection):
async def addProject(self, project_connection):
self.win.addProjectView(project_connection)
node_id = await self.audioproc_client.add_node(
'ipc', address=project_connection.client.audioproc_address)
await self.audioproc_client.connect_ports(
node_id, 'out', 'sink', 'in')
self._updateOpenedProjects()
async def removeProject(self, project_connection):
@ -240,12 +244,12 @@ class EditorApp(BaseEditorApp):
else:
project = await self.project_registry.open_project(
path)
self.addProject(project)
await self.addProject(project)
else:
reopen_projects = self.settings.value('opened_projects', [])
for path in reopen_projects or []:
project = await self.project_registry.open_project(path)
self.addProject(project)
await self.addProject(project)
self.aboutToQuit.connect(self.shutDown)