Collect frame stats in player integration test.

looper
Ben Niemann 5 years ago
parent c3cb7fe611
commit 8b7b09c85c
  1. 11
      NOTES.org
  2. 5
      bin/runtests.py
  3. 5
      noisicaa/audioproc/audioproc_process.py
  4. 36
      noisicaa/audioproc/backend.py
  5. 25
      noisicaa/audioproc/nodes/ipc.py
  6. 3
      noisicaa/audioproc/vm/engine.py
  7. 7
      noisicaa/constants.py
  8. 14
      noisicaa/core/ipc.py
  9. 81
      noisicaa/music/player_integration_test.py
  10. 1
      requirements.txt
  11. 2
      testlogs/player_integration_test.csv

@ -1,13 +1,10 @@
# -*- org-tags-column: -98 -*-
* VM-based pipeline engine :FR:
- performance test
- collect per-frame timings in player_integration_test
- total e2e time per frame
- min, max, avg, stddev
- append to csv file
- datetime, cpuinfo, timing data
- cpuinfo: py-cpuinfo module
- player_integration_test with null backend
vm thread seems to saturate CPU, doesn't let main thread handle pipeline_status messages.
When turning pipeline down, queued messages cause lots of errors.
- make sure to flush messages out before shutting down
- use protos for PipelineMutations instead of pickled objects.
- better test coverage
- base class for node unittests

@ -16,6 +16,8 @@ pyximport.install(setup_args={'script_args': ['--verbose']})
LIBDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, LIBDIR)
from noisicaa import constants
os.environ['LD_LIBRARY_PATH'] = os.path.join(os.getenv('VIRTUAL_ENV'), 'lib')
def main(argv):
@ -28,8 +30,11 @@ def main(argv):
default='error',
help="Minimum level for log messages written to STDERR.")
parser.add_argument('--nocoverage', action='store_true', default=False)
parser.add_argument('--write_perf_stats', action='store_true', default=False)
args = parser.parse_args(argv[1:])
constants.TEST_OPTS.WRITE_PERF_STATS = args.write_perf_stats
logging.basicConfig()
logging.getLogger().setLevel({
'debug': logging.DEBUG,

@ -5,6 +5,7 @@ import functools
import logging
import sys
import uuid
import io
import posix_ipc
@ -67,7 +68,9 @@ class Session(object):
assert callback_task.done()
exc = callback_task.exception()
if exc is not None:
logger.error("PUBLISH_STATUS failed with exception: %s", exc)
buf = io.StringIO()
callback_task.print_stack(file=buf)
logger.error("PUBLISH_STATUS failed with exception: %s\n%s", exc, buf.getvalue())
def callback_stub_connected(self):
assert self.callback_stub.connected

@ -27,6 +27,12 @@ class Backend(object):
def __init__(self):
self.__stopped = threading.Event()
self.__sample_rate = None
self.__ctxt = None
@property
def ctxt(self):
assert self.__ctxt is not None
return self.__ctxt
def setup(self, sample_rate):
self.__sample_rate = sample_rate
@ -49,10 +55,10 @@ class Backend(object):
self.__stopped.set()
def begin_frame(self, ctxt):
raise NotImplementedError
self.__ctxt = ctxt
def end_frame(self):
raise NotImplementedError
self.__ctxt = None
def output(self, channel, samples):
raise NotImplementedError
@ -72,10 +78,13 @@ class NullBackend(Backend):
self.__frame_size = frame_size
def begin_frame(self, ctxt):
super().begin_frame(ctxt)
ctxt.perf.start_span('frame')
ctxt.duration = self.__frame_size
def end_frame(self):
pass
self.ctxt.perf.end_span()
super().end_frame()
def output(self, channel, samples):
pass
@ -163,13 +172,17 @@ class PyAudioBackend(Backend):
self.__need_more.set()
def begin_frame(self, ctxt):
super().begin_frame(ctxt)
if self.stopped:
return
self.__need_more.wait()
ctxt.perf.start_span('frame')
self.__outputs.clear()
ctxt.duration = self.__frame_size
def end_frame(self):
self.ctxt.perf.end_span()
# TODO: feed non-interleaved sample buffers directly into
# resample
interleaved = bytearray(8 * self.__frame_size)
@ -195,6 +208,8 @@ class PyAudioBackend(Backend):
if len(self.__buffer) >= self.__buffer_threshold:
self.__need_more.clear()
super().end_frame()
def output(self, channel, samples):
self.__outputs[channel] = samples
@ -210,7 +225,6 @@ class IPCBackend(Backend):
socket_dir, 'audiostream.%s.pipe' % uuid.uuid4().hex)
self.__stream = audio_stream.AudioStreamServer(self.address)
self.__ctxt = None
self.__out_frame = None
self.__entities = None
@ -229,9 +243,12 @@ class IPCBackend(Backend):
super().stop()
def begin_frame(self, ctxt):
self.__ctxt = ctxt
super().begin_frame(ctxt)
try:
in_frame = self.__stream.receive_frame()
ctxt.perf.start_span('frame')
ctxt.duration = in_frame.frameSize
ctxt.entities = {
entity.id: entity
@ -250,19 +267,22 @@ class IPCBackend(Backend):
def end_frame(self):
if self.__out_frame is not None:
self.ctxt.perf.end_span()
self.__out_frame.init('entities', len(self.__entities))
for idx, entity in enumerate(self.__entities):
self.__out_frame.entities[idx] = entity
assert self.__ctxt.perf.current_span_id == 0
self.__out_frame.perfData = self.__ctxt.perf.serialize()
assert self.ctxt.perf.current_span_id == 0
self.__out_frame.perfData = self.ctxt.perf.serialize()
self.__stream.send_frame(self.__out_frame)
self.__ctxt = None
self.__out_frame = None
self.__entities = None
super().end_frame()
def output(self, channel, samples):
assert self.__out_frame is not None

@ -60,17 +60,20 @@ class IPCNode(node.CustomNode):
raise ValueError(port_name)
def run(self, ctxt):
request = frame_data_capnp.FrameData.new_message()
request.samplePos = ctxt.sample_pos
request.frameSize = ctxt.duration
self.__stream.send_frame(request)
response = self.__stream.receive_frame()
assert response.samplePos == ctxt.sample_pos, (
response.samplePos, ctxt.sample_pos)
assert response.frameSize == ctxt.duration, (
response.frameSize, ctxt.duration)
ctxt.perf.add_spans(response.perfData)
with ctxt.perf.track('ipc'):
request = frame_data_capnp.FrameData.new_message()
request.samplePos = ctxt.sample_pos
request.frameSize = ctxt.duration
with ctxt.perf.track('ipc.send_frame'):
self.__stream.send_frame(request)
with ctxt.perf.track('ipc.receive_frame'):
response = self.__stream.receive_frame()
assert response.samplePos == ctxt.sample_pos, (
response.samplePos, ctxt.sample_pos)
assert response.frameSize == ctxt.duration, (
response.frameSize, ctxt.duration)
ctxt.perf.add_spans(response.perfData)
for entity in response.entities:
if entity.id == 'output:left':

@ -264,8 +264,7 @@ class PipelineVM(object):
ctxt.perf = core.PerfStats()
with ctxt.perf.track('backend_begin_frame'):
backend.begin_frame(ctxt)
backend.begin_frame(ctxt)
try:
if backend.stopped:

@ -1,5 +1,7 @@
#!/usr/bin/python3
# Important: This module must not import any other noisicaa modules.
import os
import os.path
import subprocess
@ -30,6 +32,11 @@ for d in ['noisicaä', 'Noisicaä', 'noisicaa', 'Noisicaa']:
else:
PROJECT_DIR = MUSIC_DIR
# Test related stuff.
TESTLOG_DIR = os.path.abspath(os.path.join(__file__, '..', '..', 'testlogs'))
class TEST_OPTS(object):
WRITE_PERF_STATS = False
# Cleanup namespace
del os
del subprocess

@ -320,6 +320,13 @@ class Stub(object):
self._command_loop_task = self._event_loop.create_task(self.command_loop())
async def close(self):
assert self._transport is not None
self._transport.close()
await self._protocol.closed_event.wait()
self._transport = None
self._protocol = None
if self._command_loop_task is not None:
self._command_loop_cancelled.set()
await asyncio.wait_for(self._command_loop_task, None)
@ -328,13 +335,6 @@ class Stub(object):
if self._command_queue is not None:
self._command_queue = None
assert self._transport is not None
self._transport.close()
await self._protocol.closed_event.wait()
self._transport = None
self._protocol = None
async def __aenter__(self):
await self.connect()
return self

@ -1,17 +1,24 @@
#!/usr/bin/python3
import asyncio
import contextlib
import csv
import datetime
import logging
import os.path
import pprint
import tempfile
import threading
import time
import uuid
import unittest
from unittest import mock
import uuid
import asynctest
import numpy
import cpuinfo
from noisicaa import constants
from noisicaa import core
from noisicaa import audioproc
from noisicaa.audioproc import audioproc_process
@ -29,6 +36,38 @@ from . import project_client
logger = logging.getLogger(__name__)
def write_frame_stats(testname, frame_times):
frame_times = numpy.array(frame_times, dtype=numpy.int64)
data = []
data.append(
('datetime', datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')))
ci = cpuinfo.get_cpu_info()
data.append(('CPU brand', ci['brand']))
data.append(('CPU speed', ci['hz_advertised']))
data.append(('CPU cores', ci['count']))
data.append(('testname', testname))
data.append(('#frames', len(frame_times)))
data.append(('mean', frame_times.mean()))
data.append(('stddev', frame_times.std()))
for p in (50, 75, 90, 95, 99, 99.9):
data.append(
('%.1fth %%tile' % p, numpy.percentile(frame_times, p)))
logger.info("Frame stats:\n%s", pprint.pformat(data))
if constants.TEST_OPTS.WRITE_PERF_STATS:
with open(
os.path.join(constants.TESTLOG_DIR, 'player_integration_test.csv'),
'a', newline='', encoding='utf-8') as fp:
writer = csv.writer(fp, dialect=csv.unix_dialect)
if fp.tell() == 0:
writer.writerow([h for h, _ in data])
writer.writerow([v for _, v in data])
class TestAudioProcProcessImpl(object):
def __init__(self, event_loop, name):
super().__init__()
@ -117,7 +156,7 @@ class PlayerTest(asynctest.TestCase):
self.audioproc_client_main = TestAudioProcClient(self.loop, 'main_client')
await self.audioproc_client_main.setup()
await self.audioproc_client_main.connect(
self.audioproc_server_main.server.address)
self.audioproc_server_main.server.address, flags={'perf_data'})
await self.audioproc_client_main.set_backend('pyaudio', frame_size=1024)
self.audioproc_server_player = TestAudioProcProcess(self.loop, 'player_process')
@ -150,6 +189,27 @@ class PlayerTest(asynctest.TestCase):
await self.node_db.cleanup()
@contextlib.contextmanager
def track_frame_stats(self, testname):
frame_times = []
def cb(status):
perf_data = status.get('perf_data', None)
if perf_data:
topspan = perf_data.spans[0]
assert topspan.parentId == 0
assert topspan.name == 'frame'
duration = (topspan.endTimeNSec - topspan.startTimeNSec) / 1000.0
frame_times.append(duration)
listener = self.audioproc_client_main.listeners.add('pipeline_status', cb)
try:
yield
write_frame_stats(testname, frame_times)
finally:
listener.remove()
async def test_playback_demo(self):
p = player.Player(self.sheet, self.callback_server.address, self.mock_manager, self.loop)
try:
@ -170,17 +230,18 @@ class PlayerTest(asynctest.TestCase):
await self.callback_server.wait_for('pipeline_state'),
'running')
await p.update_settings(project_client.PlayerSettings(state='playing'))
with self.track_frame_stats('playback_demo'):
await p.update_settings(project_client.PlayerSettings(state='playing'))
self.assertEqual(
await self.callback_server.wait_for('player_state'),
'playing')
self.assertEqual(
await self.callback_server.wait_for('player_state'),
'playing')
logger.info("Waiting for end")
logger.info("Waiting for end")
self.assertEqual(
await self.callback_server.wait_for('player_state'),
'stopped')
self.assertEqual(
await self.callback_server.wait_for('player_state'),
'stopped')
finally:
await self.audioproc_client_main.disconnect_ports(

@ -22,3 +22,4 @@ psutil
intervaltree
pkgconfig
pycapnp
py-cpuinfo

@ -0,0 +1,2 @@
"datetime","CPU brand","CPU speed","CPU cores","testname","#frames","mean","stddev","50.0th %tile","75.0th %tile","90.0th %tile","95.0th %tile","99.0th %tile","99.9th %tile"
"2017-08-02 22:19:14","AMD E2-3000M APU with Radeon(tm) HD Graphics","1.8000 GHz","2","playback_demo","297","24958.8114478","2201.20644257","24687.0","26225.0","27529.4","28141.0","30097.64","40285.448"
1 datetime CPU brand CPU speed CPU cores testname #frames mean stddev 50.0th %tile 75.0th %tile 90.0th %tile 95.0th %tile 99.0th %tile 99.9th %tile
2 2017-08-02 22:19:14 AMD E2-3000M APU with Radeon(tm) HD Graphics 1.8000 GHz 2 playback_demo 297 24958.8114478 2201.20644257 24687.0 26225.0 27529.4 28141.0 30097.64 40285.448
Loading…
Cancel
Save