Use proto messages for IPCs.

- Mostly internal refactoring without user visible changes.
- Optimize the IPC code a bit, which makes it a bit faster.
- Shutting down is a bit more reliable.
looper
Ben Niemann 4 years ago
parent 028c738206
commit da16c0626a

@ -29,6 +29,8 @@ add_python_package(
utils.py
)
py_proto(editor_main.proto)
add_subdirectory(audioproc)
add_subdirectory(bindings)
add_subdirectory(core)

@ -22,8 +22,9 @@ add_python_package(
audioproc_client.py
audioproc_client_test.py
audioproc_process.py
mutations.py
)
py_proto(audioproc.proto)
add_subdirectory(engine)
add_subdirectory(public)

@ -18,11 +18,9 @@
#
# @end:license
from .audioproc_client import (
AudioProcClientBase,
AudioProcClientMixin,
)
from .mutations import (
from .audioproc_pb2 import (
ControlValueChange,
PluginStateChange,
Mutation,
AddNode,
RemoveNode,
@ -31,6 +29,10 @@ from .mutations import (
SetControlValue,
SetPluginState,
)
from .audioproc_client import (
AbstractAudioProcClient,
AudioProcClient,
)
from .public import (
NodeStateChange,
EngineStateChange,
@ -49,4 +51,8 @@ from .public import (
TimeMapper,
DeviceDescription,
DevicePortDescription,
ControlValue,
ProjectProperties,
BackendSettings,
HostParameters,
)

@ -0,0 +1,159 @@
/*
* @begin:license
*
* Copyright (c) 2015-2019, Benjamin Niemann <pink@odahoda.de>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* @end:license
*/
syntax = "proto2";
import "noisicaa/core/session_data.proto";
import "noisicaa/node_db/node_description.proto";
import "noisicaa/audioproc/public/backend_settings.proto";
import "noisicaa/audioproc/public/control_value.proto";
import "noisicaa/audioproc/public/host_parameters.proto";
import "noisicaa/audioproc/public/player_state.proto";
import "noisicaa/audioproc/public/plugin_state.proto";
import "noisicaa/audioproc/public/processor_message.proto";
import "noisicaa/audioproc/public/project_properties.proto";
package noisicaa.audioproc.pb;
message AddNode {
required string id = 1;
optional string name = 2;
required noisicaa.pb.NodeDescription description = 3;
optional noisicaa.pb.PluginState initial_state = 4;
optional string child_realm = 5;
}
message RemoveNode {
required string id = 1;
}
message ConnectPorts {
required string src_node_id = 1;
required string src_port = 2;
required string dest_node_id = 3;
required string dest_port = 4;
}
message DisconnectPorts {
required string src_node_id = 1;
required string src_port = 2;
required string dest_node_id = 3;
required string dest_port = 4;
}
message SetControlValue {
required string name = 1;
required float value = 2;
required uint32 generation = 3;
}
message SetPluginState {
required string node_id = 1;
required noisicaa.pb.PluginState state = 2;
}
message Mutation {
oneof type {
AddNode add_node = 1;
RemoveNode remove_node = 2;
ConnectPorts connect_ports = 3;
DisconnectPorts disconnect_ports = 4;
SetControlValue set_control_value = 5;
SetPluginState set_plugin_state = 6;
}
}
message PluginStateChange {
required string realm = 1;
required string node_id = 2;
required noisicaa.pb.PluginState state = 3;
}
message ControlValueChange {
required string realm = 1;
required string node_id = 2;
required noisicaa.pb.ControlValue value = 3;
}
message CreateRealmRequest {
required string name = 1;
optional string parent = 2;
optional bool enable_player = 3;
optional string callback_address = 4;
}
message DeleteRealmRequest {
required string name = 1;
}
message SendNodeMessagesRequest {
required string realm = 1;
repeated noisicaa.pb.ProcessorMessage messages = 2;
}
message CreatePluginUIRequest {
required string realm = 1;
required string node_id = 2;
}
message CreatePluginUIResponse {
required uint32 wid = 1;
required uint32 width = 2;
required uint32 height = 3;
}
message DeletePluginUIRequest {
required string realm = 1;
required string node_id = 2;
}
message PlayFileRequest {
required string path = 1;
}
message ProfileAudioThreadRequest {
required uint32 duration = 1;
}
message ProfileAudioThreadResponse {
required bytes svg = 1;
}
message SetBackendRequest {
required string name = 1;
optional noisicaa.pb.BackendSettings settings = 2;
}
message UpdateProjectPropertiesRequest {
required string realm = 1;
required noisicaa.pb.ProjectProperties properties = 2;
}
message PipelineMutationRequest {
required string realm = 1;
required Mutation mutation = 2;
}
message SetSessionValuesRequest {
required string realm = 1;
repeated noisicaa.pb.SessionValue session_values = 2;
}

@ -22,24 +22,27 @@
import asyncio
import logging
from typing import Any, Optional, Set, Tuple, Dict
import random
from typing import Any, Optional, Iterable, Set, Tuple
from noisicaa import core
from noisicaa.core import empty_message_pb2
from noisicaa.core import session_data_pb2
from noisicaa.core import ipc
from noisicaa import node_db
from .public import engine_notification_pb2
from .public import player_state_pb2
from .public import processor_message_pb2
from . import mutations
from .public import host_parameters_pb2
from .public import backend_settings_pb2
from .public import project_properties_pb2
from . import audioproc_pb2
logger = logging.getLogger(__name__)
class AudioProcClientBase(object):
def __init__(self, event_loop: asyncio.AbstractEventLoop, server: ipc.Server) -> None:
self.event_loop = event_loop
self.server = server
class AbstractAudioProcClient(object):
def __init__(self) -> None:
self.engine_notifications = None # type: core.Callback[engine_notification_pb2.EngineNotification]
self.engine_state_changed = None # type: core.Callback[engine_notification_pb2.EngineStateChange]
self.player_state_changed = None # type: core.CallbackMap[str, player_state_pb2.PlayerState]
@ -52,21 +55,15 @@ class AudioProcClientBase(object):
raise NotImplementedError
async def setup(self) -> None:
raise NotImplementedError
pass
async def cleanup(self) -> None:
raise NotImplementedError
pass
async def connect(self, address: str, flags: Optional[Set[str]] = None) -> None:
raise NotImplementedError
async def disconnect(self, shutdown: bool = False) -> None:
raise NotImplementedError
async def shutdown(self) -> None:
raise NotImplementedError
async def ping(self) -> None:
async def disconnect(self) -> None:
raise NotImplementedError
async def create_realm(
@ -97,7 +94,7 @@ class AudioProcClientBase(object):
async def set_control_value(self, realm: str, name: str, value: float, generation: int) -> None:
raise NotImplementedError
async def pipeline_mutation(self, realm: str, mutation: mutations.Mutation) -> None:
async def pipeline_mutation(self, realm: str, mutation: audioproc_pb2.Mutation) -> None:
raise NotImplementedError
async def create_plugin_ui(self, realm: str, node_id: str) -> Tuple[int, Tuple[int, int]]:
@ -110,16 +107,15 @@ class AudioProcClientBase(object):
self, realm: str, messages: processor_message_pb2.ProcessorMessageList) -> None:
raise NotImplementedError
async def set_host_parameters(self, **parameters: Any) -> None:
raise NotImplementedError
async def set_backend(self, name: str, **parameters: Any) -> None:
async def set_host_parameters(self, *, block_size: int = None, sample_rate: int = None) -> None:
raise NotImplementedError
async def set_backend_parameters(self, **parameters: Any) -> None:
async def set_backend(
self, name: str, settings: backend_settings_pb2.BackendSettings = None) -> None:
raise NotImplementedError
async def set_session_values(self, realm: str, values: Dict[str, Any]) -> None:
async def set_session_values(
self, realm: str, values: Iterable[session_data_pb2.SessionValue]) -> None:
raise NotImplementedError
async def update_player_state(self, state: player_state_pb2.PlayerState) -> None:
@ -128,18 +124,22 @@ class AudioProcClientBase(object):
async def play_file(self, path: str) -> None:
raise NotImplementedError
async def dump(self) -> None:
async def update_project_properties(
self, realm: str, properties: project_properties_pb2.ProjectProperties) -> None:
raise NotImplementedError
async def update_project_properties(self, realm: str, **kwargs: Any) -> None:
async def profile_audio_thread(self, duration: int) -> bytes:
raise NotImplementedError
class AudioProcClientMixin(AudioProcClientBase):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class AudioProcClient(AbstractAudioProcClient):
def __init__(self, event_loop: asyncio.AbstractEventLoop, server: ipc.Server) -> None:
super().__init__()
self.event_loop = event_loop
self.server = server
self._stub = None # type: ipc.Stub
self._session_id = None # type: str
self.engine_notifications = core.Callback[engine_notification_pb2.EngineNotification]()
self.engine_state_changed = core.Callback[engine_notification_pb2.EngineStateChange]()
@ -148,140 +148,208 @@ class AudioProcClientMixin(AudioProcClientBase):
self.node_messages = core.CallbackMap[str, bytes]()
self.perf_stats = core.Callback[core.PerfStats]()
self.__cb_endpoint_name = 'audioproc-%016x' % random.getrandbits(63)
self.__cb_endpoint_address = None # type: str
@property
def address(self) -> str:
return self._stub.server_address
async def setup(self) -> None:
await super().setup()
self.server.add_command_handler(
'ENGINE_NOTIFICATION', self.__handle_engine_notification, log_level=logging.DEBUG)
cb_endpoint = ipc.ServerEndpoint(self.__cb_endpoint_name)
cb_endpoint.add_handler(
'ENGINE_NOTIFICATION', self.__handle_engine_notification,
engine_notification_pb2.EngineNotification, empty_message_pb2.EmptyMessage)
self.__cb_endpoint_address = await self.server.add_endpoint(cb_endpoint)
async def cleanup(self) -> None:
await self.disconnect()
self.server.remove_command_handler('ENGINE_NOTIFICATION')
if self.__cb_endpoint_address is not None:
await self.server.remove_endpoint(self.__cb_endpoint_name)
self.__cb_endpoint_address = None
await super().cleanup()
async def connect(self, address: str, flags: Optional[Set[str]] = None) -> None:
assert self._stub is None
self._stub = ipc.Stub(self.event_loop, address)
await self._stub.connect()
self._session_id = await self._stub.call('START_SESSION', self.server.address, flags)
logger.info("Started session %s", self._session_id)
async def disconnect(self, shutdown: bool = False) -> None:
if self._session_id is not None:
try:
await self._stub.call('END_SESSION', self._session_id)
except ipc.ConnectionClosed:
logger.info("Connection already closed.")
self._session_id = None
await self._stub.connect(core.StartSessionRequest(
callback_address=self.__cb_endpoint_address,
flags=flags))
async def disconnect(self) -> None:
if self._stub is not None:
if shutdown:
try:
await self.shutdown()
except ipc.ConnectionClosed:
pass
await self._stub.close()
self._stub = None
async def __handle_engine_notification(
self, msg: engine_notification_pb2.EngineNotification) -> None:
self.engine_notifications.call(msg)
self,
request: engine_notification_pb2.EngineNotification,
response: empty_message_pb2.EmptyMessage
) -> None:
self.engine_notifications.call(request)
if msg.HasField('player_state'):
player_state = msg.player_state
if request.HasField('player_state'):
player_state = request.player_state
self.player_state_changed.call(player_state.realm, player_state)
for node_state_change in msg.node_state_changes:
for node_state_change in request.node_state_changes:
self.node_state_changed.call(node_state_change.node_id, node_state_change)
for node_message in msg.node_messages:
for node_message in request.node_messages:
self.node_messages.call(node_message.node_id, node_message.atom)
for engine_state_change in msg.engine_state_changes:
for engine_state_change in request.engine_state_changes:
self.engine_state_changed.call(engine_state_change)
if msg.HasField('perf_stats'):
if request.HasField('perf_stats'):
perf_stats = core.PerfStats()
perf_stats.deserialize(msg.perf_stats)
perf_stats.deserialize(request.perf_stats)
self.perf_stats.call(perf_stats)
async def shutdown(self) -> None:
await self._stub.call('SHUTDOWN')
async def ping(self) -> None:
await self._stub.ping()
async def create_realm(
self, *, name: str, parent: Optional[str] = None, enable_player: bool = False,
callback_address: Optional[str] = None) -> None:
await self._stub.call(
'CREATE_REALM', self._session_id, name, parent, enable_player, callback_address)
'CREATE_REALM',
audioproc_pb2.CreateRealmRequest(
name=name,
parent=parent,
enable_player=enable_player,
callback_address=callback_address))
async def delete_realm(self, name: str) -> None:
await self._stub.call('DELETE_REALM', self._session_id, name)
await self._stub.call(
'DELETE_REALM',
audioproc_pb2.DeleteRealmRequest(
name=name))
async def add_node(
self, realm: str, *, description: node_db.NodeDescription, **args: Any) -> None:
await self.pipeline_mutation(realm, mutations.AddNode(description=description, **args))
await self.pipeline_mutation(
realm,
audioproc_pb2.Mutation(
add_node=audioproc_pb2.AddNode(description=description, **args)))
async def remove_node(self, realm: str, node_id: str) -> None:
await self.pipeline_mutation(realm, mutations.RemoveNode(node_id))
await self.pipeline_mutation(
realm,
audioproc_pb2.Mutation(
remove_node=audioproc_pb2.RemoveNode(id=node_id)))
async def connect_ports(
self, realm: str, node1_id: str, port1_name: str, node2_id: str, port2_name: str
) -> None:
await self.pipeline_mutation(
realm, mutations.ConnectPorts(node1_id, port1_name, node2_id, port2_name))
realm,
audioproc_pb2.Mutation(
connect_ports=audioproc_pb2.ConnectPorts(
src_node_id=node1_id,
src_port=port1_name,
dest_node_id=node2_id,
dest_port=port2_name)))
async def disconnect_ports(
self, realm: str, node1_id: str, port1_name: str, node2_id: str, port2_name: str
) -> None:
await self.pipeline_mutation(
realm, mutations.DisconnectPorts(node1_id, port1_name, node2_id, port2_name))
realm,
audioproc_pb2.Mutation(
disconnect_ports=audioproc_pb2.DisconnectPorts(
src_node_id=node1_id,
src_port=port1_name,
dest_node_id=node2_id,
dest_port=port2_name)))
async def set_control_value(self, realm: str, name: str, value: float, generation: int) -> None:
await self.pipeline_mutation(realm, mutations.SetControlValue(name, value, generation))
async def pipeline_mutation(self, realm: str, mutation: mutations.Mutation) -> None:
await self._stub.call('PIPELINE_MUTATION', self._session_id, realm, mutation)
await self.pipeline_mutation(
realm,
audioproc_pb2.Mutation(
set_control_value=audioproc_pb2.SetControlValue(
name=name,
value=value,
generation=generation)))
async def pipeline_mutation(self, realm: str, mutation: audioproc_pb2.Mutation) -> None:
await self._stub.call(
'PIPELINE_MUTATION',
audioproc_pb2.PipelineMutationRequest(
realm=realm,
mutation=mutation))
async def create_plugin_ui(self, realm: str, node_id: str) -> Tuple[int, Tuple[int, int]]:
return await self._stub.call('CREATE_PLUGIN_UI', self._session_id, realm, node_id)
request = audioproc_pb2.CreatePluginUIRequest(
realm=realm,
node_id=node_id)
response = audioproc_pb2.CreatePluginUIResponse()
await self._stub.call('CREATE_PLUGIN_UI', request, response)
return (response.wid, (response.width, response.height))
async def delete_plugin_ui(self, realm: str, node_id: str) -> None:
await self._stub.call('DELETE_PLUGIN_UI', self._session_id, realm, node_id)
await self._stub.call(
'DELETE_PLUGIN_UI',
audioproc_pb2.DeletePluginUIRequest(
realm=realm,
node_id=node_id))
async def send_node_messages(
self, realm: str, messages: processor_message_pb2.ProcessorMessageList) -> None:
await self._stub.call('SEND_NODE_MESSAGES', self._session_id, realm, messages)
async def set_host_parameters(self, **parameters: Any) -> None:
await self._stub.call('SET_HOST_PARAMETERS', self._session_id, parameters)
await self._stub.call(
'SEND_NODE_MESSAGES',
audioproc_pb2.SendNodeMessagesRequest(
realm=realm,
messages=messages.messages))
async def set_backend(self, name: str, **parameters: Any) -> None:
await self._stub.call('SET_BACKEND', self._session_id, name, parameters)
async def set_host_parameters(self, *, block_size: int = None, sample_rate: int = None) -> None:
await self._stub.call(
'SET_HOST_PARAMETERS',
host_parameters_pb2.HostParameters(
block_size=block_size,
sample_rate=sample_rate))
async def set_backend_parameters(self, **parameters: Any) -> None:
await self._stub.call('SET_BACKEND_PARAMETERS', self._session_id, parameters)
async def set_backend(
self, name: str, settings: backend_settings_pb2.BackendSettings = None) -> None:
await self._stub.call(
'SET_BACKEND',
audioproc_pb2.SetBackendRequest(
name=name,
settings=settings))
async def set_session_values(self, realm: str, values: Dict[str, Any]) -> None:
await self._stub.call('SET_SESSION_VALUES', self._session_id, realm, values)
async def set_session_values(
self, realm: str, values: Iterable[session_data_pb2.SessionValue]) -> None:
await self._stub.call(
'SET_SESSION_VALUES',
audioproc_pb2.SetSessionValuesRequest(
realm=realm,
session_values=values))
async def update_player_state(self, state: player_state_pb2.PlayerState) -> None:
await self._stub.call('UPDATE_PLAYER_STATE', self._session_id, state)
await self._stub.call(
'UPDATE_PLAYER_STATE',
state)
async def play_file(self, path: str) -> None:
await self._stub.call('PLAY_FILE', self._session_id, path)
async def dump(self) -> None:
await self._stub.call('DUMP', self._session_id)
await self._stub.call(
'PLAY_FILE',
audioproc_pb2.PlayFileRequest(
path=path))
async def profile_audio_thread(self, duration: int) -> bytes:
return await self._stub.call('PROFILE_AUDIO_THREAD', self._session_id, duration)
async def update_project_properties(self, realm: str, **kwargs: Any) -> None:
return await self._stub.call('UPDATE_PROJECT_PROPERTIES', self._session_id, realm, kwargs)
request = audioproc_pb2.ProfileAudioThreadRequest(
duration=duration)
response = audioproc_pb2.ProfileAudioThreadResponse()
await self._stub.call('PROFILE_AUDIO_THREAD', request, response)
return response.svg
async def update_project_properties(
self, realm: str, properties: project_properties_pb2.ProjectProperties) -> None:
await self._stub.call(
'UPDATE_PROJECT_PROPERTIES',
audioproc_pb2.UpdateProjectPropertiesRequest(
realm=realm,
properties=properties))

@ -28,8 +28,6 @@ import async_generator
from noisidev import unittest
from noisidev import unittest_mixins
from noisicaa import node_db
from noisicaa.constants import TEST_OPTS
from noisicaa.core import ipc
from . import audioproc_client
from .public import engine_notification_pb2
@ -37,22 +35,8 @@ from .public import engine_notification_pb2
logger = logging.getLogger(__name__)
class TestClientImpl(audioproc_client.AudioProcClientBase): # pylint: disable=abstract-method
def __init__(self, event_loop):
super().__init__(event_loop, ipc.Server(event_loop, 'client', TEST_OPTS.TMP_DIR))
async def setup(self):
await self.server.setup()
async def cleanup(self):
await self.server.cleanup()
class TestClient(audioproc_client.AudioProcClientMixin, TestClientImpl):
pass
class AudioProcClientTest(
unittest_mixins.ServerMixin,
unittest_mixins.NodeDBMixin,
unittest_mixins.ProcessManagerMixin,
unittest.AsyncTestCase):
@ -60,6 +44,7 @@ class AudioProcClientTest(
super().__init__(*args, **kwargs)
self.passthru_description = node_db.NodeDescription(
uri='test://passthru',
type=node_db.NodeDescription.PROCESSOR,
ports=[
node_db.PortDescription(
@ -103,7 +88,7 @@ class AudioProcClientTest(
name='audioproc',
entry='noisicaa.audioproc.audioproc_process.AudioProcSubprocess')
client = TestClient(self.loop)
client = audioproc_client.AudioProcClient(self.loop, self.server)
await client.setup()
await client.connect(proc.address)
try:
@ -111,10 +96,10 @@ class AudioProcClientTest(
await async_generator.yield_(client)
finally:
await client.disconnect(shutdown=True)
await client.disconnect()
await client.cleanup()
await proc.wait()
await proc.shutdown()
async def test_realms(self):
async with self.create_process(inline_plugin_host=False) as client:

@ -27,31 +27,39 @@ import subprocess
import sys
import time
import uuid
from typing import cast, Any, Optional, Dict, Set, Tuple
from typing import Any, Optional, Dict, Set
import posix_ipc
from noisicaa import core
from noisicaa.core import empty_message_pb2
from noisicaa.core import ipc
from noisicaa import node_db
from noisicaa import lv2
from noisicaa import host_system
from noisicaa import editor_main_pb2
from .public import engine_notification_pb2
from .public import host_parameters_pb2
from .public import player_state_pb2
from .public import processor_message_pb2
from .engine import profile
from . import engine
from . import mutations
from . import audioproc_pb2
logger = logging.getLogger(__name__)
class Session(core.CallbackSessionMixin, core.SessionBase):
class Session(ipc.CallbackSessionMixin, ipc.Session):
async_connect = False
def __init__(self, client_address: str, flags: Set, **kwargs: Any) -> None:
super().__init__(callback_address=client_address, **kwargs)
def __init__(
self,
session_id: int,
start_session_request: core.StartSessionRequest,
event_loop: asyncio.AbstractEventLoop
) -> None:
super().__init__(session_id, start_session_request, event_loop)
self.__flags = flags or set()
self.__flags = set(start_session_request.flags)
self.owned_realms = set() # type: Set[str]
self.__shutdown = False
@ -63,8 +71,8 @@ class Session(core.CallbackSessionMixin, core.SessionBase):
await super().setup()
self.__shutdown = False
self.__notification_available = asyncio.Event(loop=self.event_loop)
self.__notification_pusher_task = self.event_loop.create_task(
self.__notification_available = asyncio.Event(loop=self._event_loop)
self.__notification_pusher_task = self._event_loop.create_task(
self.__notification_pusher())
async def cleanup(self) -> None:
@ -95,7 +103,7 @@ class Session(core.CallbackSessionMixin, core.SessionBase):
delay = next_notification - time.time()
if delay > 0:
await asyncio.sleep(delay, loop=self.event_loop)
await asyncio.sleep(delay, loop=self._event_loop)
def callback_connected(self) -> None:
pass
@ -106,9 +114,7 @@ class Session(core.CallbackSessionMixin, core.SessionBase):
self.__notification_available.set()
class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
session_cls = Session
class AudioProcProcess(core.ProcessBase):
def __init__(
self, *,
shm: Optional[str] = None,
@ -118,6 +124,8 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
super().__init__(**kwargs)
self.shm_name = shm
self.shm = None # type: Optional[posix_ipc.SharedMemory]
self.__main_endpoint = None # type: ipc.ServerEndpointWithSessions[Session]
self.__urid_mapper = None # type: lv2.ProxyURIDMapper
self.__block_size = block_size
self.__sample_rate = sample_rate
@ -127,29 +135,55 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
async def setup(self) -> None:
await super().setup()
self.server.add_command_handler('SHUTDOWN', self.shutdown)
self.server.add_command_handler('CREATE_REALM', self.__handle_create_realm)
self.server.add_command_handler('DELETE_REALM', self.__handle_delete_realm)
self.server.add_command_handler('SET_HOST_PARAMETERS', self.handle_set_host_parameters)
self.server.add_command_handler('SET_BACKEND', self.handle_set_backend)
self.server.add_command_handler(
'SET_BACKEND_PARAMETERS', self.handle_set_backend_parameters)
self.server.add_command_handler('SET_SESSION_VALUES', self.handle_set_session_values)
self.server.add_command_handler('PLAY_FILE', self.handle_play_file)
self.server.add_command_handler('PIPELINE_MUTATION', self.handle_pipeline_mutation)
self.server.add_command_handler('SEND_NODE_MESSAGES', self.handle_send_node_messages)
self.server.add_command_handler('UPDATE_PLAYER_STATE', self.handle_update_player_state)
self.server.add_command_handler(
'UPDATE_PROJECT_PROPERTIES', self.handle_update_project_properties)
self.server.add_command_handler('CREATE_PLUGIN_UI', self.handle_create_plugin_ui)
self.server.add_command_handler('DELETE_PLUGIN_UI', self.handle_delete_plugin_ui)
self.server.add_command_handler('DUMP', self.handle_dump)
self.server.add_command_handler('PROFILE_AUDIO_THREAD', self.handle_profile_audio_thread)
self.__main_endpoint = ipc.ServerEndpointWithSessions('main', Session)
self.__main_endpoint.add_handler(
'CREATE_REALM', self.__handle_create_realm,
audioproc_pb2.CreateRealmRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'DELETE_REALM', self.__handle_delete_realm,
audioproc_pb2.DeleteRealmRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'SET_HOST_PARAMETERS', self.__handle_set_host_parameters,
host_parameters_pb2.HostParameters, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'SET_BACKEND', self.__handle_set_backend,
audioproc_pb2.SetBackendRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'SET_SESSION_VALUES', self.__handle_set_session_values,
audioproc_pb2.SetSessionValuesRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'PLAY_FILE', self.__handle_play_file,
audioproc_pb2.PlayFileRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'PIPELINE_MUTATION', self.__handle_pipeline_mutation,
audioproc_pb2.PipelineMutationRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'SEND_NODE_MESSAGES', self.__handle_send_node_messages,
audioproc_pb2.SendNodeMessagesRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'UPDATE_PLAYER_STATE', self.__handle_update_player_state,
player_state_pb2.PlayerState, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'UPDATE_PROJECT_PROPERTIES', self.__handle_update_project_properties,
audioproc_pb2.UpdateProjectPropertiesRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'CREATE_PLUGIN_UI', self.__handle_create_plugin_ui,
audioproc_pb2.CreatePluginUIRequest, audioproc_pb2.CreatePluginUIResponse)
self.__main_endpoint.add_handler(
'DELETE_PLUGIN_UI', self.__handle_delete_plugin_ui,
audioproc_pb2.DeletePluginUIRequest, empty_message_pb2.EmptyMessage)
self.__main_endpoint.add_handler(
'PROFILE_AUDIO_THREAD', self.__handle_profile_audio_thread,
audioproc_pb2.ProfileAudioThreadRequest, audioproc_pb2.ProfileAudioThreadResponse)
await self.server.add_endpoint(self.__main_endpoint)
if self.shm_name is not None:
self.shm = posix_ipc.SharedMemory(self.shm_name)
urid_mapper_address = await self.manager.call('CREATE_URID_MAPPER_PROCESS')
create_urid_mapper_response = editor_main_pb2.CreateProcessResponse()
await self.manager.call(
'CREATE_URID_MAPPER_PROCESS', None, create_urid_mapper_response)
urid_mapper_address = create_urid_mapper_response.address
self.__urid_mapper = lv2.ProxyURIDMapper(
server_address=urid_mapper_address,
@ -200,133 +234,180 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
await super().cleanup()
def __handle_engine_notification(self, msg: engine_notification_pb2.EngineNotification) -> None:
for session in self.sessions:
cast(Session, session).publish_engine_notification(msg)
for session in self.__main_endpoint.sessions:
session.publish_engine_notification(msg)
async def __handle_create_realm(
self, session_id: str, name: str, parent: str, enable_player: bool,
callback_address: str
self,
session: Session,
request: audioproc_pb2.CreateRealmRequest,
response: empty_message_pb2.EmptyMessage,
) -> None:
session = cast(Session, self.get_session(session_id))
await self.__engine.create_realm(
name=name,
parent=parent,
enable_player=enable_player,
callback_address=callback_address)
session.owned_realms.add(name)
async def __handle_delete_realm(self, session_id: str, name: str) -> None:
session = cast(Session, self.get_session(session_id))
assert name in session.owned_realms
await self.__engine.delete_realm(name)
session.owned_realms.remove(name)
async def handle_pipeline_mutation(
self, session_id: str, realm_name: str, mutation: mutations.Mutation) -> None:
self.get_session(session_id)
realm = self.__engine.get_realm(realm_name)
name=request.name,
parent=request.parent if request.HasField('parent') else None,
enable_player=request.enable_player,
callback_address=(
request.callback_address if request.HasField('callback_address') else None))
session.owned_realms.add(request.name)
async def __handle_delete_realm(
self,
session: Session,
request: audioproc_pb2.DeleteRealmRequest,
response: empty_message_pb2.EmptyMessage,
) -> None:
assert request.name in session.owned_realms
await self.__engine.delete_realm(request.name)
session.owned_realms.remove(request.name)
async def __handle_pipeline_mutation(
self,
session: Session,
request: audioproc_pb2.PipelineMutationRequest,
response: empty_message_pb2.EmptyMessage
) -> None:
realm = self.__engine.get_realm(request.realm)
graph = realm.graph
if isinstance(mutation, mutations.AddNode):
logger.info("AddNode():\n%s", mutation.description)
mutation_type = request.mutation.WhichOneof('type')
if mutation_type == 'add_node':
add_node = request.mutation.add_node
logger.info("AddNode():\n%s", add_node.description)
kwargs = {} # type: Dict[str, Any]
if add_node.HasField('name'):
kwargs['name'] = add_node.name
if add_node.HasField('initial_state'):
kwargs['initial_state'] = add_node.initial_state
if add_node.HasField('child_realm'):
kwargs['child_realm'] = add_node.child_realm
node = engine.Node.create(
host_system=self.__host_system,
description=mutation.description,
**mutation.args)
id=add_node.id,
description=add_node.description,
**kwargs)
graph.add_node(node)
# TODO: schedule setup in a worker thread.
await realm.setup_node(node)
realm.update_spec()
elif isinstance(mutation, mutations.RemoveNode):
node = graph.find_node(mutation.node_id)
elif mutation_type == 'remove_node':
remove_node = request.mutation.remove_node
node = graph.find_node(remove_node.id)
await node.cleanup(deref=True)
graph.remove_node(node)
realm.update_spec()
elif isinstance(mutation, mutations.ConnectPorts):
node1 = graph.find_node(mutation.src_node)
elif mutation_type == 'connect_ports':
connect_ports = request.mutation.connect_ports
node1 = graph.find_node(connect_ports.src_node_id)
try:
port1 = node1.outputs[mutation.src_port]
port1 = node1.outputs[connect_ports.src_port]
except KeyError:
raise KeyError(
"Node %s (%s) has no port %s"
% (node1.id, type(node1).__name__, mutation.src_port)
% (node1.id, type(node1).__name__, connect_ports.src_port)
).with_traceback(sys.exc_info()[2]) from None
node2 = graph.find_node(mutation.dest_node)
node2 = graph.find_node(connect_ports.dest_node_id)
try:
port2 = node2.inputs[mutation.dest_port]
port2 = node2.inputs[connect_ports.dest_port]
except KeyError:
raise KeyError(
"Node %s (%s) has no port %s"
% (node2.id, type(node2).__name__, mutation.dest_port)
% (node2.id, type(node2).__name__, connect_ports.dest_port)
).with_traceback(sys.exc_info()[2]) from None
port2.connect(port1)
realm.update_spec()
elif isinstance(mutation, mutations.DisconnectPorts):
node1 = graph.find_node(mutation.src_node)
node2 = graph.find_node(mutation.dest_node)
node2.inputs[mutation.dest_port].disconnect(node1.outputs[mutation.src_port])
elif mutation_type == 'disconnect_ports':
disconnect_ports = request.mutation.disconnect_ports
node1 = graph.find_node(disconnect_ports.src_node_id)
node2 = graph.find_node(disconnect_ports.dest_node_id)
node2.inputs[disconnect_ports.dest_port].disconnect(
node1.outputs[disconnect_ports.src_port])
realm.update_spec()
elif isinstance(mutation, mutations.SetControlValue):
realm.set_control_value(mutation.name, mutation.value, mutation.generation)
elif mutation_type == 'set_control_value':
set_control_value = request.mutation.set_control_value
realm.set_control_value(
set_control_value.name,
set_control_value.value,
set_control_value.generation)
elif isinstance(mutation, mutations.SetPluginState):
await realm.set_plugin_state(mutation.node, mutation.state)
elif mutation_type == 'set_plugin_state':
set_plugin_state = request.mutation.set_plugin_state
await realm.set_plugin_state(
set_plugin_state.node_id,
set_plugin_state.state)
else:
raise ValueError(type(mutation))
def handle_send_node_messages(
self, session_id: str, realm_name: str,
messages: processor_message_pb2.ProcessorMessageList) -> None:
self.get_session(session_id)
realm = self.__engine.get_realm(realm_name)
for msg in messages.messages:
realm.send_node_message(msg)
raise ValueError(request.mutation)
async def handle_set_host_parameters(self, session_id: str, parameters: Any) -> None:
self.get_session(session_id)
await self.__engine.set_host_parameters(**parameters)
async def handle_set_backend(
self, session_id: str, name: str, parameters: Dict[str, Any]) -> None:
self.get_session(session_id)
await self.__engine.set_backend(name, **parameters)
def handle_set_backend_parameters(self, session_id: str, parameters: Dict[str, Any]) -> None:
self.get_session(session_id)
self.__engine.set_backend_parameters(**parameters)
def handle_set_session_values(
self, session_id: str, realm_name: str, values: Dict[str, Any]) -> None:
self.get_session(session_id)
realm = self.__engine.get_realm(realm_name)
realm.set_session_values(values)
def handle_update_player_state(
self, session_id: str, state: player_state_pb2.PlayerState) -> None:
self.get_session(session_id)
realm = self.__engine.get_realm(state.realm)
realm.player.update_state(state)
def __handle_send_node_messages(
self,
session: Session,
request: audioproc_pb2.SendNodeMessagesRequest,
response: empty_message_pb2.EmptyMessage
) -> None:
realm = self.__engine.get_realm(request.realm)
for msg in request.messages:
realm.send_node_message(msg)
def handle_update_project_properties(
self, session_id: str, realm_name: str, properties: Dict[str, Any]) -> None:
self.get_session(session_id)
realm = self.__engine.get_realm(realm_name)
realm.update_project_properties(**properties)
async def __handle_set_host_parameters(
self,
session: Session,
request: host_parameters_pb2.HostParameters,
response: empty_message_pb2.EmptyMessage
) -> None:
await self.__engine.set_host_parameters(request)
async def handle_play_file(self, session_id: str, path: str) -> None:
self.get_session(session_id)
async def __handle_set_backend(
self,
session: Session,
request: audioproc_pb2.SetBackendRequest,
response: empty_message_pb2.EmptyMessage
) -> None:
await self.__engine.set_backend(request.name, request.settings)
def __handle_set_session_values(
self,
session: Session,
request: audioproc_pb2.SetSessionValuesRequest,
response: empty_message_pb2.EmptyMessage
) -> None:
realm = self.__engine.get_realm(request.realm)
realm.set_session_values(request.session_values)
def __handle_update_player_state(
self,
session: Session,
request: player_state_pb2.PlayerState,
response: empty_message_pb2.EmptyMessage
) -> None:
realm = self.__engine.get_realm(request.realm)
realm.player.update_state(request)
def __handle_update_project_properties(
self,
session: Session,
request: audioproc_pb2.UpdateProjectPropertiesRequest,
response: empty_message_pb2.EmptyMessage
) -> None:
realm = self.__engine.get_realm(request.realm)
realm.update_project_properties(request.properties)
async def __handle_play_file(
self,
session: Session,
request: audioproc_pb2.PlayFileRequest,
response: empty_message_pb2.EmptyMessage
) -> None:
realm = self.__engine.get_realm('root')
node_desc = node_db.NodeDescription()
node_desc.CopyFrom(node_db.Builtins.SoundFileDescription)
node_desc.sound_file.sound_file_path = path
node_desc.sound_file.sound_file_path = request.path
node = engine.Node.create(
host_system=self.__host_system,
@ -361,25 +442,35 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
realm.graph.remove_node(node)
realm.update_spec()
async def handle_create_plugin_ui(
self, session_id: str, realm_name: str, node_id: str) -> Tuple[int, Tuple[int, int]]:
self.get_session(session_id)
return await self.__engine.create_plugin_ui(realm_name, node_id)
async def handle_delete_plugin_ui(self, session_id: str, realm_name: str, node_id: str) -> None:
self.get_session(session_id)
return await self.__engine.delete_plugin_ui(realm_name, node_id)
def handle_dump(self, session_id: str) -> None:
self.__engine.dump()
async def handle_profile_audio_thread(self, session_id: str, duration: int) -> bytes:
self.get_session(session_id)
async def __handle_create_plugin_ui(
self,
session: Session,
request: audioproc_pb2.CreatePluginUIRequest,
response: audioproc_pb2.CreatePluginUIResponse,
) -> None:
wid, (width, height) = await self.__engine.create_plugin_ui(request.realm, request.node_id)
response.wid = wid
response.width = width
response.height = height
async def __handle_delete_plugin_ui(
self,
session: Session,
request: audioproc_pb2.DeletePluginUIRequest,
response: empty_message_pb2.EmptyMessage
) -> None:
await self.__engine.delete_plugin_ui(request.realm, request.node_id)
async def __handle_profile_audio_thread(
self,
session: Session,
request: audioproc_pb2.ProfileAudioThreadRequest,
response: audioproc_pb2.ProfileAudioThreadResponse,
) -> None:
path = '/tmp/audio.prof'
logger.warning("Starting profile of the audio thread...")
profile.start(path)
await asyncio.sleep(duration, loop=self.event_loop)
await asyncio.sleep(request.duration, loop=self.event_loop)
profile.stop()
logger.warning("Audio thread profile complete. Data written to '%s'.", path)
@ -405,7 +496,7 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
raise RuntimeError(
"Command '%s' failed with return code %d" % (' '.join(argv), proc.returncode))
return svg
response.svg = svg
class AudioProcSubprocess(core.SubprocessMixin, AudioProcProcess):

@ -116,6 +116,7 @@ StatusOr<pb::DeviceDescription> ALSADeviceManager::get_device_description(int cl
pb::DevicePortDescription* port = device.add_ports();
int port_id = snd_seq_port_info_get_port(pinfo);
port->set_uri(sprintf("alsa://%d/%d", client_id, port_id));
port->set_type(pb::DevicePortDescription::MIDI);
port->set_display_name(snd_seq_port_info_get_name(pinfo));
if (cap & (SND_SEQ_PORT_CAP_READ | SND_SEQ_PORT_CAP_DUPLEX)) {

@ -29,7 +29,7 @@
namespace noisicaa {
Backend::Backend(
HostSystem* host_system, const char* logger_name, const BackendSettings& settings,
HostSystem* host_system, const char* logger_name, const pb::BackendSettings& settings,
void (*callback)(void*, const string&), void *userdata)
: _host_system(host_system),
_logger(LoggerRegistry::get_logger(logger_name)),
@ -44,8 +44,11 @@ Backend::~Backend() {
}
StatusOr<Backend*> Backend::create(
HostSystem* host_system, const string& name, const BackendSettings& settings,
HostSystem* host_system, const string& name, const string& serialized_settings,
void (*callback)(void*, const string&), void* userdata) {
pb::BackendSettings settings;
assert(settings.ParseFromString(serialized_settings));
if (name == "portaudio") {
return new PortAudioBackend(host_system, settings, callback, userdata);
} else if (name == "null") {

@ -29,6 +29,7 @@
#include "noisicaa/core/logging.h"
#include "noisicaa/core/slots.h"
#include "noisicaa/core/status.h"
#include "noisicaa/audioproc/public/backend_settings.pb.h"
#include "noisicaa/audioproc/engine/buffers.h"
namespace noisicaa {
@ -42,11 +43,6 @@ namespace pb {
class EngineNotification;
}
struct BackendSettings {
string datastream_address;
float time_scale;
};
class Backend {
public:
enum Channel {
@ -60,7 +56,7 @@ public:
Slot<pb::EngineNotification> notifications;
static StatusOr<Backend*> create(
HostSystem* host_system, const string& name, const BackendSettings& settings,
HostSystem* host_system, const string& name, const string& settings,
void (*callback)(void*, const string&), void* userdata);
virtual Status setup(Realm* realm);
@ -72,14 +68,14 @@ public:
protected:
Backend(
HostSystem* host_system, const char* logger_name, const BackendSettings& settings,
HostSystem* host_system, const char* logger_name, const pb::BackendSettings& settings,
void (*callback)(void*, const string&), void* userdata);
void notification_proxy(const pb::EngineNotification& notification);
HostSystem* _host_system;
Logger* _logger;
BackendSettings _settings;
pb::BackendSettings _settings;
void (*_callback)(void*, const string&);
void *_userdata;
Realm* _realm = nullptr;

@ -30,10 +30,6 @@ from .block_context cimport BlockContext
cdef extern from "noisicaa/audioproc/engine/backend.h" namespace "noisicaa" nogil:
struct BackendSettings:
string datastream_address
float time_scale
cppclass Backend:
enum Channel:
AUDIO_LEFT "noisicaa::Backend::AUDIO_LEFT"
@ -42,7 +38,7 @@ cdef extern from "noisicaa/audioproc/engine/backend.h" namespace "noisicaa" nogi
@staticmethod
StatusOr[Backend*] create(
HostSystem* host_system, const string& name, const BackendSettings& settings,
HostSystem* host_system, const string& name, const string& settings,
void (*callback)(void*, const string&), void* userdata)
Status setup(Realm* realm)
@ -52,12 +48,6 @@ cdef extern from "noisicaa/audioproc/engine/backend.h" namespace "noisicaa" nogi
Status output(BlockContext* ctxt, Channel channel, BufferPtr samples)
cdef class PyBackendSettings(object):
cdef BackendSettings __settings
cdef BackendSettings get(self)
cdef class PyBackend(object):
cdef unique_ptr[Backend] __backend_ptr
cdef Backend* __backend

@ -21,24 +21,16 @@
from typing import List, Optional, Union
from noisicaa import host_system as host_system_lib
from noisicaa.audioproc.public import backend_settings_pb2
from . import block_context
from . import buffers
from . import realm as realm_lib
class PyBackendSettings(object):
datastream_address = ... # type: str
time_scale = ... # type: float
def __init__(
self, *, datastream_address: Optional[float] = None, time_scale: Optional[float] = None
) -> None: ...
class PyBackend(object):
def __init__(
self, host_system: host_system_lib.HostSystem, name: Union[str, bytes],
settings: PyBackendSettings) -> None: ...
settings: backend_settings_pb2.BackendSettings) -> None: ...
def setup(self, realm: realm_lib.PyRealm) -> None: ...
def cleanup(self) -> None: ...
def stop(self) -> None: .