Browse Source

Make the audio thread (mostly) realtime safe.

- Purge all remaining Python from the critical path.
- Add a "realtime checker", and use it to remove most uses of malloc in the audio thread.
- Make logging from the audio thread realtime safe.
- Add a tool to profile the audio thread.
- Set realtime priority on the audio thread.
looper
Ben Niemann 4 years ago
parent
commit
45f67ea7f6
  1. 6
      listdeps
  2. 24
      noisicaa/audioproc/audioproc_client.py
  3. 74
      noisicaa/audioproc/audioproc_process.py
  4. 19
      noisicaa/audioproc/engine/CMakeLists.txt
  5. 4
      noisicaa/audioproc/engine/__init__.py
  6. 1
      noisicaa/audioproc/engine/backend.cpp
  7. 8
      noisicaa/audioproc/engine/backend.h
  8. 7
      noisicaa/audioproc/engine/backend.pxd
  9. 20
      noisicaa/audioproc/engine/backend.pyx
  10. 3
      noisicaa/audioproc/engine/backend_portaudio.cpp
  11. 7
      noisicaa/audioproc/engine/backend_renderer.cpp
  12. 4
      noisicaa/audioproc/engine/block_context.cpp
  13. 7
      noisicaa/audioproc/engine/block_context.h
  14. 6
      noisicaa/audioproc/engine/block_context.pxd
  15. 8
      noisicaa/audioproc/engine/block_context.pyi
  16. 19
      noisicaa/audioproc/engine/block_context.pyx
  17. 259
      noisicaa/audioproc/engine/engine.cpp
  18. 79
      noisicaa/audioproc/engine/engine.h
  19. 42
      noisicaa/audioproc/engine/engine.pxd
  20. 282
      noisicaa/audioproc/engine/engine.pyx
  21. 15
      noisicaa/audioproc/engine/engine_test.py
  22. 23
      noisicaa/audioproc/engine/message_queue.cpp
  23. 145
      noisicaa/audioproc/engine/message_queue.h
  24. 33
      noisicaa/audioproc/engine/message_queue.pxd
  25. 4
      noisicaa/audioproc/engine/message_queue.pyx
  26. 29
      noisicaa/audioproc/engine/opcodes.cpp
  27. 58
      noisicaa/audioproc/engine/player.cpp
  28. 23
      noisicaa/audioproc/engine/player.h
  29. 9
      noisicaa/audioproc/engine/player.pxd
  30. 29
      noisicaa/audioproc/engine/player.pyx
  31. 3
      noisicaa/audioproc/engine/plugin_host.cpp
  32. 2
      noisicaa/audioproc/engine/plugin_host_lv2.cpp
  33. 2
      noisicaa/audioproc/engine/plugin_host_lv2.h
  34. 3
      noisicaa/audioproc/engine/plugin_host_process.py
  35. 6
      noisicaa/audioproc/engine/processor.cpp
  36. 17
      noisicaa/audioproc/engine/processor.pyx
  37. 7
      noisicaa/audioproc/engine/processor_csound_base.cpp
  38. 19
      noisicaa/audioproc/engine/processor_cvgenerator.cpp
  39. 4
      noisicaa/audioproc/engine/processor_cvgenerator_test.py
  40. 17
      noisicaa/audioproc/engine/processor_pianoroll.cpp
  41. 4
      noisicaa/audioproc/engine/processor_pianoroll_test.py
  42. 9
      noisicaa/audioproc/engine/processor_plugin.cpp
  43. 27
      noisicaa/audioproc/engine/processor_sample_script.cpp
  44. 10
      noisicaa/audioproc/engine/processor_sample_script_test.py
  45. 17
      noisicaa/audioproc/engine/processor_sound_file.cpp
  46. 5
      noisicaa/audioproc/engine/processor_sound_file.h
  47. 3
      noisicaa/audioproc/engine/processor_sound_file_test.py
  48. 63
      noisicaa/audioproc/engine/profile.cpp
  49. 40
      noisicaa/audioproc/engine/profile.h
  50. 29
      noisicaa/audioproc/engine/profile.pxd
  51. 15
      noisicaa/audioproc/engine/profile.pyi
  52. 34
      noisicaa/audioproc/engine/profile.pyx
  53. 22
      noisicaa/audioproc/engine/realm.cpp
  54. 2
      noisicaa/audioproc/engine/realm_test.py
  55. 55
      noisicaa/audioproc/engine/realtime.cpp
  56. 37
      noisicaa/audioproc/engine/realtime.h
  57. 44
      noisicaa/audioproc/engine/rtcheck.c
  58. 76
      noisicaa/audioproc/engine/rtcheck.h
  59. 27
      noisicaa/audioproc/engine/rtcheck.pxd
  60. 111
      noisicaa/audioproc/engine/rtcheck_preload.c
  61. 2
      noisicaa/audioproc/engine/spec.h
  62. 14
      noisicaa/audioproc/public/engine_notification.proto
  63. 3
      noisicaa/core/CMakeLists.txt
  64. 1
      noisicaa/core/__init__.py
  65. 81
      noisicaa/core/logging.cpp
  66. 60
      noisicaa/core/logging.h
  67. 13
      noisicaa/core/logging.pxd
  68. 24
      noisicaa/core/logging.pyx
  69. 49
      noisicaa/core/logging_test.pyx
  70. 31
      noisicaa/core/perf_stats.cpp
  71. 4
      noisicaa/core/perf_stats.h
  72. 5
      noisicaa/core/perf_stats.pxd
  73. 28
      noisicaa/core/perf_stats.pyx
  74. 60
      noisicaa/core/perf_stats_test.py
  75. 16
      noisicaa/core/process_manager.py
  76. 4
      noisicaa/core/pump.h
  77. 11
      noisicaa/core/pump.inl.h
  78. 64
      noisicaa/core/status.cpp
  79. 30
      noisicaa/core/status.h
  80. 1
      noisicaa/editor_main.py
  81. 1
      noisicaa/host_system/host_system_lv2.cpp
  82. 1
      noisicaa/host_system/host_system_lv2.h
  83. 3
      noisicaa/lv2/CMakeLists.txt
  84. 4
      noisicaa/lv2/__init__.py
  85. 195
      noisicaa/lv2/atom.pxd
  86. 81
      noisicaa/lv2/atom.pyx
  87. 1
      noisicaa/lv2/urid_mapper.cpp
  88. 2
      noisicaa/lv2/urid_mapper.h
  89. 1
      noisicaa/ui/CMakeLists.txt
  90. 126
      noisicaa/ui/audio_thread_profiler.py
  91. 23
      noisicaa/ui/editor_app.py
  92. 1
      noisicaa/ui/editor_window.py
  93. 46
      noisicaa/ui/load_history.py
  94. 27
      noisicaa/ui/pipeline_perf_monitor.py
  95. 1
      noisicaa/ui/ui_base.py
  96. 23
      noisidev/runtests.py

6
listdeps

@ -102,6 +102,8 @@ SYS_DEPS = {
# encoding
PKG('ffmpeg'),
PKG('libgoogle-perftools4'),
],
'build': [
# qt4
@ -152,6 +154,9 @@ SYS_DEPS = {
# libavutil
PKG('libavutil-dev'),
# profiling
PKG('libgoogle-perftools-dev'),
# other...
PKG('cmake'),
PKG('python3-dev'),
@ -166,6 +171,7 @@ SYS_DEPS = {
PKG('swh-plugins'),
PKG('gdb'),
PKG('xvfb'),
PKG('google-perftools'),
],
'vmtests': [
PKG('virtualbox'),

24
noisicaa/audioproc/audioproc_client.py

@ -25,8 +25,6 @@ import logging
from typing import Any, Optional, Set, Tuple
from noisicaa import core
# pylint/mypy don't understand capnp modules.
from noisicaa.core import perf_stats_capnp # type: ignore # pylint: disable=no-name-in-module
from noisicaa.core import ipc
from noisicaa import node_db
from .public import engine_notification_pb2
@ -46,7 +44,8 @@ class AudioProcClientBase(object):
self.engine_state_changed = None # type: core.Callback[engine_notification_pb2.EngineStateChange]
self.player_state_changed = None # type: core.CallbackMap[str, player_state_pb2.PlayerState]
self.node_state_changed = None # type: core.CallbackMap[str, engine_notification_pb2.NodeStateChange]
self.perf_stats = None # type: core.Callback[object]
self.node_messages = None # type: core.CallbackMap[str, bytes]
self.perf_stats = None # type: core.Callback[core.PerfStats]
@property
def address(self) -> str:
@ -126,7 +125,7 @@ class AudioProcClientBase(object):
async def send_message(self, msg: Any) -> None:
raise NotImplementedError
async def play_file(self, path: str) -> str:
async def play_file(self, path: str) -> None:
raise NotImplementedError
async def dump(self) -> None:
@ -146,7 +145,8 @@ class AudioProcClientMixin(AudioProcClientBase):
self.engine_state_changed = core.Callback[engine_notification_pb2.EngineStateChange]()
self.player_state_changed = core.CallbackMap[str, player_state_pb2.PlayerState]()
self.node_state_changed = core.CallbackMap[str, engine_notification_pb2.NodeStateChange]()
self.perf_stats = core.Callback[object]()
self.node_messages = core.CallbackMap[str, bytes]()
self.perf_stats = core.Callback[core.PerfStats]()
@property
def address(self) -> str:
@ -195,11 +195,16 @@ class AudioProcClientMixin(AudioProcClientBase):
for node_state_change in msg.node_state_changes:
self.node_state_changed.call(node_state_change.node_id, node_state_change)
for node_message in msg.node_messages:
self.node_messages.call(node_message.node_id, node_message.atom)
for engine_state_change in msg.engine_state_changes:
self.engine_state_changed.call(engine_state_change)
if msg.HasField('perf_stats'):
self.perf_stats.call(perf_stats_capnp.PerfStats.from_bytes_packed(msg.perf_stats))
perf_stats = core.PerfStats()
perf_stats.deserialize(msg.perf_stats)
self.perf_stats.call(perf_stats)
async def shutdown(self) -> None:
await self._stub.call('SHUTDOWN')
@ -267,11 +272,14 @@ class AudioProcClientMixin(AudioProcClientBase):
async def send_message(self, msg: Any) -> None:
return await self._stub.call('SEND_MESSAGE', self._session_id, msg.to_bytes())
async def play_file(self, path: str) -> str:
return await self._stub.call('PLAY_FILE', self._session_id, path)
async def play_file(self, path: str) -> None:
await self._stub.call('PLAY_FILE', self._session_id, path)
async def dump(self) -> None:
await self._stub.call('DUMP', self._session_id)
async def profile_audio_thread(self, duration: int) -> bytes:
return await self._stub.call('PROFILE_AUDIO_THREAD', self._session_id, duration)
async def update_project_properties(self, realm: str, **kwargs: Any) -> None:
return await self._stub.call('UPDATE_PROJECT_PROPERTIES', self._session_id, realm, kwargs)

74
noisicaa/audioproc/audioproc_process.py

@ -23,6 +23,7 @@
import asyncio
import functools
import logging
import subprocess
import sys
import time
import uuid
@ -37,6 +38,7 @@ from noisicaa import host_system
from .public import engine_notification_pb2
from .public import player_state_pb2
from .public import processor_message_pb2
from .engine import profile
from . import engine
from . import mutations
@ -109,12 +111,12 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
def __init__(
self, *,
shm: Optional[str] = None, profile_path: Optional[str] = None,
block_size: Optional[int] = None, sample_rate: Optional[int] = None,
shm: Optional[str] = None,
block_size: Optional[int] = None,
sample_rate: Optional[int] = None,
**kwargs: Any) -> None:
super().__init__(**kwargs)
self.shm_name = shm
self.profile_path = profile_path
self.shm = None # type: Optional[posix_ipc.SharedMemory]
self.__urid_mapper = None # type: lv2.ProxyURIDMapper
self.__block_size = block_size
@ -142,6 +144,7 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
self.server.add_command_handler('CREATE_PLUGIN_UI', self.handle_create_plugin_ui)
self.server.add_command_handler('DELETE_PLUGIN_UI', self.handle_delete_plugin_ui)
self.server.add_command_handler('DUMP', self.handle_dump)
self.server.add_command_handler('PROFILE_AUDIO_THREAD', self.handle_profile_audio_thread)
if self.shm_name is not None:
self.shm = posix_ipc.SharedMemory(self.shm_name)
@ -165,8 +168,7 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
manager=self.manager,
server_address=self.server.address,
host_system=self.__host_system,
shm=self.shm,
profile_path=self.profile_path)
shm=self.shm)
self.__engine.notifications.add(
lambda msg: self.event_loop.call_soon_threadsafe(
functools.partial(self.__handle_engine_notification, msg)))
@ -290,9 +292,10 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
self.get_session(session_id)
await self.__engine.set_host_parameters(**parameters)
def handle_set_backend(self, session_id: str, name: str, parameters: Dict[str, Any]) -> None:
async def handle_set_backend(
self, session_id: str, name: str, parameters: Dict[str, Any]) -> None:
self.get_session(session_id)
self.__engine.set_backend(name, **parameters)
await self.__engine.set_backend(name, **parameters)
def handle_set_backend_parameters(self, session_id: str, parameters: Dict[str, Any]) -> None:
self.get_session(session_id)
@ -314,7 +317,7 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
realm = self.__engine.get_realm(realm_name)
realm.update_project_properties(**properties)
async def handle_play_file(self, session_id: str, path: str) -> str:
async def handle_play_file(self, session_id: str, path: str) -> None:
self.get_session(session_id)
realm = self.__engine.get_realm('root')
@ -335,17 +338,22 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
sink.inputs['in:right'].connect(node.outputs['out:right'])
realm.update_spec()
self.__engine.add_notification_listener(
node.id,
functools.partial(self.play_file_done, node_id=node.id))
sound_file_complete_urid = self.__urid_mapper.map(
"http://noisicaa.odahoda.de/lv2/processor_sound_file#complete")
return node.id
complete = asyncio.Event(loop=self.event_loop)
def play_file_done(self, msg_type: str, *, node_id: str) -> None:
realm = self.__engine.get_realm('root')
def handle_notification(notification: engine_notification_pb2.EngineNotification) -> None:
for node_message in notification.node_messages:
if node_message.node_id == node.id:
msg = lv2.wrap_atom(self.__urid_mapper, node_message.atom)
if msg.type_urid == sound_file_complete_urid:
complete.set()
listener = self.__engine.notifications.add(handle_notification)
await complete.wait()
listener.remove()
node = realm.graph.find_node(node_id)
sink = realm.graph.find_node('sink')
sink.inputs['in:left'].disconnect(node.outputs['out:left'])
sink.inputs['in:right'].disconnect(node.outputs['out:right'])
realm.graph.remove_node(node)
@ -363,6 +371,40 @@ class AudioProcProcess(core.SessionHandlerMixin, core.ProcessBase):
def handle_dump(self, session_id: str) -> None:
self.__engine.dump()
async def handle_profile_audio_thread(self, session_id: str, duration: int) -> bytes:
self.get_session(session_id)
path = '/tmp/audio.prof'
logger.warning("Starting profile of the audio thread...")
profile.start(path)
await asyncio.sleep(duration, loop=self.event_loop)
profile.stop()
logger.warning("Audio thread profile complete. Data written to '%s'.", path)
argv = ['/usr/bin/google-pprof', '--dot', sys.executable, path]
proc = await asyncio.create_subprocess_exec(
*argv,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
loop=self.event_loop)
dot, stderr = await proc.communicate()
if proc.returncode != 0:
logger.warning(stderr)
raise RuntimeError(
"Command '%s' failed with return code %d" % (' '.join(argv), proc.returncode))
argv = ['/usr/bin/dot', '-Tsvg']
proc = await asyncio.create_subprocess_exec(
*argv,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
loop=self.event_loop)
svg, stderr = await proc.communicate(dot)
if proc.returncode != 0:
logger.warning(stderr)
raise RuntimeError(
"Command '%s' failed with return code %d" % (' '.join(argv), proc.returncode))
return svg
class AudioProcSubprocess(core.SubprocessMixin, AudioProcProcess):
pass

19
noisicaa/audioproc/engine/CMakeLists.txt

@ -47,6 +47,7 @@ add_python_package(
processor_sample_script_test.py
processor_sound_file_test.py
processor_track_mixer_test.py
profile.pyi
realm.pyi
realm_test.py
spec.pyi
@ -62,6 +63,7 @@ set(LIB_SRCS
buffers.cpp
control_value.cpp
double_buffered_state_manager.cpp
engine.cpp
misc.cpp
message_queue.cpp
opcodes.cpp
@ -86,6 +88,8 @@ set(LIB_SRCS
processor_pianoroll.cpp
processor_cvgenerator.cpp
processor_sample_script.cpp
profile.cpp
realtime.cpp
spec.cpp
realm.cpp
)
@ -113,6 +117,8 @@ target_link_libraries(noisicaa-audioproc-engine PRIVATE noisicaa-audioproc-publi
target_link_libraries(noisicaa-audioproc-engine PRIVATE capnp)
target_link_libraries(noisicaa-audioproc-engine PRIVATE pthread)
target_link_libraries(noisicaa-audioproc-engine PRIVATE rt)
target_link_libraries(noisicaa-audioproc-engine PRIVATE profiler)
target_link_libraries(noisicaa-audioproc-engine PRIVATE rtcheck)
target_include_directories(noisicaa-audioproc-engine PUBLIC ${LIBLILV_INCLUDE_DIRS})
target_link_libraries(noisicaa-audioproc-engine PRIVATE ${LIBLILV_LIBRARIES})
target_include_directories(noisicaa-audioproc-engine PRIVATE ${LIBCSOUND_INCLUDE_DIRS})
@ -166,6 +172,7 @@ target_link_libraries(${plugin_host.so} PRIVATE noisicaa-audioproc-engine)
add_cython_module(plugin_ui_host CXX)
target_link_libraries(${plugin_ui_host.so} PRIVATE noisicaa-audioproc-engine)
target_link_libraries(${plugin_ui_host.so} PRIVATE noisicaa-core)
add_cython_module(realm CXX)
target_link_libraries(${realm.so} PRIVATE noisicaa-audioproc-engine)
@ -173,6 +180,9 @@ target_link_libraries(${realm.so} PRIVATE noisicaa-audioproc-engine)
add_cython_module(player CXX)
target_link_libraries(${player.so} PRIVATE noisicaa-audioproc-engine)
add_cython_module(profile CXX)
target_link_libraries(${profile.so} PRIVATE noisicaa-audioproc-engine)
foreach(TEST_SRC ${TEST_SRCS})
string(REGEX REPLACE "\.pyx$" "" TEST_MOD ${TEST_SRC})
add_cython_module(${TEST_MOD} CXX)
@ -180,3 +190,12 @@ foreach(TEST_SRC ${TEST_SRCS})
target_include_directories(${${TEST_MOD}.so} PRIVATE ${LIBLILV_INCLUDE_DIRS})
target_link_libraries(${${TEST_MOD}.so} PRIVATE ${LIBLILV_LIBRARIES})
endforeach(TEST_SRC)
add_library(rtcheck SHARED rtcheck.c)
add_custom_target(librtcheck_preload ALL DEPENDS librtcheck_preload.so)
add_custom_command(
OUTPUT librtcheck_preload.so
COMMAND gcc -std=c11 -Wall -Werror -pedantic ${CMAKE_CURRENT_LIST_DIR}/rtcheck_preload.c -fPIC -shared -o librtcheck_preload.so -lc -ldl -L${CMAKE_CURRENT_BINARY_DIR} -lrtcheck
DEPENDS ${CMAKE_CURRENT_LIST_DIR}/rtcheck_preload.c rtcheck
)

4
noisicaa/audioproc/engine/__init__.py

@ -18,7 +18,9 @@
#
# @end:license
from .engine import Engine
from .engine import (
PyEngine as Engine
)
from .spec import (
PySpec as Spec
)

1
noisicaa/audioproc/engine/backend.cpp

@ -53,7 +53,6 @@ StatusOr<Backend*> Backend::create(
Status Backend::setup(Realm* realm) {
_realm = realm;
_stopped = false;
return Status::Ok();
}

8
noisicaa/audioproc/engine/backend.h

@ -61,12 +61,6 @@ public:
virtual Status end_block(BlockContext* ctxt) = 0;
virtual Status output(BlockContext* ctxt, const string& channel, BufferPtr samples) = 0;
void stop() { _stopped = true; }
bool stopped() const { return _stopped; }
void release() { _released = true; }
bool released() const { return _released; }
protected:
Backend(HostSystem* host_system, const char* logger_name, const BackendSettings& settings);
@ -74,8 +68,6 @@ protected:
Logger* _logger;
BackendSettings _settings;
Realm* _realm = nullptr;
bool _stopped = false;
bool _released = false;
mutex _msg_queue_mutex;
vector<string> _msg_queue;

7
noisicaa/audioproc/engine/backend.pxd

@ -19,7 +19,6 @@
# @end:license
from libc.stdint cimport uint32_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
@ -47,12 +46,6 @@ cdef extern from "noisicaa/audioproc/engine/backend.h" namespace "noisicaa" nogi
Status end_block(BlockContext* ctxt)
Status output(BlockContext* ctxt, const string& channel, BufferPtr samples)
void stop()
bool stopped() const
void release()
bool released() const
cdef class PyBackendSettings(object):
cdef BackendSettings __settings

20
noisicaa/audioproc/engine/backend.pyx

@ -83,26 +83,6 @@ cdef class PyBackend(object):
with nogil:
backend.cleanup()
def stop(self):
with nogil:
self.__backend.stop()
def stopped(self):
cdef bool c_result
with nogil:
c_result = self.__backend.stopped()
return c_result
def release(self):
with nogil:
self.__backend.release()
def released(self):
cdef bool c_result
with nogil:
c_result = self.__backend.released()
return c_result
def send_message(self, bytes msg):
cdef string c_msg = msg
with nogil:

3
noisicaa/audioproc/engine/backend_portaudio.cpp

@ -24,6 +24,7 @@
#include "noisicaa/host_system/host_system.h"
#include "noisicaa/audioproc/engine/backend_portaudio.h"
#include "noisicaa/audioproc/engine/realm.h"
#include "noisicaa/audioproc/engine/rtcheck.h"
namespace noisicaa {
@ -147,6 +148,8 @@ Status PortAudioBackend::end_block(BlockContext* ctxt) {
ctxt->perf->end_span();
assert(ctxt->perf->current_span_id() == 0);
RTUnsafe rtu; // portaudio does malloc in Pa_WriteStream.
PaError err = Pa_WriteStream(_stream, _samples, _host_system->block_size());
if (err == paOutputUnderflowed) {
_logger->warning("Buffer underrun.");

7
noisicaa/audioproc/engine/backend_renderer.cpp

@ -90,8 +90,10 @@ Status RendererBackend::end_block(BlockContext* ctxt) {
const float* right_in = (float*)_samples[1].get();
float* out = _outbuf.get();
int num_samples = 0;
for (const auto& stime : ctxt->time_map) {
if (stime.start_time >= MusicalTime(0)) {
SampleTime* stime = ctxt->time_map.get();
SampleTime* stime_end = ctxt->time_map.get() + _host_system->block_size();
while (stime < stime_end) {
if (stime->start_time >= MusicalTime(0)) {
*out++ = *left_in;
*out++ = *right_in;
++num_samples;
@ -99,6 +101,7 @@ Status RendererBackend::end_block(BlockContext* ctxt) {
++left_in;
++right_in;
++stime;
}
if (num_samples > 0) {

4
noisicaa/audioproc/engine/block_context.cpp

@ -28,4 +28,8 @@ namespace noisicaa {
BlockContext::~BlockContext() {}
void BlockContext::alloc_time_map(uint32_t block_size) {
time_map.reset(new SampleTime[block_size]);
}
} // namespace noisicaa

7
noisicaa/audioproc/engine/block_context.h

@ -25,11 +25,11 @@
#ifndef _NOISICAA_AUDIOPROC_ENGINE_BLOCK_CONTEXT_H
#define _NOISICAA_AUDIOPROC_ENGINE_BLOCK_CONTEXT_H
#include <stdint.h>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <stdint.h>
#include "noisicaa/core/message.capnp.h"
#include "noisicaa/audioproc/public/musical_time.h"
#include "noisicaa/audioproc/engine/buffers.h"
@ -54,7 +54,8 @@ struct BlockContext {
unique_ptr<PerfStats> perf;
vector<SampleTime> time_map;
unique_ptr<SampleTime> time_map;
void alloc_time_map(uint32_t block_size);
BufferArena* buffer_arena;
@ -67,7 +68,7 @@ struct BlockContext {
// TODO: Use MessageQueue
vector<string> in_messages;
unique_ptr<MessageQueue> out_messages;
MessageQueue* out_messages;
};
} // namespace noisicaa

6
noisicaa/audioproc/engine/block_context.pxd

@ -35,15 +35,17 @@ cdef extern from "noisicaa/audioproc/engine/block_context.h" namespace "noisicaa
cppclass BlockContext:
uint32_t sample_pos
vector[SampleTime] time_map
unique_ptr[SampleTime] time_map
void alloc_time_map(uint32_t block_size)
unique_ptr[PerfStats] perf
unique_ptr[MessageQueue] out_messages
MessageQueue* out_messages
BufferArena* buffer_arena
cdef class PyBlockContext(object):
cdef unique_ptr[BlockContext] __ptr
cdef BlockContext* __ctxt
cdef unique_ptr[MessageQueue] __out_messages
cdef PyPerfStats __perf
@staticmethod

8
noisicaa/audioproc/engine/block_context.pyi

@ -30,10 +30,12 @@ class PyBlockContext(object):
sample_pos = ... # type: int
def __init__(self, buffer_arena: Optional[buffer_arena.PyBufferArena] = None) -> None: ...
def clear_time_map(self) -> None: ...
def append_sample_time(
self, start_time: audioproc.MusicalTime, end_time: audioproc.MusicalTime) -> None: ...
def clear_time_map(self, block_size: int) -> None: ...
def set_sample_time(
self, idx: int, start_time: audioproc.MusicalTime, end_time: audioproc.MusicalTime
) -> None: ...
@property
def perf(self) -> core.PerfStats: ...
def create_out_messages(self) -> None: ...
@property
def out_messages(self) -> Iterator[message_queue.PyMessage]: ...

19
noisicaa/audioproc/engine/block_context.pyx

@ -28,9 +28,11 @@ cdef class PyBlockContext(object):
self.__ptr.reset(new BlockContext())
self.__ctxt = self.__ptr.get()
self.__out_messages.reset(new MessageQueue())
self.__perf = PyPerfStats()
self.__ctxt.perf.reset(self.__perf.release())
self.__ctxt.out_messages.reset(new MessageQueue())
self.__ctxt.out_messages = self.__out_messages.get()
if buffer_arena is not None:
self.__ctxt.buffer_arena = buffer_arena.get()
@ -56,14 +58,17 @@ cdef class PyBlockContext(object):
def sample_pos(self, value):
self.__ctxt.sample_pos = <uint32_t>value
def clear_time_map(self):
self.__ctxt.time_map.clear()
def create_out_messages(self):
# TODO: This leaks the MessageQueue instance (but only in unittests)...
self.__ctxt.out_messages = new MessageQueue()
def clear_time_map(self, int block_size):
self.__ctxt.alloc_time_map(block_size)
def append_sample_time(self, PyMusicalTime start_time, PyMusicalTime end_time):
cdef SampleTime stime
def set_sample_time(self, int idx, PyMusicalTime start_time, PyMusicalTime end_time):
cdef SampleTime* stime = self.__ctxt.time_map.get() + idx
stime.start_time = start_time.get()
stime.end_time = end_time.get()
self.__ctxt.time_map.push_back(stime)
@property
def perf(self):
@ -71,7 +76,7 @@ cdef class PyBlockContext(object):
@property
def out_messages(self):
cdef message_queue.MessageQueue* queue = self.__ctxt.out_messages.get()
cdef message_queue.MessageQueue* queue = self.__ctxt.out_messages
cdef message_queue.Message* msg = queue.first()
while not queue.is_end(msg):
yield message_queue.PyMessage.create(msg)

259
noisicaa/audioproc/engine/engine.cpp

@ -0,0 +1,259 @@
/*
* @begin:license
*
* Copyright (c) 2015-2018, Benjamin Niemann <pink@odahoda.de>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* @end:license
*/
#include <assert.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <chrono>
#include <thread>
#include "noisicaa/core/scope_guard.h"
#include "noisicaa/core/perf_stats.h"
#include "noisicaa/audioproc/public/engine_notification.pb.h"
#include "noisicaa/audioproc/engine/block_context.h"
#include "noisicaa/audioproc/engine/engine.h"
#include "noisicaa/audioproc/engine/message_queue.h"
#include "noisicaa/audioproc/engine/profile.h"
#include "noisicaa/audioproc/engine/realtime.h"
#include "noisicaa/audioproc/engine/rtcheck.h"
namespace noisicaa {
Engine::Engine(HostSystem* host_system, void (*callback)(void*, const string&), void *userdata)
: _host_system(host_system),
_logger(LoggerRegistry::get_logger("noisicaa.audioproc.engine.engine")),
_callback(callback),
_userdata(userdata),
_queue_pump(nullptr),
_next_message_queue(new MessageQueue()),
_current_message_queue(nullptr),
_old_message_queue(new MessageQueue()) {}
Engine::~Engine() {}
Status Engine::setup() {
_stop = false;
_queue_pump.reset(new thread(&Engine::queue_pump_main, this));
return Status::Ok();
}
void Engine::cleanup() {
if (_queue_pump.get() != nullptr) {
_logger->info("Stopping queue pump...");
{
lock_guard<mutex> lock(_cond_mutex);
_stop = true;
_cond.notify_all();
}
_queue_pump->join();
_queue_pump.reset();
_logger->info("Queue pump stopped.");
}
MessageQueue* message_queue = _next_message_queue.exchange(nullptr);
if (message_queue != nullptr) {
delete message_queue;
}
message_queue = _current_message_queue.exchange(nullptr);
if (message_queue != nullptr) {
delete message_queue;
}
message_queue = _old_message_queue.exchange(nullptr);
if (message_queue != nullptr) {
delete message_queue;
}
}
void Engine::queue_pump_main() {
unique_lock<mutex> lock(_cond_mutex);
while (true) {
_cond.wait_for(lock, chrono::milliseconds(500));
MessageQueue* queue = _old_message_queue.exchange(nullptr);
if (queue != nullptr) {
if (!queue->empty()) {
pb::EngineNotification notification;
Message* msg = queue->first();
while (!queue->is_end(msg)) {
switch (msg->type) {
case MessageType::ENGINE_LOAD: {
EngineLoadMessage* tmsg = (EngineLoadMessage*)msg;
auto n = notification.add_engine_state_changes();
n->set_state(pb::EngineStateChange::RUNNING);
n->set_load(tmsg->load);
break;
}
case MessageType::PERF_STATS: {
PerfStatsMessage* tmsg = (PerfStatsMessage*)msg;
notification.set_perf_stats(tmsg->perf_stats(), tmsg->length);
break;
}
case MessageType::PLAYER_STATE: {
PlayerStateMessage* tmsg = (PlayerStateMessage*)msg;
auto n = notification.mutable_player_state();
n->set_realm(tmsg->realm);
n->set_playing(tmsg->playing);
tmsg->current_time.set_proto(n->mutable_current_time());
n->set_loop_enabled(tmsg->loop_enabled);
tmsg->loop_start_time.set_proto(n->mutable_loop_start_time());
tmsg->loop_end_time.set_proto(n->mutable_loop_end_time());
break;
}
case MessageType::NODE_MESSAGE: {
NodeMessage* tmsg = (NodeMessage*)msg;
auto n = notification.add_node_messages();
n->set_node_id(tmsg->node_id);
n->set_atom(tmsg->atom(), tmsg->atom_size());
break;
}
default: {
_logger->error("Unexpected message type %d", msg->type);
break;
}
}
msg = queue->next(msg);
}
queue->clear();
string notification_serialized;
assert(notification.SerializeToString(&notification_serialized));
_callback(_userdata, notification_serialized);
}
queue = _next_message_queue.exchange(queue);
if (queue != nullptr) {
assert(_old_message_queue.exchange(queue) == nullptr);
}
}
if (_stop) {
break;
}
}
}
Status Engine::setup_thread() {
_exit_loop = false;
RETURN_IF_ERROR(set_thread_to_rt_priority(_logger));
return Status::Ok();
}
void Engine::exit_loop() {
_exit_loop = true;
}
Status Engine::loop(Realm* realm, Backend* backend) {
assert(realm != nullptr);
assert(backend != nullptr);
enable_profiling_in_thread();
_logger->info("Audio thread: PID=%d TID=%ld", getpid(), syscall(__NR_gettid));
RTSafe rts; // Enable rtchecker in audio thread.
chrono::high_resolution_clock::time_point last_loop_time =
chrono::high_resolution_clock::time_point::min();
while (!_exit_loop) {
BlockContext* ctxt = realm->block_context();
StatusOr<Program*> stor_program = realm->get_active_program();
RETURN_IF_ERROR(stor_program);
Program* program = stor_program.result();
if (program == nullptr) {
this_thread::sleep_for(chrono::milliseconds(100));
continue;
}
MessageQueue* current_queue = _next_message_queue.exchange(nullptr);
if (current_queue != nullptr) {
assert(current_queue->empty());
MessageQueue* old = _current_message_queue.exchange(nullptr);
if (old != nullptr) {
assert(_old_message_queue.exchange(old) == nullptr);
_cond.notify_all();
}
} else {
current_queue = _current_message_queue.exchange(nullptr);
assert(current_queue != nullptr);
}
ctxt->out_messages = current_queue;
if (ctxt->perf->num_spans() > 0) {
PerfStatsMessage::push(ctxt->out_messages, *ctxt->perf);
}
ctxt->perf->reset();
RETURN_IF_ERROR(backend->begin_block(ctxt));
auto auto_end_block = scopeGuard([this, backend, ctxt]() {
Status status = backend->end_block(ctxt);
if (status.is_error()) {
_logger->error(
"Backend::end_block() failed: %s:%d %s",
status.file(), status.line(), status.message());
}
});
RETURN_IF_ERROR(realm->process_block(program));
Buffer* buf = realm->get_buffer("sink:in:left");
if(buf != nullptr) {
RETURN_IF_ERROR(backend->output(ctxt, "left", buf->data()));
}
buf = realm->get_buffer("sink:in:right");
if(buf != nullptr) {
RETURN_IF_ERROR(backend->output(ctxt, "right", buf->data()));
}
if (last_loop_time > chrono::high_resolution_clock::time_point::min()) {
auto loop_duration = chrono::high_resolution_clock::now() - last_loop_time;
double loop_usec = std::chrono::duration_cast<std::chrono::microseconds>(loop_duration).count();
double block_usec = 1e6 * _host_system->block_size() / _host_system->sample_rate();
double load = loop_usec / block_usec;
EngineLoadMessage::push(ctxt->out_messages, load);
}
auto_end_block.dismiss();
RETURN_IF_ERROR(backend->end_block(ctxt));
last_loop_time = chrono::high_resolution_clock::now();
ctxt->out_messages = nullptr;
assert(_current_message_queue.exchange(current_queue) == nullptr);
}
return Status::Ok();
}
} // namespace noisicaa

79
noisicaa/audioproc/engine/engine.h

@ -0,0 +1,79 @@
// -*- mode: c++ -*-
/*
* @begin:license
*
* Copyright (c) 2015-2018, Benjamin Niemann <pink@odahoda.de>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* @end:license
*/
#ifndef _NOISICAA_AUDIOPROC_ENGINE_ENGINE_H
#define _NOISICAA_AUDIOPROC_ENGINE_ENGINE_H
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include "noisicaa/core/logging.h"
#include "noisicaa/core/status.h"
#include "noisicaa/host_system/host_system.h"
#include "noisicaa/audioproc/engine/backend.h"
#include "noisicaa/audioproc/engine/message_queue.h"
#include "noisicaa/audioproc/engine/realm.h"
namespace noisicaa {
using namespace std;
class Engine {
public:
Engine(HostSystem* host_system, void (*callback)(void*, const string&), void* userdata);
virtual ~Engine();
Status setup();
void cleanup();
Status setup_thread();
void exit_loop();
Status loop(Realm* realm, Backend* backend);
private:
HostSystem* _host_system;
Logger* _logger;
void (*_callback)(void*, const string&);
void *_userdata;
bool _exit_loop;
unique_ptr<thread> _queue_pump;
bool _stop = false;
mutex _cond_mutex;
condition_variable _cond;
void queue_pump_main();
atomic<MessageQueue*> _next_message_queue;
atomic<MessageQueue*> _current_message_queue;
atomic<MessageQueue*> _old_message_queue;
};
} // namespace noisicaa
#endif

42
noisicaa/audioproc/engine/engine.pxd

@ -0,0 +1,42 @@
# @begin:license
#
# Copyright (c) 2015-2018, Benjamin Niemann <pink@odahoda.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# @end:license
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from noisicaa.core.status cimport Status
from noisicaa.host_system cimport host_system as host_system_lib
from noisicaa.audioproc.engine cimport realm as realm_lib
from noisicaa.audioproc.engine cimport backend as backend_lib
cdef extern from "noisicaa/audioproc/engine/engine.h" namespace "noisicaa" nogil:
cppclass Engine:
Engine(
host_system_lib.HostSystem* host_system,
void (*callback)(void*, const string&),
void* userdata)
Status setup()
void cleanup()
Status setup_thread();
void exit_loop()
Status loop(realm_lib.Realm* realm, backend_lib.Backend* backend) nogil

282
noisicaa/audioproc/engine/engine.pyx

@ -20,6 +20,10 @@
#
# @end:license
from libcpp.string cimport string
from cpython.ref cimport PyObject
from cpython.exc cimport PyErr_Fetch, PyErr_Restore
import asyncio
import enum
import functools
@ -30,7 +34,6 @@ import random
import sys
import threading
import time
import cProfile
import toposort
@ -39,10 +42,11 @@ from noisicaa.bindings import lv2
from noisicaa.core import ipc
from noisicaa.core.status cimport check
from noisicaa.core.perf_stats cimport PyPerfStats
from noisicaa.host_system.host_system cimport PyHostSystem
from noisicaa.audioproc.public import engine_notification_pb2
from noisicaa.audioproc.public import musical_time
from noisicaa.audioproc.public import player_state_pb2
from . cimport realm as realm_lib
from .realm cimport PyRealm
from .spec cimport PySpec
from .block_context cimport PyBlockContext
from .backend cimport PyBackend, PyBackendSettings
@ -67,19 +71,24 @@ class DuplicateRootRealm(Error):
pass
class Engine(object):
cdef class PyEngine(object):
cdef dict __dict__
cdef unique_ptr[Engine] __engine_ptr
cdef Engine* __engine
cdef PyHostSystem __host_system
cdef PyRealm __root_realm
def __init__(
self, *,
host_system, event_loop, manager, server_address,
shm=None, profile_path=None):
PyHostSystem host_system, event_loop, manager, server_address, shm=None):
self.notifications = core.Callback()
self.__engine = NULL
self.__host_system = host_system
self.__event_loop = event_loop
self.__manager = manager
self.__server_address = server_address
self.__shm = shm
self.__profile_path = profile_path
self.__realms = {}
self.__root_realm = None
@ -91,7 +100,6 @@ class Engine(object):
self.__engine_thread = None
self.__engine_started = None
self.__engine_exit = None
self.__maintenance_task = None
self.__plugin_host = None
@ -99,8 +107,6 @@ class Engine(object):
self.__bpm = 120
self.__duration = musical_time.PyMusicalDuration(2, 1)
self.__notification_listeners = core.CallbackMap()
def dump(self):
# TODO: reimplement
pass
@ -110,41 +116,19 @@ class Engine(object):
engine_state_changes=[engine_notification_pb2.EngineStateChange(
state=state)]))
async def setup(self, *, start_thread=True):
async def setup(self, *):
self.__set_state(engine_notification_pb2.EngineStateChange.SETUP)
self.__engine_started = threading.Event()
self.__engine_exit = threading.Event()
if start_thread:
self.__engine_thread = threading.Thread(target=self.engine_main)
self.__engine_thread.start()
logger.info("Starting engine thread (%s)...", self.__engine_thread.ident)
self.__engine_started.wait()
logger.info("Starting maintenance task...")
self.__maintenance_task = self.__event_loop.create_task(self.__maintenance_task_main())
logger.info("Engine up and running.")
self.__set_state(engine_notification_pb2.EngineStateChange.RUNNING)
self.__engine_ptr.reset(new Engine(
self.__host_system.get(), self.__notification_callback, <PyObject*>self))
self.__engine = self.__engine_ptr.get()
with nogil:
check(self.__engine.setup())
async def cleanup(self):
self.__set_state(engine_notification_pb2.EngineStateChange.CLEANUP)
if self.__backend is not None:
logger.info("Stopping backend...")
self.__backend.stop()
if self.__maintenance_task is not None:
logger.info("Shutting down maintenance task...")
self.__maintenance_task.cancel()
self.__maintenance_task = None
if self.__engine_thread is not None: # pragma: no branch
logger.info("Shutting down engine thread...")
self.__engine_exit.set()
self.__engine_thread.join()
self.__engine_thread = None
logger.info("Engine thread stopped.")
await self.stop_engine()
if self.__backend is not None:
self.__backend.cleanup()
@ -160,9 +144,6 @@ class Engine(object):
self.__plugin_host = None
logger.info("Plugin host process stopped.")
self.__engine_started = None
self.__engine_exit = None
for realm in self.__realms.values():
await realm.cleanup()
self.__realms.clear()
@ -172,10 +153,42 @@ class Engine(object):
listener.remove()
self.__realm_listeners.clear()
if self.__engine != NULL:
with nogil:
self.__engine.cleanup()
self.__engine_ptr.release()
self.__engine = NULL
self.__set_state(engine_notification_pb2.EngineStateChange.STOPPED)
def add_notification_listener(self, node_id, func):
return self.__notification_listeners.add(node_id, func)
async def start_engine(self):
assert self.__root_realm is not None
assert self.__backend is not None
self.__engine_started = threading.Event()
self.__engine_thread = threading.Thread(target=self.engine_main)
self.__engine_thread.start()
logger.info("Starting engine thread (%s)...", self.__engine_thread.ident)
self.__engine_started.wait()
logger.info("Starting maintenance task...")
self.__maintenance_task = self.__event_loop.create_task(self.__maintenance_task_main())
async def stop_engine(self):
if self.__maintenance_task is not None:
logger.info("Shutting down maintenance task...")
self.__maintenance_task.cancel()
self.__maintenance_task = None
if self.__engine_thread is not None: # pragma: no branch
logger.info("Shutting down engine thread...")
self.__engine.exit_loop()
self.__engine_thread.join()
self.__engine_thread = None
logger.info("Engine thread stopped.")
self.__engine_started = None
async def get_plugin_host(self):
if self.__plugin_host is None:
@ -191,7 +204,7 @@ class Engine(object):
async def delete_plugin_ui(self, realm, node_id):
return await self.__plugin_host.call('DELETE_UI', realm, node_id)
def get_realm(self, name: str) -> realm_lib.PyRealm:
def get_realm(self, name: str) -> PyRealm:
try:
return self.__realms[name]
except KeyError as exc:
@ -200,7 +213,7 @@ class Engine(object):
async def create_realm(
self, *,
name: str, parent: str, enable_player: bool = False, callback_address: str = None
) -> realm_lib.PyRealm:
) -> PyRealm:
if name in self.__realms:
raise DuplicateRealmName("Realm '%s' already exists" % name)
@ -216,16 +229,12 @@ class Engine(object):
if enable_player:
logger.info("Enabling player...")
player = PyPlayer(self.__host_system, name)
player.player_state_changed.add(
lambda state: self.notifications.call(
engine_notification_pb2.EngineNotification(
player_state=state)))
else:
logger.info("Player disabled.")
player = None
realm = realm_lib.PyRealm(