Is it over-engineering to build a full compiler for the pipeline?

looper
Ben Niemann 6 years ago
parent b794e7d41b
commit 721c65fc27

@@ -0,0 +1,37 @@
#!/usr/bin/python3
class ASTNode(object):
def __init__(self):
self.children = []
def __str__(self):
return type(self).__name__
def dump(self, indent=0):
out = ' ' * indent + str(self) + '\n'
for child in self.children:
out += child.dump(indent + 1)
return out
def walk(self):
yield self
yield from self.children
class Sequence(ASTNode):
pass
class CallNode(ASTNode):
def __init__(self, node_id):
super().__init__()
self.node_id = node_id
def __str__(self):
return '%s(%s)' % (super().__str__(), self.node_id)
class OutputStereo(ASTNode):
pass
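A minimal usage sketch (not part of the commit) of how these AST classes compose and what dump() prints; 'node-1' and 'node-2' are made-up node ids:

root = Sequence()
root.children.append(CallNode('node-1'))
root.children.append(CallNode('node-2'))
root.children.append(OutputStereo())
print(root.dump())
# Sequence
#  CallNode(node-1)
#  CallNode(node-2)
#  OutputStereo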

@@ -11,7 +11,7 @@ from noisicaa import core
from noisicaa.core import ipc
from . import backend
from . import pipeline
from . import pipeline_vm
from . import mutations
from . import node_db
from . import nodes
@@ -79,13 +79,13 @@ class AudioProcProcessMixin(object):
super().__init__(*args, **kwargs)
self.shm_name = shm
self.shm = None
self.pipeline = None
self.__vm = None
async def setup(self):
await super().setup()
self._shutting_down = asyncio.Event()
self._shutdown_complete = asyncio.Event()
self.__shutting_down = asyncio.Event()
self.__shutdown_complete = asyncio.Event()
self.server.add_command_handler(
'START_SESSION', self.handle_start_session)
@@ -135,20 +135,15 @@ class AudioProcProcessMixin(object):
if self.shm_name is not None:
self.shm = posix_ipc.SharedMemory(self.shm_name)
self.pipeline = pipeline.Pipeline(shm=self.shm)
self.pipeline.utilization_callback = self.utilization_callback
self.pipeline.listeners.add('perf_data', self.perf_data_callback)
self.pipeline.listeners.add('node_state', self.node_state_callback)
self.__vm = pipeline_vm.PipelineVM(shm=self.shm)
self.__vm.listeners.add('perf_data', self.perf_data_callback)
self.__vm.listeners.add('node_state', self.node_state_callback)
self.audiosink = backend.AudioSinkNode(self.event_loop)
await self.pipeline.setup_node(self.audiosink)
self.pipeline.add_node(self.audiosink)
self.__vm.setup()
self.eventsource = backend.SystemEventSourceNode(self.event_loop)
await self.pipeline.setup_node(self.eventsource)
self.pipeline.add_node(self.eventsource)
self.pipeline.start()
sink = nodes.Sink(self.event_loop)
await self.__vm.setup_node(sink)
self.__vm.add_node(sink)
self.sessions = {}
@@ -161,24 +156,18 @@ class AudioProcProcessMixin(object):
self.shm.close_fd()
self.shm = None
if self.pipeline is not None:
self.pipeline.stop()
self.pipeline = None
if self.__vm is not None:
self.__vm.cleanup()
self.__vm = None
await super().cleanup()
async def run(self):
await self._shutting_down.wait()
await self.__shutting_down.wait()
logger.info("Shutting down...")
self.pipeline.stop()
self.pipeline.wait()
self.__vm.cleanup()
logger.info("Pipeline finished.")
self._shutdown_complete.set()
def utilization_callback(self, utilization):
self.event_loop.call_soon_threadsafe(
functools.partial(
self.publish_status, utilization=utilization))
self.__shutdown_complete.set()
def get_session(self, session_id):
try:
@@ -199,16 +188,16 @@ class AudioProcProcessMixin(object):
connect_task = self.event_loop.create_task(client_stub.connect())
session = Session(self.event_loop, client_stub, flags)
connect_task.add_done_callback(
functools.partial(self._client_connected, session))
functools.partial(self.__client_connected, session))
self.sessions[session.id] = session
# Send initial mutations to build up the current pipeline
# state.
with self.pipeline.reader_lock():
for node in self.pipeline._nodes:
with self.__vm.reader_lock():
for node in self.__vm.nodes:
mutation = mutations.AddNode(node)
session.publish_mutation(mutation)
for node in self.pipeline._nodes:
for node in self.__vm.nodes:
for port in node.inputs.values():
for upstream_port in port.inputs:
mutation = mutations.ConnectPorts(
@@ -217,7 +206,7 @@ class AudioProcProcessMixin(object):
return session.id
def _client_connected(self, session, connect_task):
def __client_connected(self, session, connect_task):
assert connect_task.done()
exc = connect_task.exception()
if exc is not None:
@@ -233,34 +222,34 @@ class AudioProcProcessMixin(object):
async def handle_shutdown(self):
logger.info("Shutdown received.")
self._shutting_down.set()
self.__shutting_down.set()
logger.info("Waiting for shutdown to complete...")
await self._shutdown_complete.wait()
await self.__shutdown_complete.wait()
logger.info("Shutdown complete.")
async def handle_add_node(self, session_id, name, args):
session = self.get_session(session_id)
node = self.node_db.create(self.event_loop, name, args)
await self.pipeline.setup_node(node)
with self.pipeline.writer_lock():
self.pipeline.add_node(node)
await self.__vm.setup_node(node)
with self.__vm.writer_lock():
self.__vm.add_node(node)
self.publish_mutation(mutations.AddNode(node))
return node.id
async def handle_remove_node(self, session_id, node_id):
session = self.get_session(session_id)
node = self.pipeline.find_node(node_id)
with self.pipeline.writer_lock():
self.pipeline.remove_node(node)
node = self.__vm.find_node(node_id)
with self.__vm.writer_lock():
self.__vm.remove_node(node)
await node.cleanup()
self.publish_mutation(mutations.RemoveNode(node))
def handle_connect_ports(
self, session_id, node1_id, port1_name, node2_id, port2_name):
session = self.get_session(session_id)
node1 = self.pipeline.find_node(node1_id)
node2 = self.pipeline.find_node(node2_id)
with self.pipeline.writer_lock():
node1 = self.__vm.find_node(node1_id)
node2 = self.__vm.find_node(node2_id)
with self.__vm.writer_lock():
node2.inputs[port2_name].connect(node1.outputs[port1_name])
self.publish_mutation(
mutations.ConnectPorts(
@@ -269,9 +258,9 @@ class AudioProcProcessMixin(object):
def handle_disconnect_ports(
self, session_id, node1_id, port1_name, node2_id, port2_name):
session = self.get_session(session_id)
node1 = self.pipeline.find_node(node1_id)
node2 = self.pipeline.find_node(node2_id)
with self.pipeline.writer_lock():
node1 = self.__vm.find_node(node1_id)
node2 = self.__vm.find_node(node2_id)
with self.__vm.writer_lock():
node2.inputs[port2_name].disconnect(node1.outputs[port1_name])
self.publish_mutation(
mutations.DisconnectPorts(
@@ -294,12 +283,12 @@ class AudioProcProcessMixin(object):
else:
raise ValueError("Invalid backend name %s" % name)
self.pipeline.set_backend(be)
self.__vm.set_backend(be)
return result
def handle_set_frame_size(self, session_id, frame_size):
self.get_session(session_id)
self.pipeline.set_frame_size(frame_size)
self.__vm.set_frame_size(frame_size)
def perf_data_callback(self, perf_data):
self.event_loop.call_soon_threadsafe(
@@ -319,13 +308,13 @@ class AudioProcProcessMixin(object):
path=path, loop=False, end_notification='end')
await node.setup()
self.pipeline.notification_listener.add(
self.__vm.notification_listener.add(
node.id,
functools.partial(self.play_file_done, node_id=node.id))
with self.pipeline.writer_lock():
sink = self.pipeline.find_node('sink')
self.pipeline.add_node(node)
with self.__vm.writer_lock():
sink = self.__vm.find_node('sink')
self.__vm.add_node(node)
sink.inputs['in'].connect(node.outputs['out'])
return node.id
@@ -333,8 +322,8 @@ class AudioProcProcessMixin(object):
async def handle_add_event(self, session_id, queue, event):
self.get_session(session_id)
with self.pipeline.writer_lock():
backend = self.pipeline.backend
with self.__vm.writer_lock():
backend = self.__vm.backend
if backend is None:
logger.warning(
"Ignoring event %s: no backend active:", event)
@@ -345,28 +334,28 @@ class AudioProcProcessMixin(object):
self, session_id, node_id, port_name, kwargs):
self.get_session(session_id)
node = self.pipeline.find_node(node_id)
node = self.__vm.find_node(node_id)
port = node.outputs[port_name]
with self.pipeline.writer_lock():
with self.__vm.writer_lock():
port.set_prop(**kwargs)
async def handle_set_node_param(self, session_id, node_id, kwargs):
self.get_session(session_id)
node = self.pipeline.find_node(node_id)
with self.pipeline.writer_lock():
node = self.__vm.find_node(node_id)
with self.__vm.writer_lock():
node.set_param(**kwargs)
def play_file_done(self, notification, node_id):
with self.pipeline.writer_lock():
node = self.pipeline.find_node(node_id)
sink = self.pipeline.find_node('sink')
with self.__vm.writer_lock():
node = self.__vm.find_node(node_id)
sink = self.__vm.find_node('sink')
sink.inputs['in'].disconnect(node.outputs['out'])
self.pipeline.remove_node(node)
self.__vm.remove_node(node)
self.event_loop.create_task(node.cleanup())
def handle_dump(self, session_id):
self.pipeline.dump()
self.__vm.dump()
class AudioProcProcess(AudioProcProcessMixin, core.ProcessImpl):

@@ -33,30 +33,6 @@ from . import data
logger = logging.getLogger(__name__)
class AudioSinkNode(Node):
class_name = 'audiosink'
def __init__(self, event_loop):
description = node_db.SystemNodeDescription(
ports=[
node_db.AudioPortDescription(
name='in',
direction=node_db.PortDirection.Input,
channels='stereo'),
])
super().__init__(event_loop, description, id='sink')
def run(self, ctxt):
input_port = self.inputs['in']
ctxt.out_frame = data.FrameData()
ctxt.out_frame.sample_pos = ctxt.sample_pos
ctxt.out_frame.duration = ctxt.duration
ctxt.out_frame.samples = input_port.frame.as_bytes()
ctxt.out_frame.num_samples = len(input_port.frame)
class SystemEventSourceNode(Node):
class_name = 'systemeventsource'
@@ -98,12 +74,18 @@ class Backend(object):
def stop(self):
self._stopped.set()
def wait(self, ctxt):
def begin_frame(self, ctxt):
raise NotImplementedError
def end_frame(self, ctxt):
raise NotImplementedError
def write(self, ctxt):
raise NotImplementedError
def output(self, layout, num_samples, samples):
raise NotImplementedError
def clear_events(self):
self._event_queues.clear()
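The Backend interface moves from wait()/write() to a begin_frame()/end_frame() bracket plus output(), which receives planar float32 channel buffers from the VM's OUTPUT_STEREO opcode. A minimal subclass sketch under those assumptions; the class name and behaviour are made up, see NullBackend below and the TestBackend in pipeline_vm_test for the real patterns:

class CaptureBackend(Backend):  # hypothetical example, not part of the commit
    def __init__(self):
        super().__init__()
        self.frames = []

    def begin_frame(self, ctxt):
        pass  # e.g. block here until the audio device wants another frame

    def end_frame(self, ctxt):
        pass

    def output(self, layout, num_samples, samples):
        # samples is a list of per-channel byte buffers (planar float32)
        self.frames.append((layout, num_samples, samples))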
@@ -128,10 +110,13 @@ class Backend(object):
class NullBackend(Backend):
def wait(self, sample_pos):
def begin_frame(self, ctxt):
time.sleep(0.01)
def write(self, frame):
def end_frame(self, ctxt):
pass
def write(self, ctxt):
pass
@@ -206,22 +191,35 @@ class PyAudioBackend(Backend):
super().stop()
self._need_more.set()
def wait(self, ctxt):
def begin_frame(self, ctxt):
if self.stopped:
return
self._need_more.wait()
self.clear_events()
def write(self, ctxt):
assert ctxt.out_frame is not None
assert ctxt.out_frame.samples is not None
assert ctxt.out_frame.num_samples > 0
samples = self._resampler.convert(
ctxt.out_frame.samples, ctxt.out_frame.num_samples)
def end_frame(self, ctxt):
pass
def output(self, layout, num_samples, samples):
assert layout == AV_CH_LAYOUT_STEREO
# TODO: feed non-interleaved sample buffers directly into
# resample
interleaved = bytearray(8 * num_samples)
interleaved[0::8] = samples[0][0::4]
interleaved[1::8] = samples[0][1::4]
interleaved[2::8] = samples[0][2::4]
interleaved[3::8] = samples[0][3::4]
interleaved[4::8] = samples[1][0::4]
interleaved[5::8] = samples[1][1::4]
interleaved[6::8] = samples[1][2::4]
interleaved[7::8] = samples[1][3::4]
converted = self._resampler.convert(interleaved, num_samples)
with self._buffer_lock:
self._buffer.extend(samples)
self._buffer.extend(converted)
if len(self._buffer) >= self._buffer_threshold:
self._need_more.clear()
self.clear_events()
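The slice assignments above interleave the two planar float32 buffers byte by byte into [L0, R0, L1, R1, ...] frames before handing them to the resampler. A small self-contained check of that byte layout (not from the commit):

import struct

left = struct.pack('=2f', 0.25, 0.5)     # planar left channel, 2 samples
right = struct.pack('=2f', -0.25, -0.5)  # planar right channel, 2 samples
samples = [left, right]
num_samples = 2

interleaved = bytearray(8 * num_samples)
for b in range(4):                        # same effect as the unrolled assignments above
    interleaved[b::8] = samples[0][b::4]
    interleaved[4 + b::8] = samples[1][b::4]

assert struct.unpack('=4f', interleaved) == (0.25, -0.25, 0.5, -0.5)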
class Stopped(Exception):
@@ -253,7 +251,7 @@ class IPCBackend(Backend):
self._stream.close()
super().stop()
def wait(self, ctxt):
def begin_frame(self, ctxt):
try:
ctxt.in_frame = self._stream.receive_frame()
ctxt.duration = ctxt.in_frame.duration
@@ -269,6 +267,9 @@ class IPCBackend(Backend):
logger.warning("Stopping IPC backend.")
self.stop()
def end_frame(self, ctxt):
pass
def write(self, ctxt):
assert ctxt.out_frame is not None
assert ctxt.perf.current_span_id == 0

@@ -0,0 +1,34 @@
#!/usr/bin/python3
import enum
import logging
import math
import os
import random
import struct
import sys
import threading
import time
import toposort
from . import ast
logger = logging.getLogger(__name__)
class Compiler(object):
def __init__(self, graph):
self.__graph = graph
def build_ast(self):
root = ast.Sequence()
sorted_nodes = toposort.toposort_flatten(
{n: set(n.parent_nodes) for n in self.__graph.nodes},
sort=False)
for n in sorted_nodes:
root.children.append(n.get_ast())
return root

@@ -0,0 +1,45 @@
#!/usr/bin/python3
import logging
import struct
import threading
import time
import unittest
import asynctest
from noisicaa import node_db
from . import backend
from . import data
from . import pipeline_vm
from . import resample
from . import nodes
from . import compiler
logger = logging.getLogger(__name__)
class CompilerTest(asynctest.TestCase):
async def test_foo(self):
graph = pipeline_vm.PipelineGraph()
node1 = nodes.PassThru(self.loop)
graph.add_node(node1)
node2 = nodes.PassThru(self.loop)
graph.add_node(node2)
node2.inputs['in'].connect(node1.outputs['out'])
node3 = nodes.Sink(self.loop)
graph.add_node(node3)
node3.inputs['audio_left'].connect(node1.outputs['out'])
node3.inputs['audio_left'].connect(node2.outputs['out'])
node3.inputs['audio_right'].connect(node2.outputs['out'])
comp = compiler.Compiler(graph)
ast = comp.build_ast()
print(ast.dump())
if __name__ == '__main__':
unittest.main()
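For the graph built in test_foo, the printed AST should look roughly like the following; node1 and node2 get auto-generated ids and contribute CallNode leaves via CustomNode.get_ast() (node.py hunk below), while node3 contributes an OutputStereo leaf via Sink.get_ast() (builtin.py hunk below). A sketch, not captured output:

Sequence
 CallNode(<generated id of node1>)
 CallNode(<generated id of node2>)
 OutputStereo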

@@ -6,8 +6,8 @@ import uuid
from noisicaa import node_db
from .exceptions import Error
from . import audio_format
from . import ports
from . import ast
logger = logging.getLogger(__name__)
@@ -74,14 +74,6 @@ class Node(object):
and port_desc.port_type == node_db.PortType.Events):
kwargs['csound_instr'] = port_desc.csound_instr
if (port_desc.port_type == node_db.PortType.Audio):
if len(port_desc.channels) == 1:
kwargs['channels'] = audio_format.CHANNELS_MONO
elif len(port_desc.channels) == 2:
kwargs['channels'] = audio_format.CHANNELS_STEREO
else:
raise ValueError(port_desc.channels)
port = port_cls(port_desc.name, **kwargs)
if port_desc.direction == node_db.PortDirection.Input:
self.add_input(port)
@@ -156,14 +148,20 @@ class Node(object):
"""
logger.info("%s: cleanup()", self.name)
def collect_inputs(self, ctxt):
for port in self.inputs.values():
port.collect_inputs(ctxt)
def get_ast(self):
raise NotImplementedError
def post_run(self, ctxt):
for port in self.outputs.values():
port.post_run(ctxt)
class CustomNode(Node):
def get_ast(self):
return ast.CallNode(self.id)
def connect_port(self, port_name, buf, offset):
raise NotImplementedError
def run(self, ctxt):
raise NotImplementedError
class BuiltinNode(Node):
pass
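A hypothetical CustomNode subclass sketch showing how connect_port() and run() are meant to cooperate: the VM hands every node its shared bytearray plus a per-port byte offset (see PipelineVM.setup_spec below), and run() then reads and writes float32 samples at those offsets. Everything beyond the method signatures is made up, including the node name and the assumption that the description declares 'in' and 'out' audio ports:

import struct

class GainNode(CustomNode):  # illustrative only, not part of the commit
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__ports = {}  # port name -> (buffer, byte offset)

    def connect_port(self, port_name, buf, offset):
        self.__ports[port_name] = (buf, offset)

    def run(self, ctxt):
        in_buf, in_off = self.__ports['in']
        out_buf, out_off = self.__ports['out']
        for i in range(ctxt.duration):  # one float32 per sample position
            value, = struct.unpack_from('=f', in_buf, in_off + 4 * i)
            struct.pack_into('=f', out_buf, out_off + 4 * i, 0.5 * value)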

@@ -1,3 +1,4 @@
from .builtin import Sink
from .ipc import IPCNode
from .passthru import PassThru
from .track_control_source import TrackControlSource

@@ -0,0 +1,30 @@
#!/usr/bin/python3
import logging
from noisicaa import node_db
from .. import node
from .. import ast
logger = logging.getLogger(__name__)
class Sink(node.BuiltinNode):
class_name = 'sink'
def __init__(self, event_loop):
description = node_db.SystemNodeDescription(
ports=[
node_db.AudioPortDescription(
name='audio_left',
direction=node_db.PortDirection.Input),
node_db.AudioPortDescription(
name='audio_right',
direction=node_db.PortDirection.Input),
])
super().__init__(event_loop, description, id='sink')
def get_ast(self):
return ast.OutputStereo()

@@ -16,7 +16,7 @@ from .. import audio_format
logger = logging.getLogger(__name__)
class LV2(node.Node):
class LV2(node.CustomNode):
class_name = 'lv2'
__world = None
@@ -51,29 +51,29 @@ class LV2(node.Node):
self.__instance.activate()
self.__buffers = {}
for port in self.description.ports:
if port.port_type == node_db.PortType.Audio:
logger.info("Creating audio port buffer %s...", port.name)
buf = numpy.zeros(shape=(10240,), dtype=numpy.float32)
self.__buffers[port.name] = buf
elif port.port_type == node_db.PortType.Control:
logger.info("Creating control port buffer %s...", port.name)
buf = numpy.zeros(shape=(1,), dtype=numpy.float32)
self.__buffers[port.name] = buf
elif port.port_type == node_db.PortType.Events:
logger.info("Creating event port buffer %s...", port.name)
buf = bytearray(4096)
forge = lv2.AtomForge(self.__world.urid_mapper)
self.__buffers[port.name] = (buf, forge)
else:
raise ValueError(port.port_type)
lv2_port = self.__plugin.get_port_by_symbol(self.__world.new_string(port.name))
assert lv2_port is not None, port.name
self.__instance.connect_port(lv2_port.get_index(), buf)
# for port in self.description.ports:
# if port.port_type == node_db.PortType.Audio:
# logger.info("Creating audio port buffer %s...", port.name)
# buf = numpy.zeros(shape=(10240,), dtype=numpy.float32)
# self.__buffers[port.name] = buf
# elif port.port_type == node_db.PortType.Control:
# logger.info("Creating control port buffer %s...", port.name)
# buf = numpy.zeros(shape=(1,), dtype=numpy.float32)
# self.__buffers[port.name] = buf
# elif port.port_type == node_db.PortType.Events:
# logger.info("Creating event port buffer %s...", port.name)
# buf = bytearray(4096)
# forge = lv2.AtomForge(self.__world.urid_mapper)
# self.__buffers[port.name] = (buf, forge)
# else:
# raise ValueError(port.port_type)
# lv2_port = self.__plugin.get_port_by_symbol(self.__world.new_string(port.name))
# assert lv2_port is not None, port.name
# self.__instance.connect_port(lv2_port.get_index(), buf)
for parameter in self.description.parameters:
if parameter.param_type == node_db.ParameterType.Float:
@@ -97,49 +97,51 @@ class LV2(node.Node):
await super().cleanup()
def connect_port(self, port_name, buf, offset):
lv2_port = self.__plugin.get_port_by_symbol(
self.__world.new_string(port_name))
assert lv2_port is not None, port_name
self.__instance.connect_port(lv2_port.get_index(), buf, offset)
def run(self, ctxt):
for port in self.description.ports:
if port.port_type == node_db.PortType.Audio:
buf = self.__buffers[port.name]
for port_name, port in self.inputs.items():
if isinstance(port, ports.AudioInputPort):
buf = self.__buffers[port_name]
if len(buf) < ctxt.duration:
buf.resize(ctxt.duration, refcheck=False)
lv2_port = self.__plugin.get_port_by_symbol(
self.__world.new_string(port_name))
assert lv2_port is not None, port_name
self.__instance.connect_port(lv2_port.get_index(), buf)
buf[0:ctxt.duration] = port.frame.samples[0]
elif isinstance(port, ports.ControlInputPort):
buf = self.__buffers[port_name]
buf[0] = port.frame[0]
elif isinstance(port, ports.EventInputPort):
buf, forge = self.__buffers[port_name]
forge.set_buffer(buf, 4096)
with forge.sequence():
for event in port.events:
sample_pos = event.sample_pos - ctxt.sample_pos
assert 0 <= sample_pos < ctxt.duration
if isinstance(event, events.NoteOnEvent):
forge.write_midi_event(
sample_pos,
bytes([0x90, event.note.midi_note, event.volume]), 3)
elif isinstance(event, events.NoteOffEvent):
forge.write_midi_event(
sample_pos,
bytes([0x80, event.note.midi_note, 0]), 3)
else:
raise NotImplementedError(
"Event class %s not supported" % type(event).__name__)
else:
raise ValueError(port)
# for port_name, port in self.inputs.items():
# if isinstance(port, ports.AudioInputPort):
# buf = self.__buffers[port_name]
# if len(buf) < ctxt.duration:
# buf.resize(ctxt.duration, refcheck=False)
# lv2_port = self.__plugin.get_port_by_symbol(
# self.__world.new_string(port_name))
# assert lv2_port is not None, port_name
# self.__instance.connect_port(lv2_port.get_index(), buf)
# buf[0:ctxt.duration] = port.frame.samples[0]
# elif isinstance(port, ports.ControlInputPort):
# buf = self.__buffers[port_name]
# buf[0] = port.frame[0]
# elif isinstance(port, ports.EventInputPort):
# buf, forge = self.__buffers[port_name]
# forge.set_buffer(buf, 4096)
# with forge.sequence():
# for event in port.events:
# sample_pos = event.sample_pos - ctxt.sample_pos
# assert 0 <= sample_pos < ctxt.duration
# if isinstance(event, events.NoteOnEvent):
# forge.write_midi_event(
# sample_pos,
# bytes([0x90, event.note.midi_note, event.volume]), 3)
# elif isinstance(event, events.NoteOffEvent):
# forge.write_midi_event(
# sample_pos,
# bytes([0x80, event.note.midi_note, 0]), 3)
# else:
# raise NotImplementedError(
# "Event class %s not supported" % type(event).__name__)
# else:
# raise ValueError(port)
for parameter in self.description.parameters:
if parameter.param_type == node_db.ParameterType.Float:
@@ -147,12 +149,12 @@ class LV2(node.Node):
self.__instance.run(ctxt.duration)
for port_name, port in self.outputs.items():
buf = self.__buffers[port_name]
if isinstance(port, ports.AudioOutputPort):
port.frame.resize(ctxt.duration)
port.frame.samples[0] = buf[0:ctxt.duration]
elif isinstance(port, ports.ControlOutputPort):
port.frame.fill(buf[0])
else:
raise ValueError(port)
# for port_name, port in self.outputs.items():
# buf = self.__buffers[port_name]
# if isinstance(port, ports.AudioOutputPort):
# port.frame.resize(ctxt.duration)
# port.frame.samples[0] = buf[0:ctxt.duration]
# elif isinstance(port, ports.ControlOutputPort):
# port.frame.fill(buf[0])
# else:
# raise ValueError(port)

@@ -7,11 +7,12 @@ from noisicaa import node_db
from .. import ports
from .. import node
from .. import audio_format
from .. import ast
logger = logging.getLogger(__name__)
class PassThru(node.Node):
class PassThru(node.CustomNode):
class_name = 'passthru'
def __init__(self, event_loop, name='passthru', id=None):
@@ -19,12 +20,10 @@ class PassThru(node.Node):
ports=[
node_db.AudioPortDescription(
name='in',
direction=node_db.PortDirection.Input,
channels='stereo'),
direction=node_db.PortDirection.Input),
node_db.AudioPortDescription(
name='out',
direction=node_db.PortDirection.Output,
channels='stereo'),
direction=node_db.PortDirection.Output),
])
super().__init__(event_loop, description, name, id)

@@ -1,37 +1,188 @@
#!/usr/bin/python3
import enum
import logging
import math
import os
import random
import struct
import sys
import threading
import time
import toposort
from noisicaa import core
from noisicaa import rwlock
from . import data
from . import resample
from . import node
logger = logging.getLogger(__name__)
class OpCode(object):
def __init__(self, op, **args):
self.op = op
self.args = args
def __repr__(self):
return type(self).__name__
return '%s(%s)' % (
self.op,
', '.join(
'%s=%r' % (k, v) for k, v in sorted(self.args.items())))
class BufferType(enum.Enum):
FLOATS = 1
class BufferRef(object):
def __init__(self, id, offset, length, type):
self.id = id
self.offset = offset
self.length = length
self.type = type
def __repr__(self):
return '%s (%s): %d@%d' % (
self.id, self.type.name, self.length, self.offset)
class FloatBufferRef(BufferRef):
def __init__(self, id, offset, count):
super().__init__(id, offset, 4 * count, BufferType.FLOATS)
class PipelineVMSpec(object):
def __init__(self):
self.opcodes = []
self.buffer_size = 0
self.buffers = []
self.nodes = []
class PipelineVM(object):
class PipelineGraph(object):
def __init__(self):
self.__nodes = {}
@property
def nodes(self):
return set(self.__nodes.values())
def find_node(self, node_id):
return self.__nodes[node_id]
def add_node(self, node):
self.__nodes[node.id] = node
def remove_node(self, node):
del self.__nodes[node.id]
def compile_spec(graph, frame_size):
spec = PipelineVMSpec()
sorted_nodes = toposort.toposort_flatten(
{n: set(n.parent_nodes) for n in graph.nodes},
sort=False)
buffer_offset = 0
buffer_map = {}
for n in sorted_nodes:
node_idx = len(spec.nodes)
port_map = []
for port_name, port in n.outputs.items():
buffer_id = '%s:%s' % (n.id, port_name)
port_map.append((port_name, buffer_id))
buffer_ref = FloatBufferRef(
buffer_id, buffer_offset, frame_size)
buffer_map[buffer_id] = buffer_ref
spec.buffers.append(buffer_ref)
buffer_offset += buffer_ref.length
for port_name, port in n.inputs.items():
buffer_id = '%s:%s' % (n.id, port_name)
port_map.append((port_name, buffer_id))
buffer_ref = FloatBufferRef(
buffer_id, buffer_offset, frame_size)
buffer_map[buffer_id] = buffer_ref
spec.buffers.append(buffer_ref)
buffer_offset += buffer_ref.length
spec.nodes.append([n.id, port_map])
for port_name, port in n.inputs.items():
buffer_id = '%s:%s' % (n.id, port_name)
buffer_ref = buffer_map[buffer_id]
first = True
for upstream_port in port.inputs:
upstream_buffer_id = '%s:%s' % (
upstream_port.owner.id, upstream_port.name)
upstream_buffer_ref = buffer_map[upstream_buffer_id]
assert buffer_ref.length == upstream_buffer_ref.length
if first:
spec.opcodes.append(OpCode(
'COPY_BUFFER',
src_offset=upstream_buffer_ref.offset,
dest_offset=buffer_ref.offset,
length=buffer_ref.length))
first = False
else:
spec.opcodes.append(OpCode(
'MIX',
src_offset=upstream_buffer_ref.offset,
dest_offset=buffer_ref.offset,
num_samples=frame_size,
factor=1.0))
if first:
spec.opcodes.append(OpCode(
'CLEAR_BUFFER',
offset=buffer_ref.offset,
length=buffer_ref.length))
if isinstance(n, node.CustomNode):
spec.opcodes.append(OpCode('CALL', node_idx=node_idx))
elif isinstance(n, node.BuiltinNode):
spec.opcodes.extend(n.opcodes())
else:
raise TypeError(type(n))
return spec
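For a minimal chain of two pass-through custom nodes (a -> b) and frame_size=4, compile_spec() would produce roughly the following spec; offsets are in bytes, node ids abbreviated, and this is a sketch rather than captured output:

# buffers: a:out @0, a:in @16, b:out @32, b:in @48 (each 16 bytes = 4 floats)
# opcodes:
#   CLEAR_BUFFER(length=16, offset=16)                    # a:in has no upstream port
#   CALL(node_idx=0)                                      # run node a
#   COPY_BUFFER(dest_offset=48, length=16, src_offset=0)  # a:out -> b:in
#   CALL(node_idx=1)                                      # run node b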
class PipelineVM(object):
def __init__(
self, *,
sample_rate=44100, frame_size=128, shm=None):
self.listeners = core.CallbackRegistry()
self.__sample_rate = sample_rate
self.__frame_size = frame_size
self.__shm = shm
self.__vm_thread = None
self.__vm_started = None
self.__vm_exit = None
self.__vm_lock = None
self.__vm_lock = rwlock.RWLock()
self.__backend = None
self.__spec = None
self.__opcode_states = None
self.__buffer = None
self.__buffer_map = None
self.__graph = PipelineGraph()
def reader_lock(self):
return self.__vm_lock.reader_lock
def writer_lock(self):
return self.__vm_lock.writer_lock
def setup(self):
self.__vm_lock = threading.Lock()
self.__vm_started = threading.Event()
self.__vm_exit = threading.Event()
self.__vm_thread = threading.Thread(target=self.vm_main)
@@ -40,7 +191,12 @@ class PipelineVM(object):
logger.info("VM up and running.")
def cleanup(self):
if self.__vm_thread is not None:
if self.__backend is not None:
logger.info("Stopping backend...")
self.__backend.stop()
logger.info("Backend stopped...")
if self.__vm_thread is not None: # pragma: no branch
logger.info("Shutting down VM thread...")
self.__vm_exit.set()
self.__vm_thread.join()
@@ -49,24 +205,119 @@ class PipelineVM(object):
self.__vm_started = None
self.__vm_exit = None
self.__vm_lock = None
self.__spec = None
self.cleanup_backend()
self.cleanup_spec()
def setup_spec(self, spec):
buffer_size = sum(buf.length for buf in spec.buffers)
self.allocate_buffer(buffer_size)
self.__buffer_map = {buf.id: buf for buf in spec.buffers}
self.__opcode_states = [{} for _ in spec.opcodes]
for node_id, buffer_spec in spec.nodes:
node = self.__graph.find_node(node_id)
for port_name, buffer_id in buffer_spec:
buffer_ref = self.__buffer_map[buffer_id]
node.connect_port(
port_name, self.__buffer, buffer_ref.offset)
self.__spec = spec
def cleanup_spec(self):
self.__buffer = None
self.__buffer_map = None
self.__spec = None
def set_spec(self, spec):
with self.__vm_lock:
if self.__spec is not None:
self.__buffer = None
self.__spec = None
logger.info("spec=%s", spec)
with self.writer_lock():
self.cleanup_spec()
if spec is not None:
self.__buffer = bytearray(spec.buffer_size)
self.__spec = spec
self.setup_spec(spec)
def build_spec(self):
spec = PipelineVMSpec()
return spec
def allocate_buffer(self, size):
self.__buffer = bytearray(size)
def get_buffer_bytes(self, buf_id):
ref = self.__buffer_map[buf_id]
return bytes(self.__buffer[ref.offset:ref.offset+ref.length])
def set_buffer_bytes(self, buf_id, data):
ref = self.__buffer_map[buf_id]
assert len(data) == ref.length
self.__buffer[ref.offset:ref.offset+ref.length] = data
def cleanup_backend(self):
if self.__backend is not None:
logger.info(
"Clean up backend %s", type(self.__backend).__name__)
self.__backend.cleanup()
self.__backend = None
def setup_backend(self, backend):
logger.info(
"Set up backend %s", type(backend).__name__)
backend.setup(self.__sample_rate)
self.__backend = backend
def set_backend(self, backend):
logger.info("backend=%s", type(backend).__name__)
with self.writer_lock():
self.cleanup_backend()
if backend is not None:
self.setup_backend(backend)
@property
def nodes(self):
return self.__graph.nodes
def find_node(self, node_id):
return self.__graph.find_node(node_id)
def add_node(self, node):
if node.pipeline is not None:
raise Error("Node has already been added to a pipeline")
node.pipeline = self
self.__graph.add_node(node)
def remove_node(self, node):
if node.pipeline is not self:
raise Error("Node has not been added to this pipeline")
node.pipeline = None
self.__graph.remove_node(node)
async def setup_node(self, node):
# TODO: reanimate crash handling
# if node.id == self._crasher_id:
# logger.warning(
# "Node %s (%s) has been deactivated, because it crashed the pipeline.",
# node.id, type(node).__name__)
# self.listeners.call('node_state', node.id, broken=True)
# node.broken = True
# return
# if self._shm_data is not None:
# marker = node.id.encode('ascii') + b'\0'
# self._shm_data[512:512+len(marker)] = marker
await node.setup()
# if self._shm_data is not None:
# self._shm_data[512] = 0
def vm_main(self):
try:
logger.info("Starting VM...")
ctxt = data.FrameContext()
ctxt.perf = core.PerfStats()
ctxt.sample_pos = 0
ctxt.duration = self.__frame_size
self.__vm_started.set()
while True:
@@ -74,14 +325,42 @@ class PipelineVM(object):
logger.info("Exiting VM mainloop.")
break
with self.__vm_lock:
spec = self.__spec
if spec is not None:
self.run_vm(spec)
# TODO: remove traces of in/out_frame
ctxt.in_frame = None
ctxt.out_frame = None
backend = self.__backend
if backend is None:
time.sleep(0.1)
continue
self.listeners.call(
'perf_data', ctxt.perf.get_spans())
ctxt.perf = core.PerfStats()
with ctxt.perf.track('backend_begin_frame'):
backend.begin_frame(ctxt)
time.sleep(0.1)
try:
if backend.stopped:
break
except: # pylint: disable=bare-except
with self.reader_lock():
spec = self.__spec
if spec is not None:
self.run_vm(spec, ctxt)
else:
time.sleep(0.05)
finally:
with ctxt.perf.track('backend_end_frame'):
backend.end_frame(ctxt)
ctxt.sample_pos += ctxt.duration
ctxt.duration = self.__frame_size
except: # pragma: no coverage # pylint: disable=bare-except
sys.stdout.flush()
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
@@ -90,6 +369,89 @@ class PipelineVM(object):
finally:
logger.info("VM finished.")
def run_vm(self, spec):
for opcode in spec.opcodes:
def run_vm(self, spec, ctxt):
for opcode, state in zip(spec.opcodes, self.__opcode_states):
logger.info("Executing opcode %s", opcode)
opmethod = getattr(self, 'op_' + opcode.op)
opmethod(ctxt, state, **opcode.args)
def op_COPY_BUFFER(
self, ctxt, state, *, src_offset, dest_offset, length):
assert 0 <= src_offset <= len(self.__buffer) - length
assert 0 <= dest_offset <= len(self.__buffer) - length
self.__buffer[dest_offset:dest_offset+length] = self.__buffer[src_offset:src_offset+length]
def op_CLEAR_BUFFER(
self, ctxt, state, *, offset, length):
assert 0 <= offset <= len(self.__buffer) - length
for i in range(offset, offset + length):
self.__buffer[i] = 0
def op_SET_FLOAT(self, ctxt, state, *, offset, value):
assert 0 <= offset <= len(self.__buffer) - 4
struct.pack_into('=f', self.__buffer, offset, value)
def op_OUTPUT_STEREO(
self, ctxt, state, *, offset_l, offset_r, num_samples):
assert 0 <= offset_l <= len(self.__buffer) - 4 * num_samples
assert 0 <= offset_r <= len(self.__buffer) - 4 * num_samples
assert num_samples > 0
self.__backend.output(
resample.AV_CH_LAYOUT_STEREO,
num_samples,
[bytes(self.__buffer[offset_l:offset_l+4*num_samples]),
bytes(self.__buffer[offset_r:offset_r+4*num_samples])])
def op_NOISE(self, ctxt, state, *, offset, num_samples):
assert 0 <= offset <= len(self.__buffer) - 4 * num_samples
assert num_samples > 0
for i in range(offset, offset + 4 * num_samples, 4):
self.__buffer[i:i+4] = struct.pack(
'=f', 2 * random.random() - 1.0)
def op_SINE(self, ctxt, state, *, offset, num_samples, freq):
assert 0 <= offset <= len(self.__buffer) - 4 * num_samples
assert num_samples > 0
p = state.get('p', 0.0)
for i in range(offset, offset + 4 * num_samples, 4):
self.__buffer[i:i+4] = struct.pack(
'=f', math.sin(p))
p += 2 * math.pi * freq / self.__sample_rate
if p > 2 * math.pi:
p -= 2 * math.pi
state['p'] = p
def op_MUL(self, ctxt, state, *, offset, num_samples, factor):
assert 0 <= offset <= len(self.__buffer) - 4 * num_samples
assert num_samples > 0
for i in range(offset, offset + 4 * num_samples, 4):
self.__buffer[i:i+4] = struct.pack(
'=f', factor * struct.unpack(
'=f', self.__buffer[i:i+4])[0])
def op_MIX(
self, ctxt, state, *,
src_offset, dest_offset, num_samples, factor):
assert 0 <= src_offset <= len(self.__buffer) - 4 * num_samples
assert 0 <= dest_offset <= len(self.__buffer) - 4 * num_samples
assert num_samples > 0
for i in range(0, 4 * num_samples, 4):
src_val = struct.unpack(
'=f', self.__buffer[src_offset+i:src_offset+i+4])[0]
dest_val = struct.unpack(
'=f', self.__buffer[dest_offset+i:dest_offset+i+4])[0]
self.__buffer[dest_offset+i:dest_offset+i+4] = struct.pack(
'=f', dest_val + factor * src_val)
def op_CALL(self, ctxt, state, *, node_idx):
node_id = self.__spec.nodes[node_idx][0]
node = self.__graph.find_node(node_id)
node.run(ctxt)
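A sketch (not from the commit) of exercising the opcode interpreter directly, without a backend or the VM thread; struct and data are already imported at the top of this module, and the buffer ids 'a' and 'b' are made up:

vm = PipelineVM(frame_size=4)

spec = PipelineVMSpec()
spec.buffers.append(FloatBufferRef('a', 0, 4))   # 4 floats at byte offset 0
spec.buffers.append(FloatBufferRef('b', 16, 4))  # 4 floats at byte offset 16
spec.opcodes.append(OpCode(
    'MIX', src_offset=0, dest_offset=16, num_samples=4, factor=0.5))
vm.setup_spec(spec)

vm.set_buffer_bytes('a', struct.pack('=4f', 1.0, 2.0, 3.0, 4.0))
vm.set_buffer_bytes('b', struct.pack('=4f', 0.0, 0.0, 0.0, 0.0))

ctxt = data.FrameContext()
ctxt.sample_pos = 0
ctxt.duration = 4
vm.run_vm(spec, ctxt)

print(struct.unpack('=4f', vm.get_buffer_bytes('b')))  # (0.5, 1.0, 1.5, 2.0)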

@@ -1,28 +1,348 @@
#!/usr/bin/python3
import logging
import struct
import threading
import time
import unittest
import asynctest
from noisicaa import node_db
from . import backend
from . import data
from . import pipeline_vm
from . import resample
from . import nodes
logger = logging.getLogger(__name__)
class PipelineVMTest(asynctest.TestCase):
class TestBackend(backend.Backend):
def __init__(self, step_mode=False):
super().__init__()
self.written_frames = []
self.step_mode = step_mode
self.start_step = threading.Event()
self.step_done = threading.Event()
def begin_frame(self, ctxt):
logger.info("Backend.begin_frame()")
if self.step_mode:
self.start_step.wait()
self.start_step.clear()
def end_frame(self, ctxt):
logger.info("Backend.end_frame()")
if self.step_mode:
self.step_done.set()
def output(self, layout, num_samples, data):
logger.info("Backend received frame.")
self.written_frames.append([layout, num_samples, data])
def stop(self):
if self.step_mode:
self.start_step.set()
super().stop()
def next_step(self):
assert self.step_mode
self.start_step.set()
self.step_done.wait()
self.step_done.clear()
class CompileSpecTest(asynctest.TestCase):
async def test_foo(self):
graph = pipeline_vm.PipelineGraph()
node1 = nodes.PassThru(self.loop)
graph.add_node(node1)
node2 = nodes.PassThru(self.loop)
graph.add_node(node2)
node2.inputs['in'].connect(node1.outputs['out'])
node3 = nodes.Sink(self.loop)
graph.add_node(node3)
node3.inputs['audio_left'].connect(node1.outputs['out'])