# SPDX-License-Identifier: Apache-2.0
# Copyright (C) 2020 ifm electronic gmbh
#
# THE PROGRAM IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
#
"""
This module provides a generic reader which can be inherited to use new data formats inside nexxT.
"""
import time
import logging
import math
from nexxT.Qt.QtCore import Signal, QTimer, Qt
from nexxT.Qt.QtWidgets import QFileDialog
from nexxT.interface import Filter, Services, DataSample
from nexxT.core.Utils import handleException, isMainThread, MethodInvoker
logger = logging.getLogger(__name__)
[docs]
class GenericReaderFile:
"""
Interface for adaptations of new file formats.
For supporting new file formats, inherit from this class and overwrite all of the methods listed here. The
constructor of the inherited class usually takes a filename argument. Inherit also from GenericReader to provide
a new Filter class and overwrite the methods getNameFilter and openFile, which returns an instance of the
GenericReaderFile implementation.
See :py:class:`nexxT.filters.hdf5.Hdf5File` and :py:class:`nexxT.filters.hdf5.Hdf5Reader` for an example.
"""
# this is an abstract class and the methods are provided for reference
[docs]
def close(self):
"""
Closes the file.
:return:
"""
raise NotImplementedError()
[docs]
def getNumberOfSamples(self, stream):
"""
Returns the number of samples in the given stream
:param stream: the name of the stream as a string
:return: the number of samples in the stream
"""
raise NotImplementedError()
[docs]
def getTimestampResolution(self):
"""
Returns the resolution of the timestamps in ticks per second.
:return: ticks per second as an integer
"""
raise NotImplementedError()
[docs]
def allStreams(self):
"""
Returns the streams in this file.
:return: a list of strings
"""
raise NotImplementedError()
[docs]
def readSample(self, stream, streamIdx):
"""
Returns the referenced sample as a tuple (content, dataType, dataTimestamp, rcvTimestamp).
:param stream: the stream
:param idx: the index of the sample in the stream
:return: (content: QByteArray, dataType: str, dataTimestamp: int, receiveTimestamp: int)
"""
raise NotImplementedError()
[docs]
def getRcvTimestamp(self, stream, streamIdx):
"""
Returns the recevie timestamp of the given (stream, streamIdx) sample. The default implementation uses
readSample(...). It may be replaced by a more efficient implementation.
:param stream: the name of the stream as a string
:param streamIdx: the stream index as an integer
:return: the timestamp as an integer (see also getTimestampResolution)
"""
return self.readSample(stream, streamIdx)[3]
[docs]
class GenericReader(Filter):
"""
Generic harddisk reader which can be used as base class for implementing readers for custom file formats. To create
a new input file reader, inherit from this class and reimplement getNameFilter(...) and openFile(...). openFile(...)
shall return an instance of an implementation of the interface GenericReaderFile.
See :py:class:`nexxT.filters.hdf5.Hdf5Reader` for an example.
"""
# signals for playback device
playbackStarted = Signal()
playbackPaused = Signal()
sequenceOpened = Signal(str, 'qint64', 'qint64', list)
currentTimestampChanged = Signal('qint64')
timeRatioChanged = Signal(float)
# methods to be overloaded
[docs]
def getNameFilter(self):
"""
Returns the name filter associated with the input files.
:return: a list of strings, e.g. ["*.h5", "*.hdf5"]
"""
raise NotImplementedError()
[docs]
def openFile(self, filename):
"""
Opens the given file and return an instance of GenericReaderFile.
:return: an instance of GenericReaderFile
"""
raise NotImplementedError()
# slots for playback device
[docs]
def startPlayback(self):
"""
slot called when the playback shall be started
:return:
"""
if not self._playing:
self._playing = True
self._timer.start(0)
self._updateTimer.start()
self.playbackStarted.emit()
[docs]
def pausePlayback(self):
"""
slot called when the playback shall be paused
:return:
"""
if self._playing:
self._playing = False
self._untilStream = None
self._dir = 1
self._timer.stop()
self._updateTimer.stop()
self._updateCurrentTimestamp()
self.playbackPaused.emit()
[docs]
def stepForward(self, stream):
"""
slot called to step one frame in stream forward
:param stream: a string instance or None (all streams are selected)
:return:
"""
self._untilStream = stream if stream is not None else ''
self.startPlayback()
[docs]
def stepBackward(self, stream):
"""
slot called to step one frame in stream backward
:param stream: a string instance or None (all streams are selected)
:return:
"""
self._dir = -1
self._untilStream = stream if stream is not None else ''
self.startPlayback()
[docs]
def seekBeginning(self):
"""
slot called to go to the beginning of the file
:return:
"""
self.pausePlayback()
for p in self._portToIdx:
self._portToIdx[p] = -1
self._transmitNextSample()
self._updateCurrentTimestamp()
[docs]
def seekEnd(self):
"""
slot called to go to the end of the file
:return:
"""
self.pausePlayback()
for p in self._portToIdx:
self._portToIdx[p] = self._file.getNumberOfSamples(p)
self._dir = -1
self._transmitNextSample()
self._dir = +1
self._updateCurrentTimestamp()
[docs]
def seekTime(self, timestamp):
"""
slot called to go to the specified time
:param timestamp: a timestamp in nanosecond resolution
:return:
"""
t = timestamp // (1000000000//self._file.getTimestampResolution())
nValid = 0
for p in self._portToIdx:
# binary search
minIdx = -1
num = self._file.getNumberOfSamples(p)
maxIdx = num
# binary search for timestamp
while maxIdx - minIdx > 1:
testIdx = max(0, min(num-1, (minIdx + maxIdx)//2))
vTest = self._file.getRcvTimestamp(p, testIdx)
if vTest <= t:
minIdx = testIdx
else:
maxIdx = testIdx
self._portToIdx[p] = minIdx
if minIdx >= 0:
# note: minIdx is always below num
nValid += 1
if nValid > 0:
self._transmitCurrent()
else:
self._transmitNextSample()
self._updateCurrentTimestamp()
[docs]
def setSequence(self, filename):
"""
slot called to set the sequence file name
:param filename: a string instance
:return:
"""
logger.debug("Set sequence filename=%s", filename)
self._name = filename
[docs]
def setTimeFactor(self, factor):
"""
slot called to set the time factor
:param factor: a float
:return:
"""
self._timeFactor = factor
self.timeRatioChanged.emit(self._timeFactor)
# overwrites from Filter
[docs]
def __init__(self, env):
super().__init__(False, True, env)
self._name = None
self._file = None
self._portToIdx = None
self._timer = None
self._updateTimer = QTimer(self)
self._updateTimer.setInterval(1000) # update new position each second
self._updateTimer.timeout.connect(self._updateCurrentTimestamp)
self._currentTimestamp = None
self._playing = None
self._untilStream = None
self._dir = 1
self._ports = None
self._timeFactor = 1
pc = self.propertyCollection()
pc.defineProperty("defaultStepStream", "<all>",
"define the default step stream (the user can override it via menu)")
[docs]
def onOpen(self):
"""
overloaded from Filter
:return:
"""
srv = Services.getService("PlaybackControl")
srv.setupConnections(self, self.getNameFilter())
if isMainThread():
logger.warning("This GenericReader seems to run in GUI thread. Consider to move it to a seperate thread.")
[docs]
def onStart(self):
"""
overloaded from Filter
:return:
"""
if self._name is not None:
self._file = self.openFile(self._name) # pylint: disable=assignment-from-no-return
if not isinstance(self._file, GenericReaderFile):
logger.error("Unexpected instance returned from openFile(...) method of instance %s", (repr(self)))
# sanity checks for the timestamp resolutions
# spit out some errors because when these checks fail, the timestamp logic doesn't work
# note that nexxT tries to avoid applying floating point arithmetics to timestamps due to possible loss
# of accuracy
tsResolution = self._file.getTimestampResolution()
f = DataSample.TIMESTAMP_RES*tsResolution
if (f > 1 and f % 1.0 != 0.0) or (f < 1 and (1/f) % 1.0 != 0.0):
logger.error("timestamp resolution of opened instance %s is no integer multiple of internal resolution",
repr(self._file))
if (1000000000/tsResolution) % 1.0 != 0.0:
logger.error("timestamp resolution of opened instance %s is no integer multiple of nanoseconds",
repr(self._file))
self._portToIdx = {}
self._ports = self.getDynamicOutputPorts()
for s in self._file.allStreams():
if s in [p.name() for p in self._ports]:
if self._file.getNumberOfSamples(s) > 0:
self._portToIdx[s] = -1
else:
logger.warning("Stream %s does not contain any samples.", s)
else:
logger.warning("No matching output port for stream %s. Consider to create a port for it.", s)
for p in self._ports:
if not p.name() in self._portToIdx:
logger.warning("No matching stream for output port %s. HDF5 file not matching the configuration?",
p.name())
self._timer = QTimer(parent=self)
self._timer.timeout.connect(self._transmitNextSample)
self._playing = False
self._currentTimestamp = None
span = self._timeSpan()
self.sequenceOpened.emit(self._name, span[0], span[1], sorted(self._portToIdx.keys()))
self.timeRatioChanged.emit(self._timeFactor)
self.playbackPaused.emit()
try:
srv = Services.getService("PlaybackControl")
except: # pylint: disable=bare-except
srv = None
pc = self.propertyCollection()
stepStream = pc.getProperty("defaultStepStream")
if stepStream not in self._portToIdx:
stepStream = None
if srv is not None and hasattr(srv, "setSelectedStream"):
MethodInvoker(srv.setSelectedStream, Qt.QueuedConnection, stepStream)
[docs]
def onStop(self):
"""
overloaded from Filter
:return:
"""
if self._file is not None:
self._file.close()
self._file = None
self._portToIdx = None
self._timer.stop()
self._timer = None
self._playing = None
self._currentTimestamp = None
[docs]
def onClose(self):
"""
overloaded from Filter
:return:
"""
srv = Services.getService("PlaybackControl")
srv.removeConnections(self)
[docs]
def onSuggestDynamicPorts(self):
"""
overloaded from Filter
:return:
"""
try:
fn, ok = QFileDialog.getOpenFileName(caption="Choose template hdf5 file",
filter=f"Support files ({' '.join(self.getNameFilter())})")
if ok:
f = self.openFile(fn) # pylint: disable=assignment-from-no-return
if not isinstance(f, GenericReaderFile):
logger.error("Unexpected instance returned from openFile(...) method of instance %s", (repr(self)))
return [], list(f.allStreams())
except Exception: # pylint: disable=broad-except
logger.exception("Caught exception during onSuggestDynamicPorts")
return [], []
# private slots and methods
def _timeSpan(self):
tmin = math.inf
tminNonZero = math.inf
tmax = -math.inf
for p in self._portToIdx:
for i in range(self._file.getNumberOfSamples(p)):
t = self._file.getRcvTimestamp(p, i)
tmin = min(t, tmin)
if t != 0:
tminNonZero = min(tminNonZero, t)
break
t = self._file.getRcvTimestamp(p, self._file.getNumberOfSamples(p)-1)
tmax = max(t, tmax)
if tmin > tmax:
raise RuntimeError("It seems that the input file doesn't have any usable samples.")
if tmin == 0 and tminNonZero > 60*24*self._file.getTimestampResolution():
tmin = tminNonZero
return (tmin*(1000000000//self._file.getTimestampResolution()),
tmax*(1000000000//self._file.getTimestampResolution()))
def _getNextSample(self):
# check which port has the next sample to deliver according to rcv timestamps
nextPort = None
for p in self._portToIdx:
idx = self._portToIdx[p]
idx = idx + self._dir
if 0 <= idx < self._file.getNumberOfSamples(p):
ts = self._file.getRcvTimestamp(p, idx)
# pylint: disable=unsubscriptable-object
# actually, nextPort can be either None or a 2-tuple
if nextPort is None or (ts < nextPort[0] and self._dir > 0) or (ts > nextPort[0] and self._dir < 0):
nextPort = (ts, p)
return nextPort
@handleException
def _transmitNextSample(self):
startTime = time.perf_counter_ns()
nextPort = self._getNextSample()
# when next data sample arrives sooner than this threshold, do not use the QTimer but perform busy waiting
noSleepThreshold_ns = 0.005*1e9 # 5 ms
# maximum time in busy-wait strategy (measured from the beginning of the function)
maxTimeInMethod = 0.05*1e9 # yield all 50 ms
factorTStoNS = 1e9/self._file.getTimestampResolution()
while nextPort is not None:
ts, pname = nextPort
self._portToIdx[pname] += self._dir
lastTransmit = self._transmit(pname)
if not self._playing:
return pname
nextPort = self._getNextSample()
if nextPort is not None:
newTs, _ = nextPort
nowTime = time.perf_counter_ns()
deltaT_ns = max(0, (newTs - ts) * factorTStoNS / self._timeFactor - (nowTime - lastTransmit))
if deltaT_ns < noSleepThreshold_ns and nowTime - startTime + deltaT_ns < maxTimeInMethod:
while time.perf_counter_ns() - nowTime < deltaT_ns:
pass
else:
if deltaT_ns < 10e9:
self._timer.start(deltaT_ns//1000000)
break
else:
self.pausePlayback()
def _transmit(self, pname):
idx = self._portToIdx[pname]
# read data sample from HDF5 file
content, dataType, dataTimestamp, rcvTimestamp = self._file.readSample(pname, idx)
# create sample to transmit
f = 1/(DataSample.TIMESTAMP_RES * self._file.getTimestampResolution())
if f >= 1:
f = round(f)
tsData = dataTimestamp * f
else:
f = round(1/f)
tsData = dataTimestamp // f
sample = DataSample(content, dataType, tsData)
res = time.perf_counter_ns()
# transmit sample over corresponding port
self._ports[[p.name() for p in self._ports].index(pname)].transmit(sample)
self._currentTimestampChanged(rcvTimestamp*(1000000000//self._file.getTimestampResolution()))
if self._untilStream is not None:
if self._untilStream in (pname, ''):
self.pausePlayback()
return res
def _transmitCurrent(self):
ports = list(self._portToIdx.keys())
values = [self._file.getRcvTimestamp(p, self._portToIdx[p]) for p in ports]
sortedIdx = sorted(range(len(values)), key=lambda x: values[x])
# transmit most recent sample
self._transmit(ports[sortedIdx[-1]])
def _currentTimestampChanged(self, timestamp):
self._currentTimestamp = timestamp
def _updateCurrentTimestamp(self):
if self._currentTimestamp is not None:
self.currentTimestampChanged.emit(self._currentTimestamp)