Allow profiling of the VM mainloop.

looper
Ben Niemann 5 years ago
parent 8b7b09c85c
commit 0aed9fd747
  1. 23
      bin/runtests.py
  2. 11
      noisicaa/audioproc/audioproc_process.py
  3. 112
      noisicaa/audioproc/vm/engine.py
  4. 1
      noisicaa/constants.py
  5. 7
      noisicaa/music/player_integration_test.py

@ -9,9 +9,7 @@ import sys
import unittest
import coverage
import pyximport
pyximport.install(setup_args={'script_args': ['--verbose']})
LIBDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, LIBDIR)
@ -31,9 +29,11 @@ def main(argv):
help="Minimum level for log messages written to STDERR.")
parser.add_argument('--nocoverage', action='store_true', default=False)
parser.add_argument('--write_perf_stats', action='store_true', default=False)
parser.add_argument('--profile', action='store_true', default=False)
args = parser.parse_args(argv[1:])
constants.TEST_OPTS.WRITE_PERF_STATS = args.write_perf_stats
constants.TEST_OPTS.ENABLE_PROFILER = args.profile
logging.basicConfig()
logging.getLogger().setLevel({
@ -44,6 +44,25 @@ def main(argv):
'critical': logging.CRITICAL,
}[args.log_level if not args.debug else 'debug'])
build_dir = 'pyxbuild'
directives = {}
if args.profile:
directives['profile'] = True
build_dir += '-profile'
pyximport.install(
build_dir=os.path.join(os.getenv('VIRTUAL_ENV'), build_dir),
setup_args={
'script_args': ['--verbose'],
'options': {
'build_ext': {
'cython_directives': directives,
}
}
}
)
loader = unittest.defaultTestLoader
suite = unittest.TestSuite()

@ -79,9 +79,10 @@ class Session(object):
class AudioProcProcessMixin(object):
def __init__(self, *args, shm=None, **kwargs):
def __init__(self, *args, shm=None, profile_path=None, **kwargs):
super().__init__(*args, **kwargs)
self.shm_name = shm
self.profile_path = profile_path
self.shm = None
self.__vm = None
@ -127,7 +128,9 @@ class AudioProcProcessMixin(object):
if self.shm_name is not None:
self.shm = posix_ipc.SharedMemory(self.shm_name)
self.__vm = vm.PipelineVM(shm=self.shm)
self.__vm = vm.PipelineVM(
shm=self.shm,
profile_path=self.profile_path)
self.__vm.listeners.add('perf_data', self.perf_data_callback)
self.__vm.listeners.add('node_state', self.node_state_callback)
@ -157,7 +160,9 @@ class AudioProcProcessMixin(object):
async def run(self):
await self.__shutting_down.wait()
logger.info("Shutting down...")
self.__vm.cleanup()
if self.__vm is not None:
self.__vm.cleanup()
self.__vm = None
logger.info("Pipeline finished.")
self.__shutdown_complete.set()

@ -8,6 +8,7 @@ import random
import sys
import threading
import time
import cProfile
from noisicaa import core
from noisicaa import rwlock
@ -75,12 +76,13 @@ class Buffer(object):
class PipelineVM(object):
def __init__(
self, *,
sample_rate=44100, frame_size=128, shm=None):
sample_rate=44100, frame_size=128, shm=None, profile_path=None):
self.listeners = core.CallbackRegistry()
self.__sample_rate = sample_rate
self.__frame_size = frame_size
self.__shm = shm
self.__profile_path = profile_path
self.__vm_thread = None
self.__vm_started = None
@ -240,72 +242,84 @@ class PipelineVM(object):
self.__notifications.append((node_id, notification))
def vm_main(self):
profiler = None
try:
logger.info("Starting VM...")
ctxt = audioproc.FrameContext()
ctxt.perf = core.PerfStats()
ctxt.sample_pos = 0
ctxt.duration = -1
self.__vm_started.set()
while True:
if self.__vm_exit.is_set():
logger.info("Exiting VM mainloop.")
break
if self.__profile_path:
profiler = cProfile.Profile()
profiler.enable()
try:
self.vm_loop()
finally:
if profiler is not None:
profiler.disable()
profiler.dump_stats(self.__profile_path)
backend = self.__backend
if backend is None:
time.sleep(0.1)
continue
except: # pragma: no coverage # pylint: disable=bare-except
sys.stdout.flush()
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
time.sleep(0.2)
os._exit(1) # pylint: disable=protected-access
finally:
logger.info("VM finished.")
self.listeners.call('perf_data', ctxt.perf.serialize())
def vm_loop(self):
ctxt = audioproc.FrameContext()
ctxt.perf = core.PerfStats()
ctxt.sample_pos = 0
ctxt.duration = -1
ctxt.perf = core.PerfStats()
while True:
if self.__vm_exit.is_set():
logger.info("Exiting VM mainloop.")
break
backend.begin_frame(ctxt)
backend = self.__backend
if backend is None:
time.sleep(0.1)
continue
try:
if backend.stopped:
break
self.listeners.call('perf_data', ctxt.perf.serialize())
if ctxt.duration != self.__frame_size:
logger.info("frame_size=%d", ctxt.duration)
with self.writer_lock():
self.__frame_size = ctxt.duration
self.update_spec()
ctxt.perf = core.PerfStats()
with self.reader_lock():
if self.__spec is not None:
if not self.__spec_initialized:
self.run_vm(self.__spec, ctxt, RunAt.INIT)
self.__spec_initialized = True
self.run_vm(self.__spec, ctxt, RunAt.PERFORMANCE)
else:
time.sleep(0.05)
backend.begin_frame(ctxt)
finally:
notifications = self.__notifications
self.__notifications = []
try:
if backend.stopped:
break
with ctxt.perf.track('send_notifications'):
for node_id, notification in notifications:
self.notification_listener.call(node_id, notification)
if ctxt.duration != self.__frame_size:
logger.info("frame_size=%d", ctxt.duration)
with self.writer_lock():
self.__frame_size = ctxt.duration
self.update_spec()
backend.end_frame()
with self.reader_lock():
if self.__spec is not None:
if not self.__spec_initialized:
self.run_vm(self.__spec, ctxt, RunAt.INIT)
self.__spec_initialized = True
self.run_vm(self.__spec, ctxt, RunAt.PERFORMANCE)
else:
time.sleep(0.05)
ctxt.sample_pos += ctxt.duration
finally:
notifications = self.__notifications
self.__notifications = []
except: # pragma: no coverage # pylint: disable=bare-except
sys.stdout.flush()
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
time.sleep(0.2)
os._exit(1) # pylint: disable=protected-access
with ctxt.perf.track('send_notifications'):
for node_id, notification in notifications:
self.notification_listener.call(node_id, notification)
finally:
logger.info("VM finished.")
backend.end_frame()
ctxt.sample_pos += ctxt.duration
def run_vm(self, s, ctxt, run_at):
for opcode, state in zip(s.opcodes, self.__opcode_states):

@ -36,6 +36,7 @@ else:
TESTLOG_DIR = os.path.abspath(os.path.join(__file__, '..', '..', 'testlogs'))
class TEST_OPTS(object):
WRITE_PERF_STATS = False
ENABLE_PROFILER = False
# Cleanup namespace
del os

@ -159,7 +159,12 @@ class PlayerTest(asynctest.TestCase):
self.audioproc_server_main.server.address, flags={'perf_data'})
await self.audioproc_client_main.set_backend('pyaudio', frame_size=1024)
self.audioproc_server_player = TestAudioProcProcess(self.loop, 'player_process')
profile_path = None
if constants.TEST_OPTS.ENABLE_PROFILER:
profile_path = os.path.join(tempfile.gettempdir(), self.id() + '.prof')
self.audioproc_server_player = TestAudioProcProcess(
self.loop, 'player_process',
profile_path=profile_path)
await self.audioproc_server_player.setup()
self.audioproc_server_player_task = self.loop.create_task(
self.audioproc_server_player.run())

Loading…
Cancel
Save