# SPDX-License-Identifier: Apache-2.0
# Copyright (C) 2020 ifm electronic gmbh
#
# THE PROGRAM IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
#
"""
This module provides the profiling service for nexxT, responsible for generating profiling measurements.
"""
import logging
import time
from threading import Lock
import numpy as np
from nexxT.Qt.QtCore import QObject, Signal, Slot, QThread, QTimer, Qt, QByteArray, QCoreApplication
from nexxT.core.Utils import MethodInvoker
logger = logging.getLogger(__name__)
TIMER = time.perf_counter_ns
class PortProfiling:
    """
    Simple helper class for storing profiling time points of a single port.

    While an item is in progress, its time points are collected in ``currentItem`` as a flat
    list ``[start, pause, unpause, ..., stop]``. When the item is stopped, the list is converted
    to ``(begin, end)`` tuples which are kept in ``spans`` until they are fetched with getSpans().
    """

    def __init__(self):
        # finished (begin, end) tuples which have not been fetched yet
        self.spans = []
        # time points of the item in progress or None if no item is active
        self.currentItem = None

    def start(self, timeNs):
        """
        Called when the corresponding item is started. An item which is still in progress is replaced.

        :param timeNs: the time point, given in nanoseconds.
        :return:
        """
        self.currentItem = [timeNs]

    def pause(self, timeNs):
        """
        Called when the corresponding item is paused (another item may be started). The call is ignored
        if no item is in progress.

        :param timeNs: the time point, given in nanoseconds.
        :return:
        """
        if self.currentItem is None:
            return
        self.currentItem.append(timeNs)

    def unpause(self, timeNs):
        """
        Called when the corresponding item is unpaused. The call is ignored if no item is in progress.

        :param timeNs: the time point, given in nanoseconds.
        :return:
        """
        if self.currentItem is None:
            return
        self.currentItem.append(timeNs)

    def stop(self, timeNs):
        """
        Called when the corresponding item is finished. The profiling information will be added to the history.
        The first tuple added covers the whole item (start to stop), the following tuples cover the
        sections between the pauses. The call is ignored if no item is in progress.

        :param timeNs: the time point, given in nanoseconds.
        :return:
        """
        ci = self.currentItem
        if ci is None:
            return
        ci.append(timeNs)
        self.currentItem = None
        # overall span of the item including the paused sections
        self.spans.append((ci[0], ci[-1]))
        # pair the time points: (start, pause), (unpause, pause), ..., (unpause, stop)
        self.spans.extend(zip(ci[0::2], ci[1::2]))

    def getSpans(self):
        """
        Returns the profiling time points in a list and resets the internal history.

        :return: list of tuples containing nanosecond time points.
        """
        res = self.spans
        self.spans = []
        return res
class ThreadSpecificProfItem:
    """
    This class contains all profiling items of a specific thread.

    It keeps the load measurements of the thread and one PortProfiling instance per port. Ports which are
    currently being processed are tracked on a stack, such that a nested port change pauses the outer one.
    """
    # period of the load measurements in seconds
    THREAD_PROFILING_PERIOD_SEC = 0.3
    # not referenced in this class; presumably the displayed history length in seconds (to be confirmed)
    THREAD_PROFILING_TOTAL_TIME = 60

    def __init__(self):
        self._lastThreadTime = time.thread_time_ns()
        self._lastMonotonicTime = TIMER()
        # maps port names to PortProfiling instances
        self._portProfiling = {}
        # names of the ports currently being processed, innermost port last
        self._portStack = []
        # list of (time_nano_seconds, load_ratio) which have not been fetched yet
        self._measurements = []

    def update(self):
        """
        Updates the load profiling. The load is the thread time divided by the wall clock time, both
        elapsed since the previous update. The thread time is taken from time.thread_time_ns(), so this
        method has to be called from the thread which shall be measured.

        :return:
        """
        thread_time = time.thread_time_ns()
        monotonic_time = TIMER()
        elapsed = monotonic_time - self._lastMonotonicTime
        if elapsed == 0:
            # no time has passed, avoid the division by zero
            return
        load = (thread_time - self._lastThreadTime) / elapsed
        self._lastThreadTime = thread_time
        self._lastMonotonicTime = monotonic_time
        self._measurements.append((monotonic_time, load))

    def getLoad(self):
        """
        Returns the load measurements and resets the internal list.

        :return: list of 2-tuples (time_nano_seconds, load_ratio)
        """
        res = self._measurements
        self._measurements = []
        return res

    def getSpans(self):
        """
        Get the current port profiling data. The data is removed from the port profiling instances.

        :return: dict mapping port names to lists of tuples with nano-second time points.
        """
        return {p: pp.getSpans() for p, pp in self._portProfiling.items()}

    def registerPortChangeStarted(self, portname, timeNs):
        """
        Called when starting the onPortDataChanged function.

        :param portname: the full-qualified port name
        :param timeNs: the time in nano-seconds
        :return:
        """
        if self._portStack:
            # a nested port change interrupts the port which is currently on top of the stack
            self._portProfiling[self._portStack[-1]].pause(timeNs)
        self._portStack.append(portname)
        if portname not in self._portProfiling:
            self._portProfiling[portname] = PortProfiling()
        self._portProfiling[portname].start(timeNs)

    def registerPortChangeFinished(self, portname, timeNs):
        """
        Called when the onPortDataChanged function has finished.

        :param portname: the full-qualified port name
        :param timeNs: the time in nano-seconds
        :return:
        """
        if not self._portStack or self._portStack[-1] != portname:
            return  # canceled during profiling
        self._portStack.pop()
        self._portProfiling[portname].stop(timeNs)
        if self._portStack:
            # continue the measurement of the interrupted outer port
            self._portProfiling[self._portStack[-1]].unpause(timeNs)

    def cancel(self):
        """
        Cancel profiling on user-request and reset the corresponding data. The load measurements are kept.

        :return:
        """
        self._portProfiling = {}
        self._portStack = []
class ProfilingServiceDummy(QObject):
    """
    This class can be used as a replacement for the ProfilingService which provides the same interface.
    All slots do nothing.
    """

    @Slot()
    def registerThread(self):
        """
        dummy implementation, does nothing
        """

    @Slot()
    def deregisterThread(self):
        """
        dummy implementation, does nothing
        """

    @Slot(str)
    def beforePortDataChanged(self, portname):
        """
        dummy implementation, does nothing

        :param portname: name of the port
        """

    @Slot(str)
    def afterPortDataChanged(self, portname):
        """
        dummy implementation, does nothing

        :param portname: name of the port
        """
class ProfilingService(QObject):
    """
    This class provides a profiling service for the nexxT framework.

    For each registered thread, a ThreadSpecificProfItem and a QTimer are kept. The timer periodically
    triggers a load measurement, and the collected data is published with the signals of this class.
    The per-thread items are protected by a lock because the slots are called from different threads.
    """
    # this signal is emitted when there is new load data for a thread.
    # arguments: thread name, time stamps (raw int64 bytes), load values (raw float32 bytes)
    loadDataUpdated = Signal(str, QByteArray, QByteArray)
    # this signal is emitted when there is new span data for a port.
    # arguments: thread name, port name, time points (raw int64 bytes)
    spanDataUpdated = Signal(str, str, QByteArray)
    # this signal is emitted with the thread name after a thread has been deregistered.
    threadDeregistered = Signal(str)
    # these signals are connected to the stop / start slots of the timers of all registered threads.
    stopTimers = Signal()
    startTimers = Signal()

    def __init__(self):
        super().__init__()
        # maps QThread instances to ThreadSpecificProfItem instances
        self._threadSpecificProfiling = {}
        self._lockThreadSpecific = Lock()
        self._lastEmitTime = TIMER()
        self._loadMonitoringEnabled = True
        self._portProfilingEnabled = False
        # invoker used for registering the main thread, see registerThread
        self._mi = None

    @Slot()
    def registerThread(self):
        """
        This slot shall be called from each activated nexxT thread with a direct connection.

        :return:
        """
        t = QThread.currentThread()
        logger.internal("registering thread %s", t.objectName())
        with self._lockThreadSpecific:
            if t not in self._threadSpecificProfiling:
                item = ThreadSpecificProfItem()
                self._threadSpecificProfiling[t] = item
                item.timer = QTimer(parent=self.sender())
                # direct connection: the record is generated in the thread where the timer fires
                item.timer.timeout.connect(self._generateRecord, Qt.DirectConnection)
                item.timer.setInterval(int(ThreadSpecificProfItem.THREAD_PROFILING_PERIOD_SEC*1e3))
                self.stopTimers.connect(item.timer.stop)
                self.startTimers.connect(item.timer.start)
                if self._loadMonitoringEnabled:
                    item.timer.start()
            tmain = QCoreApplication.instance().thread()
            if self._mi is None and tmain not in self._threadSpecificProfiling:
                # NOTE(review): presumably this schedules registerThread in the main thread, such that
                # the main thread is profiled as well -- confirm against MethodInvoker
                self._mi = MethodInvoker(dict(object=self, method="registerThread", thread=tmain),
                                         Qt.QueuedConnection)

    def setLoadMonitorEnabled(self, enabled):
        """
        Enables / disables load monitoring. The timers of all registered threads are started or stopped
        if the setting changes.

        :param enabled: boolean
        :return:
        """
        if enabled != self._loadMonitoringEnabled:
            self._loadMonitoringEnabled = enabled
            if enabled:
                self.startTimers.emit()
            else:
                self.stopTimers.emit()
        if self._portProfilingEnabled and not self._loadMonitoringEnabled:
            logger.warning("Port profiling works only if load monitoring is enabled.")

    def setPortProfilingEnabled(self, enabled):
        """
        Enables / disables port profiling

        :param enabled: boolean
        :return:
        """
        if enabled != self._portProfilingEnabled:
            self._portProfilingEnabled = enabled
        if self._portProfilingEnabled and not self._loadMonitoringEnabled:
            logger.warning("Port profiling works only if load monitoring is enabled.")

    @Slot()
    def deregisterThread(self):
        """
        This slot shall be called from each deactivated nexxT thread with a direct connection

        :return:
        """
        self._mi = None
        t = QThread.currentThread()
        logger.debug("deregistering thread %s", t.objectName())
        item = None
        with self._lockThreadSpecific:
            if t in self._threadSpecificProfiling:
                item = self._threadSpecificProfiling[t]
                item.timer.stop()
                del self._threadSpecificProfiling[t]
        # the last reference to the item is released after the lock has been left
        del item
        self.threadDeregistered.emit(t.objectName())

    @Slot()
    def _generateRecord(self):
        """
        This slot is automatically called periodically by the timer of a registered thread. It updates
        the load measurement of the current thread and publishes the collected data.

        :return:
        """
        t = QThread.currentThread()
        with self._lockThreadSpecific:
            item = self._threadSpecificProfiling.get(t)
            if item is None:
                # the timers stay connected to startTimers, so a timer of a thread which has been
                # deregistered in the meantime may still fire
                return
            item.update()
            self._emitData()

    @Slot(str)
    def beforePortDataChanged(self, portname):
        """
        This slot is called before calling onPortDataChanged. The time stamp is taken inside of this
        method. Nothing is recorded if port profiling is disabled or the current thread is not registered.

        :param portname: the fully qualified name of the port
        :return:
        """
        if not self._portProfilingEnabled:
            return
        t = QThread.currentThread()
        # take the time stamp before waiting for the lock
        timeNs = TIMER()
        with self._lockThreadSpecific:
            if t in self._threadSpecificProfiling:
                self._threadSpecificProfiling[t].registerPortChangeStarted(portname, timeNs)

    @Slot(str)
    def afterPortDataChanged(self, portname):
        """
        This slot is called after calling onPortDataChanged. The time stamp is taken inside of this
        method. Nothing is recorded if port profiling is disabled or the current thread is not registered.

        :param portname: the fully qualified name of the port
        :return:
        """
        if not self._portProfilingEnabled:
            return
        t = QThread.currentThread()
        # take the time stamp before waiting for the lock
        timeNs = TIMER()
        with self._lockThreadSpecific:
            if t in self._threadSpecificProfiling:
                self._threadSpecificProfiling[t].registerPortChangeFinished(portname, timeNs)

    def _emitData(self):
        """
        Publishes the collected load and span data of all registered threads, at most once per 100 ms.
        This method iterates over the per-thread items, so the caller has to hold the lock.

        :return:
        """
        now = TIMER()
        if now - self._lastEmitTime > 1e8:
            # emit each 100 ms
            self._lastEmitTime = now
            for thread, tsp in self._threadSpecificProfiling.items():
                load = tsp.getLoad()
                atimstamps = np.array([l[0] for l in load], dtype=np.int64)
                aload = np.array([l[1] for l in load], dtype=np.float32)
                if aload.size > 0:
                    self.loadDataUpdated.emit(thread.objectName(), QByteArray(atimstamps.tobytes()),
                                              QByteArray(aload.tobytes()))
                port_spans = tsp.getSpans()
                for port, spans in port_spans.items():
                    spans = np.array(spans, dtype=np.int64)
                    if spans.size > 0:
                        self.spanDataUpdated.emit(thread.objectName(), port, QByteArray(spans.tobytes()))