Gracefully handle crashes in the audio process.

looper
Ben Niemann 7 years ago
parent ee3066f8e0
commit 5624c78cd0

@ -1,16 +1,24 @@
# -*- org-tags-column: -98 -*-
* Gracefully handle pipeline crashes :FR:
Blacklist crashing nodes
- before calling setup() on a node, write some persistent marker identifying the node.
- when restarting from a crash and marker is found, deactivate node and mark it as broken.
- user can manually reactivate node
- dito for run()
- use shared memory owned by player to store these markers?
- must be fast and lock-free
- directly mark node as broken, when it throws an exception during setup() or run()
* Graceful AudioStream shutdown :FR:
send close message to backend
* Plugin support :FR:
** LV2
* Fix removing measures :BUG:
- remove measure on SheetPropertyTrack causes exception
- no way to remove trailing measures from sheet
* Gracefully handle pipeline crashes
- exceptions in pipeline loop thread terminate pipeline process
- notify player
- not via exception, because the process my die hard with a SIGSEGV, etc.
- notify player_client
- UI dialog: quit app, restart player, undo last command and restart player
* More flexible instrument handling :FR:
Have a list of instrument types
@ -26,8 +34,9 @@ Have a list of instrument types
Set the base tuning of the instrument.
- also look at sample rate (ftsr function)
* PipelineGraphMonitor should use ports from node description :BUG:
- exception when there's a port that's not in the legacy description.
* reanimate PipelineGraphMonitor :BUG:
- doesn't know how to handle changing address of audioproc process
* Node presets :FR:
- track current directory for import/export file dialogs
- which default directory?

@ -21,6 +21,7 @@ class StreamError(Exception):
class AudioStreamBase(object):
def __init__(self, address):
assert address is not None
self._address = address
self._pipe_in = None
@ -30,6 +31,10 @@ class AudioStreamBase(object):
self._closed = False
self._buffer = bytearray()
@property
def address(self):
return self._address
def setup(self):
self._poller = select.poll()
self._poller.register(self._pipe_in, select.POLLIN)
@ -90,6 +95,9 @@ class AudioStreamBase(object):
frame.perf_data = []
line = self._get_line()
if line == b'#CLOSE':
raise StreamClosed
assert line.startswith(b'#FR=')
frame.sample_pos = int(line[4:])
while True:
@ -231,10 +239,16 @@ class AudioStreamClient(AudioStreamBase):
def cleanup(self):
super().cleanup()
if self._pipe_in is not None:
os.close(self._pipe_in)
self._pipe_in = None
if self._pipe_out is not None:
request = bytearray()
request.extend(b'#CLOSE\n')
while request:
written = os.write(self._pipe_out, request)
del request[:written]
os.close(self._pipe_out)
self._pipe_out = None
if self._pipe_in is not None:
os.close(self._pipe_in)
self._pipe_in = None

@ -22,6 +22,11 @@ class AudioProcClientMixin(object):
'PIPELINE_STATUS', self.handle_pipeline_status,
log_level=-1)
async def cleanup(self):
self.server.remove_command_handler('PIPELINE_MUTATION')
self.server.remove_command_handler('PIPELINE_STATUS')
await super().cleanup()
async def connect(self, address, flags=None):
assert self._stub is None
self._stub = ipc.Stub(self.event_loop, address)

@ -73,7 +73,6 @@ class Session(object):
class AudioProcProcessMixin(object):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.backend = None
self.pipeline = None
async def setup(self):
@ -123,13 +122,12 @@ class AudioProcProcessMixin(object):
self.node_db.add(nodes.SplitChannels)
self.node_db.add(nodes.JoinChannels)
self.node_db.add(nodes.Ladspa)
self.node_db.add(nodes.PipelineCrasher)
self.pipeline = pipeline.Pipeline()
self.pipeline.utilization_callback = self.utilization_callback
self.pipeline.listeners.add('perf_data', self.perf_data_callback)
self.backend = None
self.audiosink = backend.AudioSinkNode(self.event_loop)
await self.audiosink.setup()
self.pipeline.add_node(self.audiosink)
@ -146,7 +144,6 @@ class AudioProcProcessMixin(object):
if self.pipeline is not None:
self.pipeline.stop()
self.pipeline = None
self.backend = None
await super().cleanup()

@ -266,6 +266,7 @@ class IPCBackend(Backend):
self.add_event(queue, event)
except audio_stream.StreamClosed:
logger.warning("Stopping IPC backend.")
self.stop()
def write(self, ctxt):

@ -8,3 +8,4 @@ from .wavfile import WavFileSource
from .sample_player import SamplePlayer
from .channels import SplitChannels, JoinChannels
from .ladspa import Ladspa
from .pipeline_crasher import PipelineCrasher

@ -54,8 +54,8 @@ class IPCNode(node.Node):
request.events = self.pipeline.backend.get_events_for_prefix(
self._event_queue_name)
self._stream.send_frame(request)
response = self._stream.receive_frame()
assert response.sample_pos == ctxt.sample_pos, (
response.sample_pos, ctxt.sample_pos)
assert response.duration == ctxt.duration, (
@ -63,8 +63,12 @@ class IPCNode(node.Node):
ctxt.perf.add_spans(response.perf_data)
output_port = self.outputs['out']
output_port.frame.resize(0)
output_port.frame.append_samples(
response.samples, response.num_samples)
assert len(output_port.frame) <= ctxt.duration
output_port.frame.resize(ctxt.duration)
if response.num_samples:
output_port.frame.resize(0)
output_port.frame.append_samples(
response.samples, response.num_samples)
assert len(output_port.frame) <= ctxt.duration
output_port.frame.resize(ctxt.duration)
else:
output_port.frame.resize(ctxt.duration)
output_port.frame.clear()

@ -0,0 +1,22 @@
#!/usr/bin/python3
import os
import signal
from .. import node
class PipelineCrasher(node.Node):
class_name = 'pipeline_crasher'
def run(self, ctxt):
trigger = self.get_param('trigger')
if trigger > 0.5:
raise RuntimeError("Kaboom!")
if trigger < -0.5:
os.kill(os.getpid(), signal.SIGSEGV)
input_port = self.inputs['in']
output_port = self.outputs['out']
output_port.frame.resize(ctxt.duration)
output_port.frame.copy_from(input_port.frame)

@ -2,6 +2,7 @@
import logging
import threading
import os
import pprint
import sys
import time
@ -74,6 +75,7 @@ class Pipeline(object):
if self._backend is not None:
logger.info("Stopping backend...")
self._backend.stop()
logger.info("Backend stopped...")
if self._thread is not None:
logger.info("Stopping pipeline thread...")
@ -82,6 +84,10 @@ class Pipeline(object):
logger.info("Pipeline thread stopped.")
self._thread = None
if self._backend is not None:
self._backend.cleanup()
self._backend = None
self._running = False
def wait(self):
@ -164,6 +170,10 @@ class Pipeline(object):
except: # pylint: disable=bare-except
sys.excepthook(*sys.exc_info())
os._exit(1)
finally:
logger.info("Mainloop finished.")
def process_frame(self, ctxt):
with self.reader_lock():

@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
class RemoteException(Exception): pass
class Error(Exception): pass
class InvalidResponseError(Error): pass
class ConnectionClosed(Error): pass
class ConnState(enum.Enum):
@ -125,6 +126,10 @@ class Server(object):
if log_level is not None:
self._command_log_levels[cmd] = log_level
def remove_command_handler(self, cmd):
if cmd in self._command_handlers:
del self._command_handlers[cmd]
def new_connection_id(self):
self._next_connection_id += 1
return self._next_connection_id
@ -201,6 +206,8 @@ class ClientProtocol(asyncio.Protocol):
def connection_lost(self, exc):
self.closed_event.set()
logger.info("Client connection lost.")
self.response_queue.put_nowait(self.stub.CLOSE_SENTINEL)
def data_received(self, data):
self.buf.extend(data)
@ -238,6 +245,8 @@ class Stub(object):
pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
deserialize = pickle.loads
CLOSE_SENTINEL = object()
def __init__(self, event_loop, server_address):
self._event_loop = event_loop
self._server_address = server_address
@ -283,13 +292,18 @@ class Stub(object):
exc, pprint.pformat(args), pprint.pformat(kwargs))
) from None
if self._transport.is_closing():
raise ConnectionClosed()
self._transport.write(b'CALL %s %d\n' % (cmd, len(payload)))
if payload:
self._transport.write(payload)
response = await self._protocol.response_queue.get()
if response == b'OK':
if response is self.CLOSE_SENTINEL:
raise ConnectionClosed
elif response == b'OK':
return None
elif response.startswith(b'OK:'):
return self.deserialize(response[3:])

@ -1,11 +1,13 @@
#!/usr/bin/python3
import enum
import functools
import asyncio
import logging
import os
import os.path
import random
import sys
import tempfile
import threading
import time
@ -25,6 +27,154 @@ from . import model
logger = logging.getLogger(__name__)
class BackendState(enum.Enum):
Stopped = 'stopped'
Starting = 'starting'
Running = 'running'
Crashed = 'crashed'
Stopping = 'stopping'
class BackendManager(object):
def __init__(self, event_loop):
self._event_loop = event_loop
self._state = BackendState.Stopped
self._state_changed = asyncio.Event(loop=self._event_loop)
self._listeners = core.CallbackRegistry()
@property
def is_running(self):
return self._state == BackendState.Running
def add_state_listener(self, callback):
return self._listeners.add('state-changed', callback)
async def wait_until_running(self):
while True:
if self._state == BackendState.Running:
return
if self._state == BackendState.Stopped:
raise RuntimeError
await self._state_changed.wait()
self._state_changed.clear()
async def wait_until_stopped(self):
while True:
if self._state == BackendState.Stopped:
return
await self._state_changed.wait()
self._state_changed.clear()
def start(self):
if self._state in (BackendState.Running, BackendState.Starting):
pass
elif self._state == BackendState.Stopped:
self._set_state(BackendState.Starting)
task = self._event_loop.create_task(self.start_backend())
task.add_done_callback(self._start_backend_finished)
else:
raise AssertionError("Unexpected state %s" % self._state.value)
def stop(self):
if self._state in (BackendState.Stopped, BackendState.Stopping):
pass
elif self._state == BackendState.Crashed:
task = self._event_loop.create_task(self.cleanup())
task.add_done_callback(self._cleanup_finished)
elif self._state == BackendState.Running:
task = self._event_loop.create_task(self.stop_backend())
task.add_done_callback(self._stop_backend_finished)
else:
raise AssertionError("Unexpected state %s" % self._state.value)
def backend_crashed(self):
if self._state in (BackendState.Crashed, BackendState.Stopping, BackendState.Stopped):
pass
elif self._state == BackendState.Running:
self._set_state(BackendState.Crashed)
task = self._event_loop.create_task(self.cleanup())
task.add_done_callback(self._cleanup_finished)
else:
raise AssertionError("Unexpected state %s" % self._state.value)
def _set_state(self, new_state):
assert new_state != self._state
logger.info("State %s -> %s", self._state.value, new_state.value)
self._state = new_state
self._state_changed.set()
self._listeners.call('state-changed', new_state)
def _start_backend_finished(self, task):
exc = task.exception()
if exc is not None:
raise exc
# self._set_state(BackendState.Crashed)
# self._event_loop.create_task(self.cleanup())
self._set_state(BackendState.Running)
self._event_loop.create_task(self.backend_started())
def _stop_backend_finished(self, task):
exc = task.exception()
if exc is not None:
raise exc
task = self._event_loop.create_task(self.cleanup())
task.add_done_callback(self._cleanup_finished)
def _cleanup_finished(self, task):
exc = task.exception()
if exc is not None:
raise exc
self._set_state(BackendState.Stopped)
self._event_loop.create_task(self.backend_stopped())
async def start_backend(self):
raise NotImplementedError
async def stop_backend(self):
raise NotImplementedError
async def cleanup(self):
raise NotImplementedError
async def backend_started(self):
pass
async def backend_stopped(self):
pass
class AudioProcBackend(BackendManager):
def __init__(self, player, event_loop):
super().__init__(event_loop)
self._player = player
async def start_backend(self):
await self._player.start_audioproc()
async def stop_backend(self):
await self._player.stop_audioproc()
async def cleanup(self):
await self._player.stop_audioproc()
async def backend_started(self):
await self._player.audioproc_started()
async def backend_stopped(self):
await self._player.audioproc_stopped()
class AudioProcClientImpl(object):
def __init__(self, event_loop, server):
super().__init__()
@ -43,7 +193,7 @@ class AudioProcClient(
class AudioStreamProxy(object):
def __init__(self, player, address, socket_dir=None):
def __init__(self, player, socket_dir=None):
self._player = player
if socket_dir is None:
@ -52,28 +202,44 @@ class AudioStreamProxy(object):
self.address = os.path.join(
socket_dir, 'player.%s.pipe' % uuid.uuid4().hex)
self._lock = threading.Lock()
self._server = audioproc.AudioStreamServer(self.address)
self._client = audioproc.AudioStreamClient(address)
self._client = None
self._stopped = threading.Event()
self._thread = threading.Thread(target=self.main)
def setup(self):
self._server.setup()
self._client.setup()
self._thread.start()
def cleanup(self):
self._server.close()
self._stopped.set()
self._thread.join()
self._client.cleanup()
self._server.cleanup()
def set_client(self, client):
if client is not None:
logger.info("Proxy will talk to %s...", client.address)
else:
logger.info("Disabling proxy backend.")
with self._lock:
self._client = client
def main(self):
state = 'stopped'
sample_pos_offset = None
while True:
try:
request = self._server.receive_frame()
logger.info("Player proxy started.")
try:
while not self._stopped.is_set():
try:
request = self._server.receive_frame()
except audioproc.StreamClosed:
logger.warning("Stream to PlayerClient closed.")
raise
perf = core.PerfStats()
@ -89,10 +255,27 @@ class AudioStreamProxy(object):
self._player.playback_sample_pos += request.duration
with perf.track('send_frame'):
self._client.send_frame(request)
with perf.track('receive_frame'):
response = self._client.receive_frame()
with self._lock:
if self._client is not None:
try:
with perf.track('send_frame'):
self._client.send_frame(request)
with perf.track('receive_frame'):
response = self._client.receive_frame()
except audioproc.StreamClosed:
logger.warning("Stream to pipeline closed.")
self._player.event_loop.call_soon_threadsafe(
self._player.audioproc_backend.backend_crashed)
self._client = None
if self._client is None:
response = audioproc.FrameData()
response.sample_pos = request.sample_pos
response.duration = request.duration
response.events = []
response.entities = {}
response.perf_data = []
perf.add_spans(response.perf_data)
response.perf_data = perf.get_spans()
if state == 'playing':
@ -100,10 +283,21 @@ class AudioStreamProxy(object):
playback_pos=(
request.sample_pos - sample_pos_offset,
request.duration))
self._server.send_frame(response)
except audioproc.StreamClosed:
break
try:
self._server.send_frame(response)
except audioproc.StreamClosed:
logger.warning("Stream to PlayerClient closed.")
raise
except audioproc.StreamClosed:
pass
except: # pylint: disable=bare-except
sys.excepthook(*sys.exc_info())
finally:
logger.info("Player proxy terminated.")
class Player(object):
@ -118,11 +312,13 @@ class Player(object):
self.callback_stub = None
self.setup_complete = False
self.audioproc_backend = None
self.audioproc_backend_state_listener = None
self.audioproc_backend_last_crash_time = None
self.audioproc_address = None
self.audioproc_client = None
self.audiostream_address = None
self.audiostream_client = None
self.mutation_listener = None
self.pending_pipeline_mutations = None
@ -140,39 +336,37 @@ class Player(object):
return self.proxy.address
async def setup(self):
logger.info("Setting up player instance %s..", self.id)
logger.info("Setting up player server...")
await self.server.setup()
logger.info("Player server address: %s", self.server.address)
logger.info("Connecting to client callback server %s..", self.callback_address)
self.callback_stub = ipc.Stub(
self.event_loop, self.callback_address)
await self.callback_stub.connect()
self.audioproc_address = await self.manager.call(
'CREATE_AUDIOPROC_PROCESS', 'player')
self.audioproc_client = AudioProcClient(
self.event_loop, self.server)
await self.audioproc_client.setup()
await self.audioproc_client.connect(self.audioproc_address)
self.audiostream_address = await self.audioproc_client.set_backend('ipc')
self.proxy = AudioStreamProxy(self, self.audiostream_address)
logger.info("Starting audio stream proxy...")
self.proxy = AudioStreamProxy(self)
self.proxy.setup()
self.pending_pipeline_mutations = []
self.mutation_listener = self.sheet.listeners.add(
'pipeline_mutations', self.handle_pipeline_mutation)
self.sheet.add_to_pipeline()
pipeline_mutations = self.pending_pipeline_mutations[:]
self.pending_pipeline_mutations = None
for mutation in pipeline_mutations:
await self.publish_pipeline_mutation(mutation)
await self.audioproc_client.dump()
logger.info("Starting audio process...")
self.audioproc_backend = AudioProcBackend(self, self.event_loop)
self.audioproc_backend_state_listener = self.audioproc_backend.add_state_listener(
self.audioproc_state_changed)
self.audioproc_backend.start()
self.add_track(self.sheet.master_group)
logger.info("Player instance %s setup complete.", self.id)
async def cleanup(self):
logger.info("Cleaning up player instance %s..", self.id)
for listener in self.group_listeners.values():
listener.remove()
self.group_listeners.clear()
@ -183,22 +377,108 @@ class Player(object):
self.mutation_listener.remove()
self.mutation_listener = None
if self.proxy is not None:
self.proxy.cleanup()
self.proxy = None
if self.audioproc_backend_state_listener is not None:
self.audioproc_backend_state_listener.remove()
self.audioproc_backend_state_listener = None
if self.audioproc_backend is not None:
self.audioproc_backend = None
logger.info("Stopping audio process...")
await self.stop_audioproc()
if self.callback_stub is not None:
logger.info("Closing connection to client callback server...")
await self.callback_stub.close()
self.callback_stub = None
if self.proxy is not None:
logger.info("Stopping audio stream proxy...")
self.proxy.cleanup()
self.proxy = None
logger.info("Cleaning up player server...")
await self.server.cleanup()
logger.info("Player instance %s cleanup complete.", self.id)
def audioproc_state_changed(self, state):
self.publish_status_async(pipeline_state=state.value)
async def start_audioproc(self):
logger.info("Starting audioproc backend...")
logger.info("Creating audioproc process...")
self.audioproc_address = await self.manager.call(
'CREATE_AUDIOPROC_PROCESS', 'player')
logger.info("Creating audioproc client...")
self.audioproc_client = AudioProcClient(
self.event_loop, self.server)
await self.audioproc_client.setup()
logger.info("Connecting audioproc client...")
await self.audioproc_client.connect(self.audioproc_address)
logger.info("Setting backend...")
self.audiostream_address = await self.audioproc_client.set_backend('ipc')
logger.info("Creating audiostream client...")
self.audiostream_client = audioproc.AudioStreamClient(self.audiostream_address)
self.audiostream_client.setup()
logger.info("Audioproc backend started.")
async def audioproc_started(self):
self.pending_pipeline_mutations = []
self.sheet.add_to_pipeline()
pipeline_mutations = self.pending_pipeline_mutations[:]
self.pending_pipeline_mutations = None
try:
for mutation in pipeline_mutations:
await self.publish_pipeline_mutation(mutation)
await self.audioproc_client.dump()
self.proxy.set_client(self.audiostream_client)
except ipc.ConnectionClosed:
self.audioproc_backend.backend_crashed()
async def stop_audioproc(self):
logger.info("Stopping audioproc backend...")
if self.audiostream_client is not None:
self.proxy.set_client(None)
self.audiostream_client.cleanup()
self.audiostream_client = None
self.audiostream_address = None
if self.audioproc_client is not None:
await self.audioproc_client.disconnect(shutdown=True)
logger.info("Disconnecting audioproc client...")
try:
await self.audioproc_client.disconnect(shutdown=True)
except ipc.ConnectionClosed:
logger.info("Connection already closed.")
await self.audioproc_client.cleanup()
self.audioproc_client = None
self.audioproc_address = None
self.audiostream_address = None
await self.server.cleanup()
logger.info("Audioproc backend stopped.")
async def audioproc_stopped(self):
now = time.time()
if (self.audioproc_backend_last_crash_time is None
or now - self.audioproc_backend_last_crash_time > 30):
self.audioproc_backend.start()
else:
self.publish_status_async(pipeline_disabled=True)
self.audioproc_backend_last_crash_time = now
def restart_pipeline(self):
self.audioproc_backend.start()
def publish_status_async(self, **kwargs):
callback_task = asyncio.run_coroutine_threadsafe(
@ -253,34 +533,38 @@ class Player(object):
if self.audioproc_client is None:
return
if isinstance(mutation, mutations.AddNode):
await self.audioproc_client.add_node(
mutation.node_type, id=mutation.node_id,
name=mutation.node_name, **mutation.args)
try:
if isinstance(mutation, mutations.AddNode):
await self.audioproc_client.add_node(
mutation.node_type, id=mutation.node_id,
name=mutation.node_name, **mutation.args)
elif isinstance(mutation, mutations.RemoveNode):
await self.audioproc_client.remove_node(mutation.node_id)
elif isinstance(mutation, mutations.RemoveNode):
await self.audioproc_client.remove_node(mutation.node_id)
elif isinstance(mutation, mutations.ConnectPorts):
await self.audioproc_client.connect_ports(
mutation.src_node, mutation.src_port,
mutation.dest_node, mutation.dest_port)
elif isinstance(mutation, mutations.ConnectPorts):
await self.audioproc_client.connect_ports(
mutation.src_node, mutation.src_port,
mutation.dest_node, mutation.dest_port)
elif isinstance(mutation, mutations.DisconnectPorts):
await self.audioproc_client.disconnect_ports(
mutation.src_node, mutation.src_port,
mutation.dest_node, mutation.dest_port)
elif isinstance(mutation, mutations.DisconnectPorts):
await self.audioproc_client.disconnect_ports(
mutation.src_node, mutation.src_port,
mutation.dest_node, mutation.dest_port)
elif isinstance(mutation, mutations.SetPortProperty):
await self.audioproc_client.set_port_property(
mutation.node, mutation.port, **mutation.kwargs)
elif isinstance(mutation, mutations.SetPortProperty):
await self.audioproc_client.set_port_property(
mutation.node, mutation.port, **mutation.kwargs)
elif isinstance(mutation, mutations.SetNodeParameter):
await self.audioproc_client.set_node_parameter(
mutation.node, **mutation.kwargs)
elif isinstance(mutation, mutations.SetNodeParameter):
await self.audioproc_client.set_node_parameter(
mutation.node, **mutation.kwargs)
else:
raise ValueError(type(mutation))
else:
raise ValueError(type(mutation))
except ipc.ConnectionClosed:
self.audioproc_backend.backend_crashed()
def _set_playback_state(self, new_state):
assert new_state in ('stopped', 'playing')

@ -0,0 +1,183 @@
#!/usr/bin/python3
import asyncio
import logging
import os.path
import tempfile
import threading
import time
import uuid
import unittest
from unittest import mock
import asynctest
from noisicaa import core
from noisicaa import audioproc
from noisicaa.core import ipc
from noisicaa.ui import model
from . import project
from . import sheet
from . import player
logger = logging.getLogger(__name__)
class MockAudioProcClient(object):
def __init__(self, event_loop, server):
self.audiostream_server = None
self.backend_thread = None
self.stop_backend = None
async def setup(self):
pass
async def cleanup(self):
if self.backend_thread is not None:
self.backend_thread.join()
self.backend_thread = None
if self.audiostream_server is not None:
self.audiostream_server.cleanup()
self.audiostream_server = None
async def connect(self, address):
logger.info("Connecting to audioproc client at %s...", address)
async def disconnect(self, shutdown=False):
logger.info("Disconnect audioproc client (shutdown=%s).", shutdown)
async def set_backend(self, backend):
logger.info("Set to audioproc backend to %s.", backend)
if backend == 'ipc':
address = os.path.join(
tempfile.gettempdir(), 'audioproc.%s.pipe' % uuid.uuid4().hex)
self.audiostream_server = audioproc.AudioStreamServer(address)
self.audiostream_server.setup()
self.stop_backend = threading.Event()
self.backend_thread = threading.Thread(target=self.backend_main)
self.backend_thread.start()
return address
return None
async def dump(self):
pass
def kill_backend(self):
self.stop_backend.set()
self.backend_thread.join()
self.backend_thread = None
self.audiostream_server.cleanup()
self.audiostream_server = None
def backend_main(self):
try:
while not self.stop_backend.is_set():
logger.debug("Waiting for request...")
request = self.audiostream_server.receive_frame()
logger.debug("Got request %s, sending response.", request.sample_pos)
self.audiostream_server.send_frame(request)
except audioproc.StreamClosed:
pass
class PlayerTest(asynctest.TestCase):
async def setUp(self):
self.project = project.BaseProject()
self.sheet = sheet.Sheet(name='Test', num_tracks=0)
self.project.sheets.append(self.sheet)
self.player_status_calls = asyncio.Queue()
self.callback_server = ipc.Server(self.loop, 'callback')
self.callback_server.add_command_handler(
'PLAYER_STATUS',
lambda player_id, kwargs: self.player_status_calls.put_nowait(kwargs))
await self.callback_server.setup()
self.audioproc_server = ipc.Server(self.loop, 'audioproc')
await self.audioproc_server.setup()
self.mock_manager = mock.Mock()
async def mock_call(cmd, *args):
assert cmd == 'CREATE_AUDIOPROC_PROCESS'
name, = args
assert name == 'player'
return self.audioproc_server.address
self.mock_manager.call.side_effect = mock_call
self.proxy_client_thread = None
logger.info("Testcase setup complete.")
async def tearDown(self):
logger.info("Testcase teardown starts...")
if self.proxy_client_thread is not None:
self.proxy_client_thread.join()
await self.audioproc_server.cleanup()
await self.callback_server.cleanup()
def start_proxy_client(self, p):
self.proxy_client_thread = threading.Thread(
target=self.proxy_client_main, args=(p.proxy_address,))
self.proxy_client_thread.start()
def proxy_client_main(self, address):
client = audioproc.AudioStreamClient(address)
try:
client.setup()
sample_pos = 0
while True:
request = audioproc.FrameData()
request.sample_pos = sample_pos
request.duration = 10
logger.debug("Sending frame %s...", sample_pos)
client.send_frame(request)
logger.debug("Waiting for response...")
response = client.receive_frame()
logger.debug("Got response %s.", response.sample_pos)
sample_pos += 1
except audioproc.StreamClosed:
pass
finally:
client.cleanup()
async def test_audio_stream_fails(self):
p = player.Player(self.sheet, self.callback_server.address, self.mock_manager, self.loop)
try:
with mock.patch('noisicaa.music.player.AudioProcClient', MockAudioProcClient):
await p.setup()
self.start_proxy_client(p)
logger.info("Wait until audioproc is ready...")
self.assertEqual(
await self.player_status_calls.get(),
{'pipeline_state': 'starting'})
self.assertEqual(
await self.player_status_calls.get(),
{'pipeline_state': 'running'})
logger.info("Backend closes its pipe...")
p.audioproc_client.kill_backend()
self.assertEqual(
await self.player_status_calls.get(),
{'pipeline_state': 'crashed'})
logger.info("Waiting until audioproc is down...")
self.assertEqual(
await self.player_status_calls.get(),
{'pipeline_state': 'stopped'})
# need some time to finish the IPC response of the last PLAYER_STATUS call
# TODO: server shutdown should lame duck and wait until all pending
# calls are finished.
await asyncio.sleep(0.1)
finally:
await p.cleanup()
if __name__ == '__main__':
unittest.main()

@ -263,6 +263,10 @@ class ProjectClientMixin(object):
return await self._stub.call(
'PLAYER_STOP', self._session_id, player_id)
async def restart_player_pipeline(self, player_id):
return await self._stub.call(
'RESTART_PLAYER_PIPELINE', self._session_id, player_id)
def add_player_status_listener(self, player_id, callback):
return self.listeners.add(
'player_status:%s' % player_id, callback)

@ -131,6 +131,8 @@ class ProjectProcessMixin(object):
'PLAYER_PAUSE', self.handle_player_pause)
self.server.add_command_handler(
'PLAYER_STOP', self.handle_player_stop)
self.server.add_command_handler(
'RESTART_PLAYER_PIPELINE', self.handle_restart_player_pipeline)
self.server.add_command_handler(
'DUMP', self.handle_dump)
@ -366,6 +368,11 @@ class ProjectProcessMixin(object):
p = session.players[player_id]
await p.playback_stop()
async def handle_restart_player_pipeline(self, session_id, player_id):
session = self.get_session(session_id)
p = session.players[player_id]
p.restart_pipeline()
async def handle_dump(self, session_id):
assert self.project is not None
session = self.get_session(session_id)

@ -33,6 +33,31 @@ class BuiltinScanner(scanner.Scanner):
])
)
yield (
'builtin://pipeline_crasher',
node_db.UserNodeDescription(
display_name='PipelineCrasher',
node_cls='pipeline_crasher',
ports=[
node_db.AudioPortDescription(
name='in',
direction=node_db.PortDirection.Input,
channels='stereo'),
node_db.AudioPortDescription(
name='out',
direction=node_db.PortDirection.Output,
channels='stereo'),
],
parameters=[
node_db.FloatParameterDescription(
name='trigger',
display_name='Trigger',
min=-1.0,
max=1.0,
default=0.0),
])
)
yield (
'builtin://split_channel',
node_db.UserNodeDescription(

@ -245,15 +245,15 @@ class EditorApp(BaseEditorApp):
logger.info("Creating PipelinePerfMonitor.")
self.pipeline_perf_monitor = pipeline_perf_monitor.PipelinePerfMonitor(self)
logger.info("Creating PipelineGraphMonitor.")
self.pipeline_graph_monitor = pipeline_graph_monitor.PipelineGraphMonitor(self)
# logger.info("Creating PipelineGraphMonitor.")
# self.pipeline_graph_monitor = pipeline_graph_monitor.PipelineGraphMonitor(self)
logger.info("Creating EditorWindow.")
self.win = EditorWindow(self)
await self.win.setup()
self.win.show()
self.pipeline_graph_monitor.addWindow(self.win)
# self.pipeline_graph_monitor.addWindow(self.win)
if self.paths:
logger.info("Starting with projects from cmdline.")

@ -335,9 +335,12 @@ class EditorWindow(ui_base.CommonMixin, QtWidgets.QMainWindow):
def createStatusBar(self):
self.statusbar = QtWidgets.QStatusBar()
self.player_status = LoadHistoryWidget(100, 30)
self.player_status.setToolTip("Load of the playback engine.")
self.statusbar.addPermanentWidget(self.player_status)
# self.pipeline_load = LoadHistoryWidget(100, 30)
# self.pipeline_load.setToolTip("Load of the playback engine.")
# self.statusbar.addPermanentWidget(self.pipeline_load)
self.pipeline_status = QtWidgets.QLabel()
self.statusbar.addPermanentWidget(self.pipeline_status)
self.setStatusBar(self.statusbar)

@ -498,7 +498,7 @@ class NodePropertyDialog(ui_base.ProjectMixin, QtWidgets.QDialog):
def onFloatParameterEdited(self, widget, parameter):
value, ok = widget.locale().toDouble(widget.text())
if ok:
if ok and value != self.__parameter_values[parameter.name]:
self.send_command_async(
self._node_item.node.id, 'SetPipelineGraphNodeParameter',
parameter_name=parameter.name,
@ -508,10 +508,12 @@ class NodePropertyDialog(ui_base.ProjectMixin, QtWidgets.QDialog):
widget.setPlainText(new_value)
def onTextParameterEdited(self, widget, parameter):
self.send_command_async(
self._node_item.node.id, 'SetPipelineGraphNodeParameter',
parameter_name=parameter.name,
str_value=widget.toPlainText())
value = widget.toPlainText()
if value != self.__parameter_values[parameter.name]:
self.send_command_async(
self._node_item.node.id, 'SetPipelineGraphNodeParameter',
parameter_name=parameter.name,
str_value=value)
def onPortMutedEdited(self, port, volume_widget, value):
volume_widget.setEnabled(not value)

@ -437,7 +437,8 @@ class SheetViewImpl(QtWidgets.QGraphicsView):
self.call_async(
self.project_client.player_stop(self._player_id))
def onPlayerStatus(self, playback_pos=None, **kwargs):
def onPlayerStatus(
self, playback_pos=None, pipeline_state=None, pipeline_disabled=None, **kwargs):
if playback_pos is not None:
sample_pos, num_samples = playback_pos
@ -458,11 +459,42 @@ class SheetViewImpl(QtWidgets.QGraphicsView):
start_measure_idx, start_measure_tick,
end_measure_idx, end_measure_tick)
if pipeline_state is not None:
self.window.pipeline_status.setText(pipeline_state)
logger.info("pipeline state: %s", pipeline_state)
if pipeline_disabled:
dialog = QtWidgets.QMessageBox(self)
dialog.setIcon(QtWidgets.QMessageBox.Critical)
dialog.setWindowTitle("noisicaa - Crash")
dialog.setText(
"The audio pipeline has been disabled, because it is repeatedly crashing.")
quit_button = dialog.addButton("Quit", QtWidgets.QMessageBox.DestructiveRole)
undo_and_restart_button = dialog.addButton(
"Undo last command and restart pipeline", QtWidgets.QMessageBox.ActionRole)
restart_button = dialog.addButton("Restart pipeline", QtWidgets.QMessageBox.AcceptRole)
dialog.setDefaultButton(restart_button)
dialog.finished.connect(lambda _: self.call_async(
self.onPipelineDisabledDialogFinished(
dialog, quit_button, undo_and_restart_button, restart_button)))
dialog.show()
async def onPipelineDisabledDialogFinished(
self, dialog, quit_button, undo_and_restart_button, restart_button):
if dialog.clickedButton() == quit_button:
self.app.quit()
elif dialog.clickedButton() == restart_button:
await self.project_client.restart_player_pipeline(self._player_id)
elif dialog.clickedButton() == undo_and_restart_button:
await self.project_client.undo()
await self.project_client.restart_player_pipeline(self._player_id)
def onRender(self):
dialog = RenderSheetDialog(self, self.app, self._sheet)
dialog.exec_()
def onCopy(self):
if not self._selection_set:
return

@ -48,7 +48,10 @@ class CommonMixin(object):
def __call_async_cb(self, task, callback):
if task.exception() is not None:
self.__app.crashWithMessage("Exception in callback", str(task.exception()))
self.__app.crashWithMessage(
"Exception in callback",
("Callback: %s\n" % task
+ "Exception: %s" % task.exception()))
raise task.exception()
if callback is not None:

Loading…
Cancel
Save