Can stream audio from project's audioproc process to main audioproc process.

looper
Ben Niemann 7 years ago
parent a3afac176b
commit 1725db8075

@ -247,18 +247,22 @@ class AudioProcProcessMixin(object):
def handle_set_backend(self, session_id, name, args):
self.get_session(session_id)
result = None
if name == 'pyaudio':
be = backend.PyAudioBackend(**args)
elif name == 'null':
be = backend.NullBackend(**args)
elif name == 'ipc':
be = backend.IPCBackend(**args)
result = be.address
elif name is None:
be = None
else:
raise ValueError("Invalid backend name %s" % name)
self.pipeline.set_backend(be)
return result
async def handle_play_file(self, session_id, path):
self.get_session(session_id)

@ -1,9 +1,14 @@
#!/usr/bin/python3
import logging
import os
import os.path
import queue
import select
import tempfile
import threading
import time
import uuid
import pyaudio
@ -172,34 +177,84 @@ class PyAudioBackend(Backend):
class IPCBackend(Backend):
def __init__(self):
def __init__(self, socket_dir=None):
super().__init__()
self._input_ready = threading.Event()
self._output_ready = threading.Event()
if socket_dir is None:
socket_dir = tempfile.gettempdir()
af = audio_format.AudioFormat(
audio_format.CHANNELS_STEREO,
audio_format.SAMPLE_FMT_FLT,
44100)
self._output = frame.Frame(af, 0, set())
self.address = os.path.join(
socket_dir, 'audiostream.%s.pipe' % uuid.uuid4().hex)
def process_frame(self):
self._input_ready.set()
self._output_ready.wait()
self._output_ready.clear()
self._pipe_in = None
self._pipe_out = None
self._poller = None
def stop(self):
super().stop()
self._input_ready.set()
self._buffer = bytearray()
self._timepos = None
def setup(self):
super().setup()
os.mkfifo(self.address + '.send')
self._pipe_in = os.open(
self.address + '.send', os.O_RDONLY | os.O_NONBLOCK)
os.mkfifo(self.address + '.recv')
self._pipe_out = os.open(
self.address + '.recv', os.O_RDWR | os.O_NONBLOCK)
os.set_blocking(self._pipe_out, True)
self._poller = select.poll()
self._poller.register(self._pipe_in, select.POLLIN)
def cleanup(self):
if self._poller is not None:
self._poller.unregister(self._pipe_in)
self._poller = None
if self._pipe_in is not None:
os.close(self._pipe_in)
self._pipe_in = None
if self._pipe_out is not None:
os.close(self._pipe_out)
self._pipe_out = None
if os.path.exists(self.address + '.send'):
os.unlink(self.address + '.send')
if os.path.exists(self.address + '.recv'):
os.unlink(self.address + '.recv')
self._buffer.clear()
super().cleanup()
def wait(self):
if self.stopped:
return
self._input_ready.wait()
self._input_ready.clear()
while not self.stopped:
eol = self._buffer.find(b'\n')
if eol >= 0:
line = self._buffer[:eol]
del self._buffer[:eol+1]
assert line.startswith(b'#FR=')
self._timepos = int(line[4:])
return
if not self._poller.poll(0.5):
continue
dat = os.read(self._pipe_in, 1024)
self._buffer.extend(dat)
def write(self, frame):
self._output.resize(0)
self._output.append(frame)
self._output_ready.set()
samples = frame.as_bytes()
response = bytearray()
response.extend(b'#FR=%d\n' % self._timepos)
response.extend(b'SAMPLES=%d\n' % len(frame))
response.extend(b'LEN=%d\n' % len(samples))
response.extend(samples)
while response:
written = os.write(self._pipe_out, response)
del response[:written]

@ -212,7 +212,9 @@ cdef class Frame:
"Incompatible frame format: %s" % frame.audio_format)
if frame.length != self.length:
raise ValueError("Frame lengths must be identical")
raise ValueError(
"Frame lengths must be identical (%d vs. %d)"
% (frame.length, self.length))
if self.audio_format.sample_fmt == SAMPLE_FMT_U8:
self._add_u8(

@ -2,6 +2,7 @@
import asyncio
import logging
import os
import threading
from noisicaa.core import ipc
@ -28,30 +29,75 @@ class IPCNode(node.Node):
self._output = ports.AudioOutputPort('out')
self.add_output(self._output)
self._stub = None
self._pipe_in = None
self._pipe_out = None
self._buffer = bytearray()
async def setup(self):
await super().setup()
logger.info("setup(): thread_id=%s", threading.get_ident())
logger.info("Connecting to %s...", self._address)
self._stub = ipc.Stub(self.event_loop, self._address)
await self._stub.connect()
self._pipe_in = os.open(
self._address + '.recv', os.O_RDONLY | os.O_NONBLOCK)
os.set_blocking(self._pipe_in, True)
self._pipe_out = os.open(
self._address + '.send', os.O_RDWR | os.O_NONBLOCK)
os.set_blocking(self._pipe_out, True)
async def cleanup(self):
if self._stub is not None:
logger.info("Disconnecting from %s...", self._address)
await self._stub.close()
if self._pipe_in is not None:
os.close(self._pipe_in)
self._pipe_in = None
if self._pipe_out is not None:
os.close(self._pipe_out)
self._pipe_out = None
self._buffer.clear()
await super().cleanup()
def _get_line(self):
while True:
eol = self._buffer.find(b'\n')
if eol >= 0:
line = self._buffer[:eol]
del self._buffer[:eol+1]
return line
dat = os.read(self._pipe_in, 1024)
logger.debug("dat=%s", dat)
self._buffer.extend(dat)
def _get_bytes(self, num_bytes):
while len(self._buffer) < num_bytes:
dat = os.read(self._pipe_in, 1024)
logger.debug("dat=%s", dat)
self._buffer.extend(dat)
d = self._buffer[:num_bytes]
del self._buffer[:num_bytes]
return d
def run(self, timepos):
self._output.frame.clear()
os.write(self._pipe_out, b'#FR=%d\n' % timepos)
l = self._get_line()
assert l == b'#FR=%d' % timepos, l
l = self._get_line()
assert l.startswith(b'SAMPLES=')
num_samples = int(l[8:])
l = self._get_line()
assert l.startswith(b'LEN=')
num_bytes = int(l[4:])
logger.info("run(): thread_id=%s", threading.get_ident())
samples = self._get_bytes(num_bytes)
assert len(samples) == num_bytes
future = asyncio.run_coroutine_threadsafe(
self._stub.call('PROCESS_FRAME'), self.event_loop)
event = threading.Event()
future.add_done_callback(event.set)
event.wait()
logger.info("process_frame done")
self._output.frame.resize(0)
self._output.frame.append_samples(samples, num_samples)
assert len(self._output.frame) <= 4096
self._output.frame.resize(4096)

@ -149,8 +149,8 @@ class Pipeline(object):
t2 = time.time()
if t2 - t0 > 0:
utilization = (t2 - t1) / (t2 - t0)
if self.utilization_callback is not None:
self.utilization_callback(utilization)
# if self.utilization_callback is not None:
# self.utilization_callback(utilization)
timepos += 4096

@ -59,7 +59,7 @@ class ProjectClientMixin(object):
self._session_id = None
self._object_map = {}
self.project = None
self.audioproc_address = None
self.audiostream_address = None
self.cls_map = {}
def __set_project(self, root_id):
@ -78,7 +78,7 @@ class ProjectClientMixin(object):
assert self._stub is None
self._stub = ipc.Stub(self.event_loop, address)
await self._stub.connect()
self._session_id, self.audioproc_address, root_id = await self._stub.call(
self._session_id, self.audiostream_address, root_id = await self._stub.call(
'START_SESSION', self.server.address)
if root_id is not None:
# Connected to a loaded project.

@ -88,7 +88,11 @@ class ProjectProcessMixin(object):
self.event_loop, self.server)
await self.audioproc_client.setup()
await self.audioproc_client.connect(self.audioproc_address)
await self.audioproc_client.set_backend('ipc')
nid = await self.audioproc_client.add_node('wavfile', path='/usr/share/sounds/purple/logout.wav', loop=True)
await self.audioproc_client.connect_ports(nid, 'out', 'sink', 'in')
self.audiostream_address = await self.audioproc_client.set_backend('ipc')
async def cleanup(self):
if self.audioproc_client is not None:
@ -96,6 +100,7 @@ class ProjectProcessMixin(object):
await self.audioproc_client.cleanup()
self.audioproc_client = None
self.audioproc_address = None
self.audiostream_address = None
async def run(self):
await self._shutting_down.wait()
@ -159,8 +164,8 @@ class ProjectProcessMixin(object):
if self.project is not None:
for mutation in self.add_object_mutations(self.project):
await session.publish_mutation(mutation)
return session.id, self.audioproc_address, self.project.id
return session.id, self.audioproc_address, None
return session.id, self.audiostream_address, self.project.id
return session.id, self.audiostream_address, None
def handle_end_session(self, session_id):
session = self.get_session(session_id)

@ -199,13 +199,20 @@ class BaseEditorApp(QApplication):
async def addProject(self, project_connection):
self.win.addProjectView(project_connection)
node_id = await self.audioproc_client.add_node(
'ipc', address=project_connection.client.audioproc_address)
project_connection.playback_node = await self.audioproc_client.add_node(
'ipc', address=project_connection.client.audiostream_address)
await self.audioproc_client.connect_ports(
node_id, 'out', 'sink', 'in')
project_connection.playback_node, 'out', 'sink', 'in')
self._updateOpenedProjects()
async def removeProject(self, project_connection):
if project_connection.playback_node is not None:
await self.audioproc_client.disconnect_ports(
project_connection.playback_node, 'out', 'sink', 'in')
await self.audioproc_client.remove_node(
project_connection.playback_node)
project_connection.playback_node = None
self.win.removeProjectView(project_connection)
self._updateOpenedProjects()
await self.project_registry.close_project(project_connection)

@ -18,6 +18,8 @@ class Project(object):
self.process_address = None
self.client = None
self.playback_node = None
@property
def name(self):
return os.path.basename(self.path)

Loading…
Cancel
Save