Unify the different messages, which are generated by the engine, into a single proto message.

looper
Ben Niemann 4 years ago
parent bc8cc6bf71
commit 22c4e292d0

@ -32,6 +32,9 @@ from .mutations import (
SetPluginState,
)
from .public import (
NodeStateChange,
EngineStateChange,
EngineNotification,
MusicalDuration,
MusicalTime,
PluginState,

@ -25,8 +25,11 @@ import logging
from typing import Any, Dict, Optional, Set, Tuple
from noisicaa import core
# pylint/mypy don't understand capnp modules.
from noisicaa.core import perf_stats_capnp # type: ignore # pylint: disable=no-name-in-module
from noisicaa.core import ipc
from noisicaa import node_db
from .public import engine_notification_pb2
from .public import player_state_pb2
from .public import processor_message_pb2
from . import mutations
@ -39,8 +42,11 @@ class AudioProcClientBase(object):
self.event_loop = event_loop
self.server = server
self.pipeline_status = None # type: core.Callback[Dict[str, Any]]
self.player_state_changed = None # type: core.Callback[player_state_pb2.PlayerState]
self.engine_notifications = None # type: core.Callback[engine_notification_pb2.EngineNotification]
self.engine_state_changed = None # type: core.Callback[engine_notification_pb2.EngineStateChange]
self.player_state_changed = None # type: core.CallbackMap[str, player_state_pb2.PlayerState]
self.node_state_changed = None # type: core.CallbackMap[str, engine_notification_pb2.NodeStateChange]
self.perf_stats = None # type: core.Callback[object]
@property
def address(self) -> str:
@ -136,8 +142,11 @@ class AudioProcClientMixin(AudioProcClientBase):
self._stub = None # type: ipc.Stub
self._session_id = None # type: str
self.pipeline_status = core.Callback[Dict[str, Any]]()
self.player_state_changed = core.Callback[player_state_pb2.PlayerState]()
self.engine_notifications = core.Callback[engine_notification_pb2.EngineNotification]()
self.engine_state_changed = core.Callback[engine_notification_pb2.EngineStateChange]()
self.player_state_changed = core.CallbackMap[str, player_state_pb2.PlayerState]()
self.node_state_changed = core.CallbackMap[str, engine_notification_pb2.NodeStateChange]()
self.perf_stats = core.Callback[object]()
@property
def address(self) -> str:
@ -146,17 +155,11 @@ class AudioProcClientMixin(AudioProcClientBase):
async def setup(self) -> None:
await super().setup()
self.server.add_command_handler(
'PIPELINE_MUTATION', self.handle_pipeline_mutation)
self.server.add_command_handler(
'PLAYER_STATE', self.handle_player_state, log_level=-1)
self.server.add_command_handler(
'PIPELINE_STATUS', self.handle_pipeline_status, log_level=-1)
'ENGINE_NOTIFICATION', self.__handle_engine_notification, log_level=logging.DEBUG)
async def cleanup(self) -> None:
await self.disconnect()
self.server.remove_command_handler('PIPELINE_MUTATION')
self.server.remove_command_handler('PLAYER_STATE')
self.server.remove_command_handler('PIPELINE_STATUS')
self.server.remove_command_handler('ENGINE_NOTIFICATION')
await super().cleanup()
async def connect(self, address: str, flags: Optional[Set[str]] = None) -> None:
@ -181,6 +184,23 @@ class AudioProcClientMixin(AudioProcClientBase):
await self._stub.close()
self._stub = None
async def __handle_engine_notification(
self, msg: engine_notification_pb2.EngineNotification) -> None:
self.engine_notifications.call(msg)
if msg.HasField('player_state'):
player_state = msg.player_state
self.player_state_changed.call(player_state.realm, player_state)
for node_state_change in msg.node_state_changes:
self.node_state_changed.call(node_state_change.node_id, node_state_change)
for engine_state_change in msg.engine_state_changes:
self.engine_state_changed.call(engine_state_change)
if msg.HasField('perf_stats'):
self.perf_stats.call(perf_stats_capnp.PerfStats.from_bytes_packed(msg.perf_stats))
async def shutdown(self) -> None:
await self._stub.call('SHUTDOWN')
@ -255,12 +275,3 @@ class AudioProcClientMixin(AudioProcClientBase):
async def update_project_properties(self, realm: str, **kwargs: Any) -> None:
return await self._stub.call('UPDATE_PROJECT_PROPERTIES', self._session_id, realm, kwargs)
def handle_pipeline_mutation(self, mutation: mutations.Mutation) -> None:
logger.info("Mutation received: %s", mutation)
def handle_pipeline_status(self, status: Dict[str, Any]) -> None:
self.pipeline_status.call(status)
def handle_player_state(self, state: player_state_pb2.PlayerState) -> None:
self.player_state_changed.call(state)

@ -160,13 +160,14 @@ class AudioProcClientTest(
async with self.create_process(inline_plugin_host=False) as client:
is_broken = asyncio.Event(loop=self.loop)
def pipeline_status(status):
logger.info("pipeline_status(%s)", status)
if 'node_state' in status:
realm, node_id, node_state = status['node_state']
if realm == 'root' and node_id == 'test' and node_state == 'BROKEN':
def engine_notification(msg):
logger.info("engine_notification:\n%s", msg)
for node_state_change in msg.node_state_changes:
if (node_state_change.realm == 'root'
and node_state_change.node_id == 'test'
and node_state_change.state == 'BROKEN'):
is_broken.set()
client.pipeline_status.add(pipeline_status)
client.engine_notifications.add(engine_notification)
await client.set_backend('null')

@ -20,9 +20,11 @@
#
# @end:license
import asyncio
import functools
import logging
import sys
import time
import uuid
from typing import cast, Any, Optional, Dict, List, Set, Tuple
@ -32,6 +34,7 @@ from noisicaa import core
from noisicaa import node_db
from noisicaa import lv2
from noisicaa import host_system
from .public import engine_notification_pb2
from .public import player_state_pb2
from .public import processor_message_pb2
from . import engine
@ -41,64 +44,67 @@ logger = logging.getLogger(__name__)
class Session(core.CallbackSessionMixin, core.SessionBase):
async_connect = False
def __init__(self, client_address: str, flags: Set, **kwargs: Any) -> None:
super().__init__(callback_address=client_address, **kwargs)
self.__flags = flags or set()
self.__pending_mutations = [] # type: List[mutations.Mutation]
self.owned_realms = set() # type: Set[str]
# async def setup(self):
# await super().setup()
# Send initial mutations to build up the current pipeline
# state.
# TODO: reanimate
# for node in self.__engine.nodes:
# mutation = mutations.AddNode(node)
# session.publish_mutation(mutation)
# for node in self.__engine.nodes:
# for port in node.inputs.values():
# for upstream_port in port.inputs:
# mutation = mutations.ConnectPorts(
# upstream_port, port)
# session.publish_mutation(mutation)
self.__shutdown = False
self.__notification_pusher_task = None # type: asyncio.Task
self.__notification_available = None # type: asyncio.Event
self.__pending_notification = engine_notification_pb2.EngineNotification()
def callback_connected(self) -> None:
while self.__pending_mutations:
self.publish_mutation(self.__pending_mutations.pop(0))
async def setup(self):
await super().setup()
def publish_mutation(self, mutation: mutations.Mutation) -> None:
if not self.callback_alive:
self.__pending_mutations.append(mutation)
return
self.__shutdown = False
self.__notification_available = asyncio.Event(loop=self.event_loop)
self.__notification_pusher_task = self.event_loop.create_task(
self.__notification_pusher())
self.async_callback('PIPELINE_MUTATION', mutation)
async def cleanup(self):
if self.__notification_pusher_task is not None:
self.__shutdown = True
self.__notification_available.set()
await self.__notification_pusher_task
self.__notification_pusher_task.result()
self.__notification_pusher_task = None
def publish_player_state(self, state: player_state_pb2.PlayerState) -> None:
if state.realm not in self.owned_realms:
return
await super().cleanup()
if not self.callback_alive:
return
async def __notification_pusher(self):
while True:
await self.__notification_available.wait()
self.__notification_available.clear()
next_notification = time.time() + 1.0/50
self.async_callback('PLAYER_STATE', state)
if self.__shutdown:
return
def publish_status(self, status: Dict[str, Any]) -> None:
if not self.callback_alive:
return
if not self.callback_alive:
continue
status = dict(status)
notification = self.__pending_notification
self.__pending_notification = engine_notification_pb2.EngineNotification()
await self.callback('ENGINE_NOTIFICATION', notification)
if 'perf_data' not in self.__flags and 'perf_data' in status:
del status['perf_data']
delay = next_notification - time.time()
if delay > 0:
await asyncio.sleep(delay, loop=self.event_loop)
if status:
self.async_callback('PIPELINE_STATUS', status)
def callback_connected(self) -> None:
pass
def publish_engine_notification(self, msg: engine_notification_pb2.EngineNotification) -> None:
# TODO: filter out message for not owned realms
self.__pending_notification.MergeFrom(msg)
self.__notification_available.set()
class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
session_cls = Session
def __init__(
@ -161,9 +167,9 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
host_system=self.__host_system,
shm=self.shm,
profile_path=self.profile_path)
self.__engine.perf_data.add(self.perf_data_callback)
self.__engine.node_state_changed.add(self.node_state_callback)
self.__engine.player_state_changed.add(self.player_state_callback)
self.__engine.notifications.add(
lambda msg: self.event_loop.call_soon_threadsafe(
functools.partial(self.__handle_engine_notification, msg)))
await self.__engine.setup()
@ -191,17 +197,9 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
await super().cleanup()
def publish_mutation(self, mutation: mutations.Mutation) -> None:
def __handle_engine_notification(self, msg: engine_notification_pb2.EngineNotification) -> None:
for session in self.sessions:
cast(Session, session).publish_mutation(mutation)
def publish_status(self, **kwargs: Any) -> None:
for session in self.sessions:
cast(Session, session).publish_status(kwargs)
def publish_player_state(self, state: player_state_pb2.PlayerState) -> None:
for session in self.sessions:
cast(Session, session).publish_player_state(state)
cast(Session, session).publish_engine_notification(msg)
async def __handle_create_realm(
self, session_id: str, name: str, parent: str, enable_player: bool,
@ -316,21 +314,6 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
realm = self.__engine.get_realm(realm_name)
realm.update_project_properties(**properties)
def perf_data_callback(self, perf_data: core.PerfStats) -> None:
self.event_loop.call_soon_threadsafe(
functools.partial(
self.publish_status, perf_data=perf_data))
def node_state_callback(self, realm: str, node_id: str, state: engine.ProcessorState) -> None:
logger.info('%s %s', node_id, state)
self.event_loop.call_soon_threadsafe(
functools.partial(
self.publish_status, node_state=(realm, node_id, state)))
def player_state_callback(self, state: player_state_pb2.PlayerState) -> None:
self.event_loop.call_soon_threadsafe(
functools.partial(self.publish_player_state, state))
async def handle_play_file(self, session_id: str, path: str) -> str:
self.get_session(session_id)

@ -39,6 +39,7 @@ from noisicaa.bindings import lv2
from noisicaa.core import ipc
from noisicaa.core.status cimport check
from noisicaa.core.perf_stats cimport PyPerfStats
from noisicaa.audioproc.public import engine_notification_pb2
from noisicaa.audioproc.public import musical_time
from noisicaa.audioproc.public import player_state_pb2
from . cimport realm as realm_lib
@ -71,9 +72,7 @@ class Engine(object):
self, *,
host_system, event_loop, manager, server_address,
shm=None, profile_path=None):
self.player_state_changed = core.Callback()
self.node_state_changed = core.Callback()
self.perf_data = core.Callback()
self.notifications = core.Callback()
self.__host_system = host_system
self.__event_loop = event_loop
@ -106,7 +105,14 @@ class Engine(object):
# TODO: reimplement
pass
def __set_state(self, state: engine_notification_pb2.EngineStateChange.State) -> None:
self.notifications.call(engine_notification_pb2.EngineNotification(
engine_state_changes=[engine_notification_pb2.EngineStateChange(
state=state)]))
async def setup(self, *, start_thread=True):
self.__set_state(engine_notification_pb2.EngineStateChange.SETUP)
self.__engine_started = threading.Event()
self.__engine_exit = threading.Event()
if start_thread:
@ -119,8 +125,11 @@ class Engine(object):
self.__maintenance_task = self.__event_loop.create_task(self.__maintenance_task_main())
logger.info("Engine up and running.")
self.__set_state(engine_notification_pb2.EngineStateChange.RUNNING)
async def cleanup(self):
self.__set_state(engine_notification_pb2.EngineStateChange.CLEANUP)
if self.__backend is not None:
logger.info("Stopping backend...")
self.__backend.stop()
@ -163,6 +172,8 @@ class Engine(object):
listener.remove()
self.__realm_listeners.clear()
self.__set_state(engine_notification_pb2.EngineStateChange.STOPPED)
def add_notification_listener(self, node_id, func):
return self.__notification_listeners.add(node_id, func)
@ -205,7 +216,10 @@ class Engine(object):
if enable_player:
logger.info("Enabling player...")
player = PyPlayer(self.__host_system, name)
player.player_state_changed.add(self.player_state_changed.call)
player.player_state_changed.add(
lambda state: self.notifications.call(
engine_notification_pb2.EngineNotification(
player_state=state)))
else:
logger.info("Player disabled.")
@ -220,7 +234,12 @@ class Engine(object):
callback_address=callback_address)
self.__realms[name] = realm
self.__realm_listeners['%s:node_state_changed' % name] = realm.node_state_changed.add(
functools.partial(self.node_state_changed.call, name))
lambda node_id, state: self.notifications.call(engine_notification_pb2.EngineNotification(
node_state_changes=[engine_notification_pb2.NodeStateChange(
realm=name,
node_id=node_id,
state=state)])))
if parent is None:
self.__root_realm = realm
else:
@ -251,6 +270,7 @@ class Engine(object):
if reinit:
logger.info("Reinitializing engine...")
self.__set_state(engine_notification_pb2.EngineStateChange.CLEANUP)
if self.__backend is not None:
logger.info("Stopping backend...")
@ -293,6 +313,7 @@ class Engine(object):
self.__host_system.cleanup()
logger.info("Engine stopped, changing host parameters...")
self.__set_state(engine_notification_pb2.EngineStateChange.STOPPED)
if block_size is not None:
self.__host_system.set_block_size(block_size)
@ -300,6 +321,7 @@ class Engine(object):
self.__host_system.set_sample_rate(sample_rate)
logger.info("Restarting engine...")
self.__set_state(engine_notification_pb2.EngineStateChange.SETUP)
logger.info("Restarting host system...")
self.__host_system.setup()
@ -326,6 +348,7 @@ class Engine(object):
self.__maintenance_task = self.__event_loop.create_task(self.__maintenance_task_main())
logger.info("Engine reinitialized.")
self.__set_state(engine_notification_pb2.EngineStateChange.RUNNING)
def set_backend(self, name, **parameters):
assert self.__root_realm is not None
@ -384,72 +407,88 @@ class Engine(object):
cdef message_queue.Message* msg
cdef realm_lib.PyProgram program
cdef PyBlockContext ctxt
cdef double last_loop_time = -1.0
while True:
if self.__engine_exit.is_set():
logger.info("Exiting engine mainloop.")
break
if backend is None:
if self.__backend_ready.wait(0.1):
self.__backend_ready.clear()
assert self.__backend is not None
backend = self.__backend
else:
notification = engine_notification_pb2.EngineNotification()
try:
if self.__engine_exit.is_set():
logger.info("Exiting engine mainloop.")
break
if backend is None:
if self.__backend_ready.wait(0.1):
self.__backend_ready.clear()
assert self.__backend is not None
backend = self.__backend
else:
continue
elif backend.released():
backend = None
self.__backend_released.set()
continue
elif backend.released():
backend = None
self.__backend_released.set()
continue
if backend.stopped():
logger.info("Backend stopped, exiting engine mainloop.")
break
if backend.stopped():
logger.info("Backend stopped, exiting engine mainloop.")
break
ctxt = self.__root_realm.block_context
ctxt = self.__root_realm.block_context
if len(ctxt.perf) > 0:
notification.perf_stats = ctxt.perf.serialize().to_bytes_packed()
ctxt.perf.reset()
if len(ctxt.perf) > 0:
self.perf_data.call(ctxt.perf.serialize())
ctxt.perf.reset()
program = self.__root_realm.get_active_program()
if program is None:
time.sleep(0.1)
continue
program = self.__root_realm.get_active_program()
if program is None:
time.sleep(0.1)
continue
backend.begin_block(ctxt)
try:
self.__root_realm.process_block(program)
backend.begin_block(ctxt)
try:
self.__root_realm.process_block(program)
for channel in ('left', 'right'):
sink_buf = self.__root_realm.get_buffer(
'sink:in:' + channel, buffers.PyFloatAudioBlockBuffer())
backend.output(ctxt, channel, sink_buf)
for channel in ('left', 'right'):
sink_buf = self.__root_realm.get_buffer(
'sink:in:' + channel, buffers.PyFloatAudioBlockBuffer())
backend.output(ctxt, channel, sink_buf)
if last_loop_time >= 0:
loop_time = time.perf_counter() - last_loop_time
block_time = float(self.__host_system.block_size) / self.__host_system.sample_rate
load = loop_time / block_time
notification.engine_state_changes.add(
state=engine_notification_pb2.EngineStateChange.RUNNING,
load=load)
finally:
backend.end_block(ctxt)
finally:
backend.end_block(ctxt)
out_messages = ctxt.get().out_messages.get()
msg = out_messages.first()
while not out_messages.is_end(msg):
if msg.type == message_queue.MessageType.SOUND_FILE_COMPLETE:
node_id = bytes((<message_queue.SoundFileCompleteMessage*>msg).node_id).decode('utf-8')
self.__notification_listeners.call(node_id, msg.type)
last_loop_time = time.perf_counter()
elif msg.type == message_queue.MessageType.PORT_RMS:
node_id = bytes(
(<message_queue.PortRMSMessage*>msg).node_id).decode('utf-8')
port_index = (<message_queue.PortRMSMessage*>msg).port_index
rms = (<message_queue.PortRMSMessage*>msg).rms
out_messages = ctxt.get().out_messages.get()
msg = out_messages.first()
while not out_messages.is_end(msg):
if msg.type == message_queue.MessageType.SOUND_FILE_COMPLETE:
node_id = bytes((<message_queue.SoundFileCompleteMessage*>msg).node_id).decode('utf-8')
self.__notification_listeners.call(node_id, msg.type)
logger.debug("%s:%d = %f", node_id, port_index, rms)
elif msg.type == message_queue.MessageType.PORT_RMS:
node_id = bytes(
(<message_queue.PortRMSMessage*>msg).node_id).decode('utf-8')
port_index = (<message_queue.PortRMSMessage*>msg).port_index
rms = (<message_queue.PortRMSMessage*>msg).rms
else:
logger.debug("out message %d", msg.type)
logger.debug("%s:%d = %f", node_id, port_index, rms)
msg = out_messages.next(msg)
out_messages.clear()
else:
logger.debug("out message %d", msg.type)
msg = out_messages.next(msg)
out_messages.clear()
finally:
self.notifications.call(notification)
async def __maintenance_task_main(self):
while True:

@ -51,12 +51,13 @@ class ProcessorMessage;
class ProcessorParameters;
}
// Keep this in sync with engine_notification.proto > NodeStateChange
enum ProcessorState {
INACTIVE,
SETUP,
RUNNING,
BROKEN,
CLEANUP,
INACTIVE = 1,
SETUP = 2,
RUNNING = 3,
BROKEN = 4,
CLEANUP = 5,
};
class Processor : public RefCounted {

@ -265,8 +265,7 @@ cdef class PyRealm(object):
try:
node_id = bytes(c_node_id).decode('utf-8')
state = processor.State(c_state)
self.node_state_changed.call(node_id, state.name)
self.node_state_changed.call(node_id, c_state)
finally:
PyErr_Restore(exc_type, exc_value, exc_trackback)

@ -21,6 +21,7 @@
add_python_package()
set(LIB_SRCS
engine_notification.pb.cc
musical_time.cpp
musical_time.pb.cc
player_state.pb.cc
@ -42,6 +43,8 @@ cpp_proto(plugin_state.proto)
py_proto(plugin_state.proto)
cpp_proto(processor_message.proto)
py_proto(processor_message.proto)
cpp_proto(engine_notification.proto)
py_proto(engine_notification.proto)
add_library(noisicaa-audioproc-public SHARED ${LIB_SRCS})
target_compile_options(noisicaa-audioproc-public PRIVATE -fPIC -std=c++11 -Wall -Werror -pedantic -DHAVE_PTHREAD_SPIN_LOCK)

@ -18,6 +18,11 @@
#
# @end:license
from .engine_notification_pb2 import (
NodeStateChange,
EngineStateChange,
EngineNotification,
)
from .musical_time import (
PyMusicalDuration as MusicalDuration,
PyMusicalTime as MusicalTime,

@ -0,0 +1,59 @@
/*
* @begin:license
*
* Copyright (c) 2015-2018, Benjamin Niemann <pink@odahoda.de>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* @end:license
*/
syntax = "proto2";
import "noisicaa/audioproc/public/player_state.proto";
package noisicaa.pb;
message NodeStateChange {
required string realm = 1;
required string node_id = 2;
enum State {
INACTIVE = 1;
SETUP = 2;
RUNNING = 3;
BROKEN = 4;
CLEANUP = 5;
}
required State state = 3;
}
message EngineStateChange {
enum State {
STOPPED = 1;
SETUP = 2;
RUNNING = 3;
CLEANUP = 4;
}
required State state = 1;
optional float load = 2;
}
message EngineNotification {
optional PlayerState player_state = 1;
repeated NodeStateChange node_state_changes = 2;
repeated EngineStateChange engine_state_changes = 3;
optional bytes perf_stats = 4;
}

@ -52,9 +52,6 @@ class Player(object):
self.audioproc_client = audioproc_client
self.realm = realm
self.player_state_changed = core.Callback[audioproc.PlayerState]()
self.pipeline_status = core.Callback[Dict[str, Any]]()
self.__listeners = {} # type: Dict[str, core.Listener]
self.id = uuid.uuid4().hex
@ -73,10 +70,6 @@ class Player(object):
self.__listeners['pipeline_mutations'] = self.project.pipeline_mutation.add(
self.handle_pipeline_mutation)
self.__listeners['player_state'] = self.audioproc_client.player_state_changed.add(
self.__handle_player_state)
self.__listeners['pipeline_status'] = self.audioproc_client.pipeline_status.add(
self.pipeline_status.call)
logger.info("Populating realm with project state...")
for mutation in self.project.get_add_mutations():
@ -145,25 +138,6 @@ class Player(object):
if exc is not None:
logger.error("UPDATE_PROJECT_PROPERTIES failed with exception: %s", exc)
def __handle_player_state(self, state: audioproc.PlayerState) -> None:
self.player_state_changed.call(state)
self.publish_status_async(player_state=state)
def publish_status_async(self, **kwargs: Any) -> None:
if self.callback_stub is None:
return
callback_task = asyncio.run_coroutine_threadsafe(
self.callback_stub.call('PLAYER_STATUS', self.id, kwargs),
self.event_loop)
callback_task.add_done_callback(self.publish_status_done)
def publish_status_done(self, callback_task: concurrent.futures.Future) -> None:
assert callback_task.done()
exc = callback_task.exception()
if exc is not None:
logger.error("PLAYER_STATUS failed with exception: %s", exc)
def __on_project_nodes_changed(self, change: model.PropertyChange) -> None:
if isinstance(change, model.PropertyListInsert):
messages = audioproc.ProcessorMessageList()

@ -370,7 +370,6 @@ class ProjectClient(object):
self._session_data = None # type: Dict[str, Any]
self.__pool = None # type: Pool
self.project = None # type: Project
self.__player_status_listeners = core.CallbackMap[str, Any]()
self.__session_data_listeners = core.CallbackMap[str, Any]()
def __set_project(self, root_id: int) -> None:
@ -384,9 +383,6 @@ class ProjectClient(object):
'PROJECT_MUTATIONS', self.handle_project_mutations)
self.server.add_command_handler(
'PROJECT_CLOSED', self.handle_project_closed)
self.server.add_command_handler(
'PLAYER_STATUS', self.handle_player_status,
log_level=-1)
self.server.add_command_handler(
'SESSION_DATA_MUTATION', self.handle_session_data_mutation)
@ -498,13 +494,6 @@ class ProjectClient(object):
async def restart_player_pipeline(self, player_id: str) -> None:
await self._stub.call('RESTART_PLAYER_PIPELINE', self._session_id, player_id)
def add_player_status_listener(
self, player_id: str, func: Callable[..., None]) -> core.Listener:
return self.__player_status_listeners.add(player_id, func)
async def handle_player_status(self, player_id: str, args: Dict[str, Any]) -> None:
self.__player_status_listeners.call(player_id, **args)
async def dump(self) -> None:
await self._stub.call('DUMP', self._session_id)

@ -56,40 +56,25 @@ class Session(core.CallbackSessionMixin, core.SessionBase):
self.session_data = {} # type: Dict[str, Any]
self.session_data_path = None # type: str
self._players = {} # type: Dict[str, Tuple[core.Listener, player_lib.Player]]
self.__players = {} # type: Dict[str, Tuple[core.Listener, player_lib.Player]]
async def cleanup(self) -> None:
for listener, p in self._players.values():
listener.remove()
await p.cleanup()
self._players.clear()
await self.clear_players()
await super().cleanup()
def get_player(self, player_id: str) -> player_lib.Player:
return self._players[player_id][1]
return self.__players[player_id]
def add_player(self, player: player_lib.Player) -> None:
listener = player.pipeline_status.add(self.handle_pipeline_status)
self._players[player.id] = (listener, player)
self.__players[player.id] = player
def remove_player(self, player: player_lib.Player) -> None:
listener = self._players[player.id][0]
listener.remove()
del self._players[player.id]
del self.__players[player.id]
async def clear_players(self) -> None:
for listener, player in self._players.values():
for player in self.__players.values():
await player.cleanup()
listener.remove()
self._players.clear()
def handle_pipeline_status(self, status: Dict[str, Any]) -> None:
if 'node_state' in status:
logger.error(status['node_state'])
_, node_id, state = status['node_state']
if 'broken' in state:
self.set_value('pipeline_graph_node/%s/broken' % node_id, state['broken'])
self.__players.clear()
async def publish_mutations(self, mutations: mutations_pb2.MutationList) -> None:
assert self.callback_alive

@ -471,24 +471,26 @@ class Renderer(object):
self.__datastream_transport = transport
self.__datastream_protocol = cast(DataStreamProtocol, protocol)
def __handle_player_state(self, state: audioproc.PlayerState) -> None:
assert state.HasField('playing')
assert state.HasField('current_time')
def __handle_engine_notification(self, msg: audioproc.EngineNotification) -> None:
if msg.HasField('player_state'):
state = msg.player_state
assert state.HasField('playing')
assert state.HasField('current_time')
if self.__playing:
self.__current_time = audioproc.MusicalTime.from_proto(state.current_time)
if self.__playing:
self.__current_time = audioproc.MusicalTime.from_proto(state.current_time)
if not self.__playing and state.playing:
assert not self.__player_started.is_set()
self.__player_started.set()
if not self.__playing and state.playing:
assert not self.__player_started.is_set()
self.__player_started.set()
if self.__playing and not state.playing:
assert not self.__player_finished.is_set()
self.__player_finished.set()
if self.__playing and not state.playing:
assert not self.__player_finished.is_set()
self.__player_finished.set()
self.__playing = state.playing
self.__playing = state.playing
self.__player_state_changed.set()
self.__player_state_changed.set()
async def __progress_pump_main(self) -> None:
self.__next_progress_update = (fractions.Fraction(0), time.time())
@ -532,6 +534,8 @@ class Renderer(object):
sample_rate=self.__render_settings.sample_rate)
self.__audioproc_client = AudioProcClient(self.__event_loop, self.__tmp_dir)
self.__audioproc_client.engine_notifications.add(self.__handle_engine_notification)
await self.__audioproc_client.setup()
await self.__audioproc_client.connect(self.__audioproc_address)
@ -544,7 +548,6 @@ class Renderer(object):
event_loop=self.__event_loop,
audioproc_client=self.__audioproc_client,
realm='root')
self.__player.player_state_changed.add(self.__handle_player_state)
await self.__player.setup()
async def run(self) -> None:

@ -36,68 +36,69 @@ logger = logging.getLogger(__name__)
class LadspaScanner(scanner.Scanner):
def scan(self) -> Iterator[Tuple[str, node_db.NodeDescription]]:
# TODO: support configurable searchpaths
rootdir = os.environ.get('LADSPA_PATH', '/usr/lib/ladspa')
for dirpath, _, filenames in os.walk(rootdir):
for filename in filenames:
if not filename.endswith('.so'):
continue
path = os.path.join(dirpath, filename)
logger.info("Loading LADSPA plugins from %s", path)
try:
lib = ladspa.Library(path)
except ladspa.Error as exc:
logger.warning("Failed to load LADSPA library %s: %s", path, exc)
continue
for descriptor in lib.descriptors: # pylint: disable=not-an-iterable
uri = 'ladspa://%s/%s' % (filename, descriptor.label)
logger.info("Adding LADSPA plugin %s", uri)
desc = node_db.NodeDescription()
desc.supported = True
desc.display_name = descriptor.name
desc.type = node_db.NodeDescription.PLUGIN
desc.processor.type = node_db.ProcessorDescription.PLUGIN
desc.plugin.type = node_db.PluginDescription.LADSPA
desc.has_ui = False
ladspa_desc = desc.ladspa
ladspa_desc.library_path = path
ladspa_desc.label = descriptor.label
for port in descriptor.ports:
port_desc = desc.ports.add()
port_desc.name = port.name
if port.direction == ladspa.PortDirection.Input:
port_desc.direction = node_db.PortDescription.INPUT
elif port.direction == ladspa.PortDirection.Output:
port_desc.direction = node_db.PortDescription.OUTPUT
else:
raise ValueError(port)
if port.type == ladspa.PortType.Control:
port_desc.type = node_db.PortDescription.KRATE_CONTROL
elif port.type == ladspa.PortType.Audio:
port_desc.type = node_db.PortDescription.AUDIO
else:
raise ValueError(port)
if (port.type == ladspa.PortType.Control
and port.direction == ladspa.PortDirection.Input):
lower_bound = port.lower_bound(44100)
upper_bound = port.upper_bound(44100)
default = port.default(44100)
value_desc = port_desc.float_value
# Using a fixed sample rate is pretty ugly...
if lower_bound is not None:
value_desc.min = lower_bound
if upper_bound is not None:
value_desc.max = upper_bound
if default is not None:
value_desc.default = default
yield uri, desc
rootdirs = os.environ.get('LADSPA_PATH', '/usr/lib/ladspa')
for rootdir in rootdirs.split(':'):
for dirpath, _, filenames in os.walk(rootdir):
for filename in filenames:
if not filename.endswith('.so'):
continue
path = os.path.join(dirpath, filename)
logger.info("Loading LADSPA plugins from %s", path)
try:
lib = ladspa.Library(path)
except ladspa.Error as exc:
logger.warning("Failed to load LADSPA library %s: %s", path, exc)
continue
for descriptor in lib.descriptors: # pylint: disable=not-an-iterable
uri = 'ladspa://%s/%s' % (filename, descriptor.label)
logger.info("Adding LADSPA plugin %s", uri)
desc = node_db.NodeDescription()
desc.supported = True
desc.display_name = descriptor.name
desc.type = node_db.NodeDescription.PLUGIN
desc.processor.type = node_db.ProcessorDescription.PLUGIN
desc.plugin.type = node_db.PluginDescription.LADSPA
desc.has_ui = False
ladspa_desc = desc.ladspa
ladspa_desc.library_path = path
ladspa_desc.label = descriptor.label
for port in descriptor.ports:
port_desc = desc.ports.add()
port_desc.name = port.name
if port.direction == ladspa.PortDirection.Input:
port_desc.direction = node_db.PortDescription.INPUT
elif port.direction == ladspa.PortDirection.Output:
port_desc.direction = node_db.PortDescription.OUTPUT
else:
raise ValueError(port)
if port.type == ladspa.PortType.Control:
port_desc.type = node_db.PortDescription.KRATE_CONTROL
elif port.type == ladspa.PortType.Audio:
port_desc.type = node_db.PortDescription.AUDIO
else:
raise ValueError(port)
if (port.type == ladspa.PortType.Control
and port.direction == ladspa.PortDirection.Input):
lower_bound = port.lower_bound(44100)
upper_bound = port.upper_bound(44100)
default = port.default(44100)
value_desc = port_desc.float_value
# Using a fixed sample rate is pretty ugly...
if lower_bound is not None:
value_desc.min = lower_bound
if upper_bound is not None:
value_desc.max = upper_bound
if default is not None:
value_desc.default = default
yield uri, desc

@ -77,12 +77,7 @@ class AudioProcClientImpl(audioproc.AudioProcClientBase): # pylint: disable=abs
pass
class AudioProcClient(audioproc.AudioProcClientMixin, AudioProcClientImpl):
def __init__(self, app: 'EditorApp', *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.__app = app
def handle_pipeline_status(self, status: Dict[str, Any]) -> None:
self.__app.onPipelineStatus(status)
pass
class QApplication(QtWidgets.QApplication):
@ -132,6 +127,8 @@ class EditorApp(ui_base.AbstractEditorApp):
self.stat_monitor = None # type: stat_monitor.StatMonitor
self.default_style = None # type: str
self.__player_state_listeners = core.CallbackMap[str, audioproc.EngineNotification]()
@property
def context(self) -> ui_base.CommonContext:
return self.__context
@ -260,7 +257,8 @@ class EditorApp(ui_base.AbstractEditorApp):
)
self.audioproc_client = AudioProcClient(
self, self.process.event_loop, self.process.server)
self.process.event_loop, self.process.server)
self.audioproc_client.engine_notifications.add(self.__handleEngineNotification)
await self.audioproc_client.setup()
await self.audioproc_client.connect(
self.audioproc_process, {'perf_data'})
@ -309,11 +307,39 @@ class EditorApp(ui_base.AbstractEditorApp):
return (self.runtime_settings.dev_mode
and self.show_edit_areas_action.isChecked())
def onPipelineStatus(self, status: Dict[str, Any]) -> None:
if 'perf_data' in status:
if self.pipeline_perf_monitor is not None:
self.pipeline_perf_monitor.addPerfData(
status['perf_data'])
def __handleEngineNotification(self, msg: audioproc.EngineNotification) -> None:
pass
# def onPlayerStatus(self, player_state: audioproc.PlayerState):
# if pipeline_disabled:
# dialog = QtWidgets.QMessageBox(self)
# dialog.setIcon(QtWidgets.QMessageBox.Critical)
# dialog.setWindowTitle("noisicaa - Crash")
# dialog.setText(
# "The audio pipeline has been disabled, because it is repeatedly crashing.")
# quit_button = dialog.addButton("Quit", QtWidgets.QMessageBox.DestructiveRole)
# undo_and_restart_button = dialog.addButton(
# "Undo last command and restart pipeline", QtWidgets.QMessageBox.ActionRole)
# restart_button = dialog.addButton("Restart pipeline", QtWidgets.QMessageBox.AcceptRole)
# dialog.setDefaultButton(restart_button)
# dialog.finished.connect(lambda _: self.call_async(
# self.onPipelineDisabledDialogFinished(
# dialog, quit_button, undo_and_restart_button, restart_button)))
# dialog.show()
# async def onPipelineDisabledDialogFinished(
# self, dialog: QtWidgets.QMessageBox, quit_button: QtWidgets.QAbstractButton,
# undo_and_restart_button: QtWidgets.QAbstractButton,
# restart_button: QtWidgets.QAbstractButton) -> None:
# if dialog.clickedButton() == quit_button:
# self.app.quit()
# elif dialog.clickedButton() == restart_button:
# await self.project_client.restart_player_pipeline(self.__player_id)
# elif dialog.clickedButton() == undo_and_restart_button:
# await self.project_client.undo()
# await self.project_client.restart_player_pipeline(self.__player_id)
def setClipboardContent(self, content: Any) -> None:
logger.info(

@ -30,6 +30,7 @@ from PyQt5 import QtGui
from PyQt5 import QtWidgets
from noisicaa import constants
from noisicaa import audioproc
from noisicaa import music
from ..exceptions import RestartAppException, RestartAppCleanException
from .settings import SettingsDialog
@ -38,6 +39,7 @@ from . import ui_base
from . import instrument_library
from . import qprogressindicator
from . import project_registry
from . import load_history
logger = logging.getLogger(__name__)
@ -54,6 +56,8 @@ class EditorWindow(ui_base.AbstractEditorWindow):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.__engine_state_listener = None # type: core.Listener[audioproc.EngineStateChange]
self._settings_dialog = SettingsDialog(parent=self, context=self.context)
self._instrument_library_dialog = instrument_library.InstrumentLibraryDialog(
@ -95,11 +99,18 @@ class EditorWindow(ui_base.AbstractEditorWindow):
self.app.settings.value('mainwindow/state', b''))
async def setup(self) -> None:
self.__engine_state_listener = self.audioproc_client.engine_state_changed.add(
self.__engineStateChanged)
await self._instrument_library_dialog.setup()
async def cleanup(self) -> None:
await self._instrument_library_dialog.cleanup()
if self.__engine_state_listener is not None:
self.__engine_state_listener.remove()
self.__engine_state_listener = None
self.hide()
while self._project_tabs.count() > 0:
@ -342,9 +353,9 @@ class EditorWindow(ui_base.AbstractEditorWindow):
def createStatusBar(self) -> None:
self.statusbar = QtWidgets.QStatusBar()
# self.pipeline_load = LoadHistoryWidget(100, 30)
# self.pipeline_load.setToolTip("Load of the playback engine.")
# self.statusbar.addPermanentWidget(self.pipeline_load)
self.pipeline_load = load_history.LoadHistoryWidget(100, 30)
self.pipeline_load.setToolTip("Load of the playback engine.")
self.statusbar.addPermanentWidget(self.pipeline_load)
self.pipeline_status = QtWidgets.QLabel()
self.statusbar.addPermanentWidget(self.pipeline_status)
@ -358,6 +369,28 @@ class EditorWindow(ui_base.AbstractEditorWindow):
self._settings_dialog.storeState()
def __engineStateChanged(self, engine_state: audioproc.EngineStateChange):
show_status, show_load = False, False
if engine_state.state == audioproc.EngineStateChange.SETUP:
self.pipeline_status.setText("Starting engine...")
show_status = True
elif engine_state.state == audioproc.EngineStateChange.CLEANUP:
self.pipeline_status.setText("Stopping engine...")
show_status = True
elif engine_state.state == audioproc.EngineStateChange.RUNNING:
if engine_state.HasField('load'):
self.pipeline_load.addValue(engine_state.load)
show_load = True
else:
self.pipeline_status.setText("Engine running")
show_status = True
elif engine_state.state == audioproc.EngineStateChange.STOPPED:
self.pipeline_status.setText("Engine stopped")
show_status = True
self.pipeline_status.setVisible(show_status)
self.pipeline_load.setVisible(show_load)
def setInfoMessage(self, msg: str) -> None:
self.statusbar.showMessage(msg)

@ -30,6 +30,7 @@ from PyQt5 import QtGui
from PyQt5 import QtSvg
from PyQt5 import QtWidgets
from noisicaa import audioproc
from noisicaa import model
from noisicaa import music
from noisicaa import node_db
@ -398,6 +399,8 @@ class Node(ui_base.ProjectMixin, QtWidgets.QGraphicsItem):
self.props = NodeProps()
self.__listeners = [] # type: List[core.Listener]
self.__node = node
self.__plugin_ui = None # type: Optional[plugin_ui.PluginUI]
@ -450,11 +453,20 @@ class Node(ui_base.ProjectMixin, QtWidgets.QGraphicsItem):
self.__drag_rect = QtCore.QRectF()
self.__name_listener = self.__node.name_changed.add(self.__nameChanged)
self.__graph_pos_listener = self.__node.graph_pos_changed.add(self.__graphRectChanged)
self.__graph_size_listener = self.__node.graph_size_changed.add(self.__graphRectChanged)
self.__graph_color_listener = self.__node.graph_color_changed.add(
lambda *_: self.__updateState())
self.__listeners.append(
self.__node.name_changed.add(self.__nameChanged))
self.__listeners.append(
self.__node.graph_pos_changed.add(self.__graphRectChanged))
self.__listeners.append(
self.__node.graph_size_changed.add(self.__graphRectChanged))
self.__listeners.append(
self.__node.graph_color_changed.add(lambda *_: self.__updateState()))
self.__state = None # type: audioproc.NodeStateChange.State
self.__listeners.append(
self.audioproc_client.node_state_changed.add(
'%08x' % self.__node.id, self.__stateChanged))
self.__updateState()
@ -478,7 +490,10 @@ class Node(ui_base.ProjectMixin, QtWidgets.QGraphicsItem):
port.setup()
def cleanup(self) -> None:
self.__graph_pos_listener.remove()
for listener in self.__listeners:
listener.remove()
self.__listeners.clear()
for port in self.__ports.values():
port.cleanup()
self.__ports.clear()
@ -577,6 +592,11 @@ class Node(ui_base.ProjectMixin, QtWidgets.QGraphicsItem):
def boundingRect(self) -> QtCore.QRectF:
return self.__box.boundingRect()
def __stateChanged(self, state_change: audioproc.NodeStateChange) -> None:
if state_change.HasField('state'):
self.__state = state_change.state
self.__updateState()
def __updateState(self) -> None:
if self.__selected or self.__hovered:
opacity = 1.0
@ -588,7 +608,14 @@ class Node(ui_base.ProjectMixin, QtWidgets.QGraphicsItem):
if not port.highlighted():