Browse Source

Basics of stats plotting.

looper
Ben Niemann 6 years ago
parent
commit
235be26b0e
  1. 33
      noisicaa/core/process_manager.py
  2. 136
      noisicaa/core/stats.py
  3. 14
      noisicaa/editor_main.py
  4. 9
      noisicaa/ui/editor_app.py
  5. 13
      noisicaa/ui/editor_window.py
  6. 196
      noisicaa/ui/stat_monitor.py
  7. 1
      requirements.txt

33
noisicaa/core/process_manager.py

@ -203,9 +203,8 @@ class ChildConnection(object):
class ChildCollector(object):
def __init__(self, stats_collector, stats_registry, collection_interval=100):
def __init__(self, stats_collector, collection_interval=100):
self.__stats_collector = stats_collector
self.__stats_registry = stats_registry
self.__collection_interval = collection_interval
self.__stat_poll_duration = None
@ -217,9 +216,9 @@ class ChildCollector(object):
self.__thread = None
def setup(self):
self.__stat_poll_duration = self.__stats_registry.register(
self.__stat_poll_duration = stats.registry.register(
stats.Counter, stats.StatName(name='stat_collector_duration_total'))
self.__stat_poll_count = self.__stats_registry.register(
self.__stat_poll_count = stats.registry.register(
stats.Counter, stats.StatName(name='stat_collector_collections'))
self.__stop = threading.Event()
@ -290,8 +289,10 @@ class ChildCollector(object):
self.__stat_poll_duration.incr(poll_duration)
self.__stat_poll_count.incr(1)
self.__stats_collector.collect(self.__stats_registry)
self.__stats_collector.evaluate_rules()
manager_name = stats.StatName(pid=os.getpid())
for name, value in stats.registry.collect():
self.__stats_collector.add_value(
name.merge(manager_name), value)
def __main(self):
next_collection = time.perf_counter()
@ -311,10 +312,9 @@ class ProcessManager(object):
self._sigchld_received = asyncio.Event()
self._server = ipc.Server(event_loop, 'manager')
self._stats_registry = stats.Registry()
self._stats_collector = stats.Collector()
self._child_collector = ChildCollector(
self._stats_collector, self._stats_registry)
self._stats_collector)
@property
def server(self):
@ -325,6 +325,11 @@ class ProcessManager(object):
self._event_loop.add_signal_handler(
signal.SIGCHLD, self.sigchld_handler)
self._server.add_command_handler(
'STATS_LIST', self.handle_stats_list)
self._server.add_command_handler(
'STATS_FETCH', self.handle_stats_fetch)
await self._server.setup()
self._child_collector.setup()
@ -334,6 +339,9 @@ class ProcessManager(object):
await self.terminate_all_children()
await self._server.cleanup()
self._server.remove_command_handler('STATS_LIST')
self._server.remove_command_handler('STATS_FETCH')
async def __aenter__(self):
await self.setup()
return self
@ -387,6 +395,9 @@ class ProcessManager(object):
for sig in signal.Signals:
self._event_loop.remove_signal_handler(sig)
# Clear all stats inherited from the manager process.
stats.registry.clear()
# Close the "other ends" of the pipes.
os.close(request_out)
os.close(response_in)
@ -537,6 +548,12 @@ class ProcessManager(object):
self._child_collector.remove_child(pid)
del self._processes[pid]
def handle_stats_list(self):
return self._stats_collector.list_stats()
def handle_stats_fetch(self, expressions):
return self._stats_collector.fetch_stats(expressions)
class ChildConnectionHandler(object):
def __init__(self, connection):

136
noisicaa/core/stats.py

@ -7,6 +7,8 @@ import threading
import time
import pprint
import psutil
logger = logging.getLogger(__name__)
@ -85,25 +87,9 @@ class Counter(BaseStat):
self.__value += amount
class Rule(object):
def __init__(self, name, func):
self.name = name
self.__func = func
def __str__(self):
return '%s(%s)' % (self.__class__.__name__, self.name)
__repr__ = __str__
@property
def key(self):
return self.name.key
def evaluate(self, tsdata):
return self.__func(tsdata)
class Value(object):
def __init__(self, timestamp, value):
assert isinstance(value, (int, float)), type(value)
self.timestamp = timestamp
self.value = value
@ -133,17 +119,30 @@ class Timeseries(collections.UserList):
def __init__(self, values=None):
super().__init__(values)
def rate(self):
result = Timeseries()
prev_value = None
for value in reversed(self):
if prev_value is not None:
result.insert(0, Value(value.timestamp, (value.value - prev_value.value) / (value.timestamp - prev_value.timestamp)))
prev_value = value
return result
def latest(self):
return self.data[0]
def max(self):
return max(value.value for value in self)
def min(self):
return min(value.value for value in self)
class TimeseriesSet(collections.UserDict):
def __init__(self, data=None):
super().__init__(data)
def select(self, **labels):
name = StatName(**labels)
def select(self, name):
result = TimeseriesSet()
for ts_name, ts in self.data.items():
if name.is_subset_of(ts_name):
@ -151,6 +150,13 @@ class TimeseriesSet(collections.UserDict):
return result
def rate(self):
result = TimeseriesSet()
for ts_name, ts in self.data.items():
result[ts_name] = ts.rate()
return result
def latest(self):
result = ValueSet()
for ts_name, ts in self.data.items():
@ -158,6 +164,12 @@ class TimeseriesSet(collections.UserDict):
return result
def min(self):
return min(ts.min() for ts in self.values())
def max(self):
return min(ts.max() for ts in self.values())
class Registry(object):
def __init__(self):
@ -175,20 +187,66 @@ class Registry(object):
with self.__lock:
del self.__stats[stat.name]
def clear(self):
with self.__lock:
for stat in list(self.__stats.values()):
stat.unregister()
assert not self.__stats
def collect(self):
data = []
proc_info = psutil.Process()
with self.__lock:
now = time.time()
for name, stat in self.__stats.items():
data.append((name, Value(now, stat.value)))
with proc_info.oneshot():
cpu_times = proc_info.cpu_times()
data.append((
StatName(name='cpu_time', type='user'),
Value(now, cpu_times.user)))
data.append((
StatName(name='cpu_time', type='system'),
Value(now, cpu_times.system)))
memory_info = proc_info.memory_info()
data.append((
StatName(name='memory', type='rss'),
Value(now, memory_info.rss)))
data.append((
StatName(name='memory', type='vms'),
Value(now, memory_info.vms)))
io_counters = proc_info.io_counters()
data.append((
StatName(name='io', type='read_count'),
Value(now, io_counters.read_count)))
data.append((
StatName(name='io', type='write_count'),
Value(now, io_counters.write_count)))
data.append((
StatName(name='io', type='read_bytes'),
Value(now, io_counters.read_bytes)))
data.append((
StatName(name='io', type='write_bytes'),
Value(now, io_counters.write_bytes)))
ctx_switches = proc_info.num_ctx_switches()
data.append((
StatName(name='ctx_switches', type='voluntary'),
Value(now, ctx_switches.voluntary)))
data.append((
StatName(name='ctx_switches', type='involuntary'),
Value(now, ctx_switches.involuntary)))
return data
class Collector(object):
def __init__(self, timeseries_length=60*10*10):
self.__timeseries = TimeseriesSet()
self.__rules = collections.OrderedDict()
self.__timeseries_length = timeseries_length
def add_value(self, name, value):
@ -199,32 +257,30 @@ class Collector(object):
if drop_count > 0:
del ts[-drop_count:]
def add_rule(self, rule):
with self.__lock:
assert rule.name not in self.__rules
self.__rules[rule.name] = rule
def collect(self, registry):
for name, value in registry.collect():
self.add_value(name, value)
def evaluate_rules(self):
now = time.time()
for rule in self.__rules.values():
value = rule.evaluate(self.__timeseries)
if isinstance(value, (Timeseries, TimeseriesSet)):
value = value.latest()
def evaluate_expression(self, expr):
result = self.__timeseries
if isinstance(value, Value):
value = value.value
for op, *args in expr:
if op == 'SELECT':
result = result.select(args[0])
elif op == 'RATE':
result = result.rate()
else:
raise ValueError(op)
return result
if isinstance(value, ValueSet):
for name, value in value.items():
rname = name.merge(rule.name)
self.add_value(rname, Value(now, value.value))
def list_stats(self):
return list(sorted(self.__timeseries.keys()))
elif isinstance(value, (int, float)):
self.add_value(rule.name, Value(now, value))
def fetch_stats(self, expressions):
return {
id: self.evaluate_expression(expr)
for id, expr in expressions.items()}
def dump(self):
for name, ts in sorted(self.__timeseries.items()):

14
noisicaa/editor_main.py

@ -12,13 +12,13 @@ from .runtime_settings import RuntimeSettings
from . import logging
from .core import process_manager
# Unload all noisicaa modules, so that every subprocess reloads everything
# from scratch again.
noisicaa_modules = [
mod for mod in sys.modules.keys()
if mod == 'noisicaa' or mod.startswith('noisicaa.')]
for mod in noisicaa_modules:
del sys.modules[mod]
# # Unload all noisicaa modules, so that every subprocess reloads everything
# # from scratch again.
# noisicaa_modules = [
# mod for mod in sys.modules.keys()
# if mod == 'noisicaa' or mod.startswith('noisicaa.')]
# for mod in noisicaa_modules:
# del sys.modules[mod]
class Main(object):

9
noisicaa/ui/editor_app.py

@ -20,6 +20,7 @@ from .editor_window import EditorWindow
from . import project_registry
from . import pipeline_perf_monitor
from . import pipeline_graph_monitor
from . import stat_monitor
logger = logging.getLogger('ui.editor_app')
@ -279,6 +280,7 @@ class EditorApp(BaseEditorApp):
self.win = None
self.pipeline_perf_monitor = None
self.pipeline_graph_monitor = None
self.stat_monitor = None
async def setup(self):
logger.info("Installing custom excepthook.")
@ -293,6 +295,9 @@ class EditorApp(BaseEditorApp):
# logger.info("Creating PipelineGraphMonitor.")
# self.pipeline_graph_monitor = pipeline_graph_monitor.PipelineGraphMonitor(self)
logger.info("Creating StatMonitor.")
self.stat_monitor = stat_monitor.StatMonitor(self)
logger.info("Creating EditorWindow.")
self.win = EditorWindow(self)
await self.win.setup()
@ -328,6 +333,10 @@ class EditorApp(BaseEditorApp):
self.dumpSettings()
async def cleanup(self):
if self.stat_monitor is not None:
self.stat_monitor.storeState()
self.stat_monitor = None
if self.pipeline_perf_monitor is not None:
self.pipeline_perf_monitor.storeState()
self.pipeline_perf_monitor = None

13
noisicaa/ui/editor_window.py

@ -262,6 +262,15 @@ class EditorWindow(ui_base.CommonMixin, QtWidgets.QMainWindow):
self.app.pipeline_graph_monitor.visibilityChanged.connect(
self._show_pipeline_graph_monitor_action.setChecked)
self._show_stat_monitor_action = QtWidgets.QAction(
"Stat Monitor", self,
checkable=True,
checked=self.app.stat_monitor.isVisible())
self._show_stat_monitor_action.toggled.connect(
self.app.stat_monitor.setVisible)
self.app.stat_monitor.visibilityChanged.connect(
self._show_stat_monitor_action.setChecked)
def createMenus(self):
menu_bar = self.menuBar()
@ -297,11 +306,11 @@ class EditorWindow(ui_base.CommonMixin, QtWidgets.QMainWindow):
self._dev_menu.addAction(self._restart_clean_action)
self._dev_menu.addAction(self._crash_action)
self._dev_menu.addAction(self.app.show_edit_areas_action)
self._dev_menu.addAction(
self._show_pipeline_perf_monitor_action)
self._dev_menu.addAction(self._show_pipeline_perf_monitor_action)
if self.app.pipeline_graph_monitor is not None:
self._dev_menu.addAction(
self._show_pipeline_graph_monitor_action)
self._dev_menu.addAction(self._show_stat_monitor_action)
menu_bar.addSeparator()

196
noisicaa/ui/stat_monitor.py

@ -0,0 +1,196 @@
#!/usr/bin/python
import functools
import math
import time
import uuid
from PyQt5.QtCore import Qt
from PyQt5 import QtCore
from PyQt5 import QtGui
from PyQt5 import QtWidgets
from . import ui_base
class StatGraph(QtWidgets.QWidget):
def __init__(self, *, parent, name):
super().__init__(parent)
self.setMinimumHeight(200)
self.setMaximumHeight(200)
self.__name = name
self.__id = uuid.uuid4().hex
self.__timeseries_set = None
def name(self):
return self.__name
def id(self):
return self.__id
def expression(self):
return [
('SELECT', self.__name),
('RATE',)]
def setTimeseriesSet(self, ts_set):
self.__timeseries_set = ts_set
self.update()
def paintEvent(self, evt):
super().paintEvent(evt)
painter = QtGui.QPainter(self)
painter.fillRect(0, 0, self.width(), self.height(), Qt.black)
painter.setPen(Qt.white)
painter.drawText(5, 16, str(self.__name))
if self.__timeseries_set is not None:
vmin = self.__timeseries_set.min()
vmax = self.__timeseries_set.max()
if vmax == vmin:
vmin = vmin - 1
vmax = vmax + 1
for name, ts in self.__timeseries_set.items():
px, py = None, None
for idx, value in enumerate(ts):
x = self.width() - idx - 1
y = int((self.height() - 1) * (vmax - value.value) / (vmax - vmin))
if px is not None:
painter.drawLine(px, py, x, y)
px, py = x, y
painter.end()
class StatMonitor(ui_base.CommonMixin, QtWidgets.QMainWindow):
visibilityChanged = QtCore.pyqtSignal(bool)
def __init__(self, app):
super().__init__(app=app)
self.__update_timer = QtCore.QTimer(self)
self.__update_timer.setInterval(1000)
self.__update_timer.timeout.connect(self.onUpdate)
self.__realtime = True
self.__time_scale = 4096
self.setWindowTitle("noisicaä - Stat Monitor")
self.resize(600, 300)
self.__pause_action = QtWidgets.QAction(
QtGui.QIcon.fromTheme('media-playback-pause'),
"Play",
self, triggered=self.onToggleRealtime)
self.__zoom_in_action = QtWidgets.QAction(
QtGui.QIcon.fromTheme('zoom-in'),
"Zoom In",
self, triggered=self.onZoomIn)
self.__zoom_out_action = QtWidgets.QAction(
QtGui.QIcon.fromTheme('zoom-out'),
"Zoom Out",
self, triggered=self.onZoomOut)
self.__stats_menu = QtWidgets.QMenu()
self.__stats_menu.aboutToShow.connect(self.onUpdateStatsMenu)
self.__add_stat_action = QtWidgets.QAction(
QtGui.QIcon.fromTheme('list-add'),
"Add stat",
self)
self.__add_stat_action.setMenu(self.__stats_menu)
self.__toolbar = QtWidgets.QToolBar()
self.__toolbar.addAction(self.__pause_action)
self.__toolbar.addAction(self.__zoom_in_action)
self.__toolbar.addAction(self.__zoom_out_action)
self.__toolbar.addAction(self.__add_stat_action)
self.addToolBar(Qt.TopToolBarArea, self.__toolbar)
self.__stat_graphs = []
self.__stat_list_layout = QtWidgets.QVBoxLayout()
self.__stat_list_layout.setSpacing(4)
self.__stat_list = QtWidgets.QWidget(self)
self.__stat_list.setLayout(self.__stat_list_layout)
self.__scroll_area = QtWidgets.QScrollArea(self)
self.__scroll_area.setWidget(self.__stat_list)
self.__scroll_area.setWidgetResizable(True)
self.__scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.setCentralWidget(self.__scroll_area)
self.setVisible(
int(self.app.settings.value(
'dialog/stat_monitor/visible', False)))
self.restoreGeometry(
self.app.settings.value(
'dialog/stat_monitor/geometry', b''))
def storeState(self):
s = self.app.settings
s.beginGroup('dialog/stat_monitor')
s.setValue('visible', int(self.isVisible()))
s.setValue('geometry', self.saveGeometry())
s.endGroup()
def showEvent(self, event):
self.visibilityChanged.emit(True)
self.__update_timer.start()
super().showEvent(event)
def hideEvent(self, event):
self.__update_timer.stop()
self.visibilityChanged.emit(False)
super().hideEvent(event)
def onToggleRealtime(self):
if self.__realtime:
self.__realtime = False
self.__pause_action.setIcon(
QtGui.QIcon.fromTheme('media-playback-start'))
else:
self.__realtime = True
self.__pause_action.setIcon(
QtGui.QIcon.fromTheme('media-playback-pause'))
def onZoomIn(self):
self.__time_scale *= 2
def onZoomOut(self):
if self.time_scale > 1:
self.__time_scale //= 2
def onUpdate(self):
expressions = {
graph.id(): graph.expression()
for graph in self.__stat_graphs}
self.call_async(
self.app.process.manager.call('STATS_FETCH', expressions),
callback=self.onStatsFetched)
def onStatsFetched(self, result):
for graph in self.__stat_graphs:
graph.setTimeseriesSet(result.get(graph.id(), None))
def onUpdateStatsMenu(self):
self.__stats_menu.clear()
self.call_async(self.updateStatsMenuAsync())
async def updateStatsMenuAsync(self):
stat_list = await self.app.process.manager.call(
'STATS_LIST')
for name in stat_list:
action = self.__stats_menu.addAction(str(name))
action.triggered.connect(functools.partial(self.onAddStat, name))
def onAddStat(self, name):
graph = StatGraph(parent=self.__stat_list, name=name)
self.__stat_graphs.append(graph)
self.__stat_list_layout.addWidget(graph)

1
requirements.txt

@ -16,3 +16,4 @@ quamash
./3rdparty/csound/
posix_ipc
eventfd
psutil

Loading…
Cancel
Save