Move NodeDB into separate package and process - UI and project processes use a single shared DB process.
parent
07408041cf
commit
8bc4dfb7d7
@ -1 +1,20 @@
|
||||
from .client import NodeDBClient
|
||||
from .client import NodeDBClientMixin
|
||||
from .node_description import (
|
||||
SystemNodeDescription,
|
||||
UserNodeDescription,
|
||||
|
||||
AudioPortDescription,
|
||||
EventPortDescription,
|
||||
PortDirection, PortType,
|
||||
|
||||
ParameterType,
|
||||
InternalParameterDescription,
|
||||
StringParameterDescription,
|
||||
PathParameterDescription,
|
||||
TextParameterDescription,
|
||||
FloatParameterDescription,
|
||||
)
|
||||
from .mutations import (
|
||||
AddNodeDescription,
|
||||
RemoveNodeDescription,
|
||||
)
|
||||
|
@ -1,5 +1,66 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import logging
|
||||
|
||||
class NodeDBClient(object):
|
||||
pass
|
||||
from noisicaa import core
|
||||
from noisicaa.core import ipc
|
||||
from . import mutations
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NodeDBClientMixin(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._stub = None
|
||||
self._session_id = None
|
||||
self._nodes = {}
|
||||
self.listeners = core.CallbackRegistry()
|
||||
|
||||
@property
|
||||
def nodes(self):
|
||||
return sorted(
|
||||
self._nodes.items(), key=lambda i: i[1].display_name)
|
||||
|
||||
def get_node_description(self, uri):
|
||||
return self._nodes[uri]
|
||||
|
||||
async def setup(self):
|
||||
await super().setup()
|
||||
self.server.add_command_handler(
|
||||
'NODEDB_MUTATION', self.handle_mutation)
|
||||
|
||||
async def connect(self, address, flags=None):
|
||||
assert self._stub is None
|
||||
self._stub = ipc.Stub(self.event_loop, address)
|
||||
await self._stub.connect()
|
||||
self._session_id = await self._stub.call(
|
||||
'START_SESSION', self.server.address, flags)
|
||||
|
||||
async def disconnect(self, shutdown=False):
|
||||
if self._session_id is not None:
|
||||
await self._stub.call('END_SESSION', self._session_id)
|
||||
self._session_id = None
|
||||
|
||||
if self._stub is not None:
|
||||
if shutdown:
|
||||
await self.shutdown()
|
||||
|
||||
await self._stub.close()
|
||||
self._stub = None
|
||||
|
||||
async def shutdown(self):
|
||||
await self._stub.call('SHUTDOWN')
|
||||
|
||||
async def start_scan(self):
|
||||
return await self._stub.call('START_SCAN', self._session_id)
|
||||
|
||||
def handle_mutation(self, mutation):
|
||||
logger.info("Mutation received: %s" % mutation)
|
||||
if isinstance(mutation, mutations.AddNodeDescription):
|
||||
assert mutation.uri not in self._nodes
|
||||
self._nodes[mutation.uri] = mutation.description
|
||||
else:
|
||||
raise ValueError(mutation)
|
||||
|
||||
self.listeners.call('mutation', mutation)
|
||||
|
@ -0,0 +1,73 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
import asynctest
|
||||
|
||||
from noisicaa import core
|
||||
from noisicaa.core import ipc
|
||||
|
||||
from . import process
|
||||
from . import client
|
||||
|
||||
|
||||
class TestClientImpl(object):
|
||||
def __init__(self, event_loop):
|
||||
super().__init__()
|
||||
self.event_loop = event_loop
|
||||
self.server = ipc.Server(self.event_loop, 'client')
|
||||
|
||||
async def setup(self):
|
||||
await self.server.setup()
|
||||
|
||||
async def cleanup(self):
|
||||
await self.server.cleanup()
|
||||
|
||||
|
||||
class TestClient(client.NodeDBClientMixin, TestClientImpl):
|
||||
pass
|
||||
|
||||
|
||||
class TestProcessImpl(object):
|
||||
def __init__(self, event_loop):
|
||||
super().__init__()
|
||||
self.event_loop = event_loop
|
||||
self.server = ipc.Server(self.event_loop, 'audioproc')
|
||||
|
||||
async def setup(self):
|
||||
await self.server.setup()
|
||||
|
||||
async def cleanup(self):
|
||||
await self.server.cleanup()
|
||||
|
||||
|
||||
class TestProcess(process.NodeDBProcessMixin, TestProcessImpl):
|
||||
pass
|
||||
|
||||
|
||||
class NodeDBClientTest(asynctest.TestCase):
|
||||
async def setUp(self):
|
||||
self.process = TestProcess(self.loop)
|
||||
await self.process.setup()
|
||||
self.process_task = self.loop.create_task(
|
||||
self.process.run())
|
||||
|
||||
self.client = TestClient(self.loop)
|
||||
await self.client.setup()
|
||||
await self.client.connect(self.process.server.address)
|
||||
|
||||
async def tearDown(self):
|
||||
await self.client.disconnect(shutdown=True)
|
||||
await self.client.cleanup()
|
||||
await asyncio.wait_for(self.process_task, None)
|
||||
await self.process.cleanup()
|
||||
|
||||
async def test_start_scan(self):
|
||||
await self.client.start_scan()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
class Mutation(object):
|
||||
pass
|
||||
|
||||
|
||||
class AddNodeDescription(Mutation):
|
||||
def __init__(self, uri, description):
|
||||
self.uri = uri
|
||||
self.description = description
|
||||
|
||||
def __str__(self):
|
||||
return '<AddNodeDescription uri="%s">' % self.uri
|
||||
|
||||
|
||||
class RemoveNodeDescription(Mutation):
|
||||
def __init__(self, uri):
|
||||
self.uri = uri
|
||||
|
||||
def __str__(self):
|
||||
return '<RemoveNodeDescription uri="%s">' % self.uri
|
||||
|
||||
|
@ -0,0 +1,135 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import asyncio
|
||||
import functools
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from noisicaa import core
|
||||
from noisicaa.core import ipc
|
||||
|
||||
from .private import db
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InvalidSessionError(Exception): pass
|
||||
|
||||
|
||||
class Session(object):
|
||||
def __init__(self, event_loop, callback_stub, flags):
|
||||
self.event_loop = event_loop
|
||||
self.callback_stub = callback_stub
|
||||
self.flags = flags or set()
|
||||
self.id = uuid.uuid4().hex
|
||||
self.pending_mutations = []
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
def publish_mutation(self, mutation):
|
||||
if not self.callback_stub.connected:
|
||||
self.pending_mutations.append(mutation)
|
||||
return
|
||||
|
||||
callback_task = self.event_loop.create_task(
|
||||
self.callback_stub.call('NODEDB_MUTATION', mutation))
|
||||
callback_task.add_done_callback(self.publish_mutation_done)
|
||||
|
||||
def publish_mutation_done(self, callback_task):
|
||||
assert callback_task.done()
|
||||
exc = callback_task.exception()
|
||||
if exc is not None:
|
||||
logger.error(
|
||||
"NODEDB_MUTATION failed with exception: %s", exc)
|
||||
|
||||
def callback_stub_connected(self):
|
||||
assert self.callback_stub.connected
|
||||
while self.pending_mutations:
|
||||
self.publish_mutation(self.pending_mutations.pop(0))
|
||||
|
||||
|
||||
class NodeDBProcessMixin(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.sessions = {}
|
||||
self.db = db.NodeDB()
|
||||
|
||||
async def setup(self):
|
||||
await super().setup()
|
||||
|
||||
self._shutting_down = asyncio.Event()
|
||||
self._shutdown_complete = asyncio.Event()
|
||||
|
||||
self.server.add_command_handler(
|
||||
'START_SESSION', self.handle_start_session)
|
||||
self.server.add_command_handler(
|
||||
'END_SESSION', self.handle_end_session)
|
||||
self.server.add_command_handler('SHUTDOWN', self.handle_shutdown)
|
||||
self.server.add_command_handler(
|
||||
'START_SCAN', self.handle_start_scan)
|
||||
|
||||
self.db.setup()
|
||||
|
||||
async def cleanup(self):
|
||||
self.db.cleanup()
|
||||
await super().cleanup()
|
||||
|
||||
async def run(self):
|
||||
await self._shutting_down.wait()
|
||||
logger.info("Shutting down...")
|
||||
self._shutdown_complete.set()
|
||||
|
||||
def get_session(self, session_id):
|
||||
try:
|
||||
return self.sessions[session_id]
|
||||
except KeyError:
|
||||
raise InvalidSessionError
|
||||
|
||||
def publish_mutation(self, mutation):
|
||||
for session in self.sessions.values():
|
||||
session.publish_mutation(mutation)
|
||||
|
||||
def handle_start_session(self, client_address, flags):
|
||||
client_stub = ipc.Stub(self.event_loop, client_address)
|
||||
connect_task = self.event_loop.create_task(client_stub.connect())
|
||||
session = Session(self.event_loop, client_stub, flags)
|
||||
connect_task.add_done_callback(
|
||||
functools.partial(self._client_connected, session))
|
||||
self.sessions[session.id] = session
|
||||
|
||||
# Send initial mutations to build up the current pipeline
|
||||
# state.
|
||||
for mutation in self.db.initial_mutations():
|
||||
session.publish_mutation(mutation)
|
||||
|
||||
return session.id
|
||||
|
||||
def _client_connected(self, session, connect_task):
|
||||
assert connect_task.done()
|
||||
exc = connect_task.exception()
|
||||
if exc is not None:
|
||||
logger.error("Failed to connect to callback client: %s", exc)
|
||||
return
|
||||
|
||||
session.callback_stub_connected()
|
||||
|
||||
def handle_end_session(self, session_id):
|
||||
session = self.get_session(session_id)
|
||||
session.cleanup()
|
||||
del self.sessions[session_id]
|
||||
|
||||
async def handle_shutdown(self):
|
||||
logger.info("Shutdown received.")
|
||||
self._shutting_down.set()
|
||||
logger.info("Waiting for shutdown to complete...")
|
||||
await self._shutdown_complete.wait()
|
||||
logger.info("Shutdown complete.")
|
||||
|
||||
async def handle_start_scan(self, session_id):
|
||||
self.get_session(session_id)
|
||||
return self.db.start_scan()
|
||||
|
||||
|
||||
class NodeDBProcess(NodeDBProcessMixin, core.ProcessImpl):
|
||||
pass
|
Loading…
Reference in new issue