Source code for nexxT.services.gui.Profiling

# SPDX-License-Identifier: Apache-2.0
# Copyright (C) 2020 ifm electronic gmbh
#
# THE PROGRAM IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
#

"""
This module provides the gui part of the profiling service for nexxT.
"""

import logging
import numpy as np
from nexxT.Qt.QtCore import QByteArray, Slot, Qt, QPointF, QLineF, QRectF, QEvent
from nexxT.Qt.QtGui import QPainter, QPolygonF, QPen, QColor, QFontMetricsF, QPalette, QAction
from nexxT.Qt.QtWidgets import QWidget, QToolTip
from nexxT.core.Utils import ThreadToColor
from nexxT.interface import Services
from nexxT.services.SrvProfiling import ProfilingService

logger = logging.getLogger(__name__)

[docs] class LoadDisplayWidget(QWidget): """ This widget displays the thread-specific load. """ baseTimestamp = None
[docs] def __init__(self, parent): super().__init__(parent=parent) self._loadData = {} self.setBackgroundRole(QPalette.Base) self.setAutoFillBackground(True)
[docs] @Slot(str, QByteArray) def newLoadData(self, threadName, timestamps, load): """ Slot called when new load data is available :param threadName: the name of the thread given as string :param loadData: the load data, given as the QByteArray of a n x 2 np.float32 array :return: """ atimestamps = np.frombuffer(memoryview(timestamps), dtype=np.int64) aload = np.frombuffer(memoryview(load), dtype=np.float32) if LoadDisplayWidget.baseTimestamp is None: LoadDisplayWidget.baseTimestamp = np.min(atimestamps) if threadName not in self._loadData: self._loadData[threadName] = QPolygonF() p = self._loadData[threadName] for i in range(aload.shape[0]): x = 1e-9*(atimestamps[i] - self.baseTimestamp) if p.size() > 0 and p.at(p.count()-1).x() > x: # it seems that QT re-orders slots :( we have to maintain the order here idx = p.count() while idx > 0 and p.at(idx-1).x() > x: idx -= 1 p.insert(idx, QPointF(x, aload[i])) else: p.append(QPointF(x, aload[i])) if p[p.count()-1].x() - p[0].x() > 60: for i in range(p.count()): if p[p.count()-1].x() - p[i].x() <= 60: p.remove(0, i) break self.update()
[docs] @Slot(str) def removeThread(self, thread): """ Remove the thread from the stored load data. :param thread: the name of the thread to be removed. :return: """ if thread in self._loadData: del self._loadData[thread]
[docs] def paintEvent(self, event): """ Manually implemented paint event :param event: the QT paint event :return: """ h = self.height() w = self.width() p = QPainter(self) p.setClipRect(event.region().boundingRect()) pen = QPen(QColor(0, 0, 0)) pen.setWidth(4) ls = QFontMetricsF(p.font()).lineSpacing() for idx, t in enumerate(sorted(list(self._loadData.keys()))): y = 10 + idx*ls pen.setColor(ThreadToColor.singleton.get(t)) p.setPen(pen) p.drawLine(QLineF(15, y, 15 + 15, y)) pen.setColor(QColor(0, 0, 0)) p.setPen(pen) p.drawText(QPointF(35, y), t) if len(self._loadData) > 0: right = max(polygon[polygon.count()-1].x() for _, polygon in self._loadData.items()) else: right = 0.0 p.translate(w-10-right*20, h-10) p.scale(20, -(h-20)) # x direction: 20 pixels per second, y direction: spread between 10 and h-10 topleft = p.transform().inverted()[0].map(QPointF(10, 10)) pen.setWidthF(0) pen.setCosmetic(True) left = topleft.x() p.setRenderHint(QPainter.Antialiasing, True) p.setPen(pen) p.drawLine(QLineF(left, 0, right, 0)) p.drawLine(QLineF(left, 0, left, 1)) idx = 0 for t, polygon in self._loadData.items(): pen.setColor(ThreadToColor.singleton.get(t)) p.setPen(pen) p.drawPolyline(polygon) p.end()
[docs] class SpanDisplayWidget(QWidget): """ This Widget displays the time/occupancy profiling overview based on the input ports events. """
[docs] def __init__(self, parent): super().__init__(parent=parent) #self.setAttribute(Qt.WA_Hover, True) self.setMouseTracking(True) self._spanData = {} self._removedThreads = set() self.setBackgroundRole(QPalette.Base) self.setAutoFillBackground(True) self.portYCoords = []
[docs] @Slot(str, str, QByteArray) def newSpanData(self, threadName, portName, spanData): """ This slot is called when new profiling data is available. :param threadName: the name of the associated thread :param portName: the full-qualified name of the port :param spanData: the profiling data, given as the byte array representation of a n x 2 int64 array. :return: """ if threadName in self._removedThreads: del self._spanData[threadName] self._removedThreads.remove(threadName) spanData = np.reshape(np.frombuffer(memoryview(spanData), dtype=np.int64), (-1, 2)) if threadName not in self._spanData: self._spanData[threadName] = {} if portName not in self._spanData[threadName]: self._spanData[threadName][portName] = np.zeros((0, 2), np.int64) self._spanData[threadName][portName] = np.append(self._spanData[threadName][portName], spanData, axis=0) sd = self._spanData[threadName][portName] if (sd[-1, -1] - sd[0, 0])*1e-9 > 60: for i in range(sd.shape[0]): if (sd[-1, -1] - sd[i, 0])*1e-9 <= 60: sd = sd[i:, :] self._spanData[threadName][portName] = sd break self.update()
[docs] @Slot(str) def removeThread(self, thread): """ Lazily removes the thread from the profiling data. To be able to inspect the data when the application is stopped, the data will actually be removed when new data of the thread is available. :param thread: the name of the thread to be removed. :return: """ if thread in self._spanData: self._removedThreads.add(thread)
[docs] def paintEvent(self, event): """ Manually implemented paint event of the time / occupancy diagram. :param event: the qt paint event :return: """ bgcolor = self.palette().color(self.backgroundRole()) h = self.height() w = self.width() p = QPainter(self) p.setClipRect(event.region().boundingRect()) pen = QPen(QColor(0, 0, 0)) pen.setWidth(0) pen.setCosmetic(True) ls = QFontMetricsF(p.font()).lineSpacing() maxx = 0 minx = None for t in sorted(list(self._spanData.keys())): for port in sorted(list(self._spanData[t].keys())): sd = self._spanData[t][port] maxx = np.maximum(maxx, np.max(sd)) minx = np.minimum(minx, np.min(sd)) if minx is not None else np.min(sd) scalex = 1e-9*200 # 200 pixels / second # (maxx-minx)*scalex + offx = w-10 if minx is None: return offx = w-10-(maxx-minx)*scalex idx = 0 self.portYCoords = [] for t in sorted(list(self._spanData.keys())): for port in sorted(list(self._spanData[t].keys())): pen.setColor(QColor(0, 0, 0)) p.setPen(pen) y = 10 + idx*ls self.portYCoords.append((t, port, y-ls/2, y)) idx += 1 sd = self._spanData[t][port] for i in range(sd.shape[0]): x1, x2 = sd[i, :] x1 = (x1-minx)*scalex + offx x2 = (x2-minx)*scalex + offx color = ThreadToColor.singleton.get(t) color.setAlpha(125) p.fillRect(QRectF(x1, y-ls/2, x2-x1, ls/2), color) p.drawRect(QRectF(x1, y-ls/2, x2-x1, ls/2)) pen = QPen(QColor(40, 40, 40)) pen.setWidth(0) pen.setCosmetic(True) pen.setStyle(Qt.DashLine) p.setPen(pen) for x in range(w-10, -1, -20): p.drawLine(x, 10, x, h-10) idx = 0 pen.setStyle(Qt.SolidLine) p.setPen(pen) for t in sorted(list(self._spanData.keys())): for port in sorted(list(self._spanData[t].keys())): y = 10 + idx*ls idx += 1 br = QFontMetricsF(p.font()).boundingRect(port) br.translate(10, y) p.fillRect(br, bgcolor) p.drawText(10, y, port) p.end()
[docs] def textDescription(self, thread, port): """ Tooltip text generation. :param thread: the name of the corresponding thread :param port: the full-qualified port name. :return: a string instance containing the profiling info. """ sd = self._spanData[thread][port] res = f"Thread: {thread}, Port: {port}\n" groups = [] activeGroup = {} for i in range(sd.shape[0]): if len(activeGroup) > 0: if sd[i, 1] <= activeGroup["finish"]: activeGroup["subcalls"].append(sd[i, :]) continue groups.append(activeGroup) activeGroup = dict(start=sd[i, 0], finish=sd[i, 1], subcalls=[]) for i, g in enumerate(groups[::-1]): total = (g["finish"] - g["start"])*1e-6 exclusive = sum(sc[1]-sc[0] for sc in g["subcalls"])*1e-6 subcalls = total - exclusive res += f" event[{-i-1}] Total runtime: {total:.1f} ms;" res += f" Exclusive time: {exclusive:.1f} ms; Subcall time: {subcalls:.1f} ms\n" if i >= 9: # show last 10 calls break return res
[docs] def event(self, event): """ Event filter for generating tool tips. :param event: a QEvent instance. :return: """ if event.type() == QEvent.ToolTip: for thread, port, y1, y2 in self.portYCoords: if event.pos().y() >= y1 and event.pos().y() <= y2: QToolTip.showText(event.globalPos(), self.textDescription(thread, port)) return True return super().event(event)
[docs] class Profiling(ProfilingService): """ GUI part of the nexxT profiling service. """
[docs] def __init__(self): super().__init__() srv = Services.getService("MainWindow") profMenu = srv.menuBar().addMenu("Pr&ofiling") self.loadDockWidget = srv.newDockWidget("Load", None, Qt.BottomDockWidgetArea) self.loadDisplay = LoadDisplayWidget(self.loadDockWidget) self.loadDockWidget.setWidget(self.loadDisplay) self.loadDataUpdated.connect(self.loadDisplay.newLoadData) self.threadDeregistered.connect(self.loadDisplay.removeThread) self.spanDockWidget = srv.newDockWidget("Profiling", None, Qt.BottomDockWidgetArea) self.spanDisplay = SpanDisplayWidget(self.spanDockWidget) self.spanDockWidget.setWidget(self.spanDisplay) self.spanDataUpdated.connect(self.spanDisplay.newSpanData) self.threadDeregistered.connect(self.spanDisplay.removeThread) self.actLoadEnabled = QAction("Enable Load Monitor") self.actLoadEnabled.setCheckable(True) self.actLoadEnabled.setChecked(True) self.actLoadEnabled.toggled.connect(self.setLoadMonitorEnabled) self.actProfEnabled = QAction("Enable Port Profiling") self.actProfEnabled.setCheckable(True) self.actProfEnabled.setChecked(False) self.actProfEnabled.toggled.connect(self.setPortProfilingEnabled) self.setLoadMonitorEnabled(True) self.setPortProfilingEnabled(False) profMenu.addAction(self.actLoadEnabled) profMenu.addAction(self.actProfEnabled)
[docs] def setLoadMonitorEnabled(self, enabled): """ called when the corresponding QAction is toggled :param enabled: boolean :return: """ self.actProfEnabled.setEnabled(enabled) super().setLoadMonitorEnabled(enabled)
[docs] def setPortProfilingEnabled(self, enabled): """ called when the corresponding QAction is toggled :param enabled: boolean :return: """ self.actLoadEnabled.setEnabled(not enabled) super().setPortProfilingEnabled(enabled)