# Source code for nexxT.filters.hdf5

# SPDX-License-Identifier: Apache-2.0
# Copyright (C) 2020 ifm electronic gmbh
#
# THE PROGRAM IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
#

"""
This module provides a generic disk reader and writer based on HDF5.
To use it, you have to enable the "HDF5" feature during installation, i.e. pip install nexxT[HDF5]
"""
import datetime
from pathlib import Path
import string
import time
import logging
import os
import numpy as np
import h5py
from nexxT.Qt.QtCore import Signal
from nexxT.interface import Filter, Services
from nexxT.core.Utils import handleException, isMainThread
from nexxT.filters.GenericReader import GenericReader, GenericReaderFile

# module-level logger, named after this module
logger = logging.getLogger(__name__)

class Hdf5Writer(Filter):
    """
    Generic nexxT filter for writing HDF5 files.

    Every dynamic input port is recorded into its own dataset ``streams/<port name>``. Each record of such a
    dataset has the compound type ``self.dtype``: (content, dataType, dataTimestamp, rcvTimestamp).
    """

    # Status of the current recording: (file name, recording time in seconds, file size in bytes).
    # (name, 0.0, 0) is emitted when a recording starts and (name, -1, -1) when it stops.
    statusUpdate = Signal(str, float, "qlonglong")

    def __init__(self, env):
        # NOTE(review): (True, False) presumably enables dynamic input ports only - confirm against Filter.__init__
        super().__init__(True, False, env)
        self._currentFile = None        # open h5py.File while recording, None otherwise
        self._lastDataTimestamp = None  # data timestamp of the last written sample of the current recording
        self._lastRcvTimestamp = None   # receive timestamp of the last written sample (None: nothing written yet)
        self._name = None               # file name of the current recording
        self._basetime = None           # time.perf_counter_ns() at the start of the recording
        self.propertyCollection().defineProperty(
            "filename", "${DATE}_${TIME}_${FILTER_NAME}.h5",
            "Template for generated files. The following variables can be used: ${DATE}, ${TIME}, ${FILTER_NAME}")
        # the value is re-read from the property collection in _startRecording
        self._useRcvTimestamps = self.propertyCollection().defineProperty(
            "use_receive_timestamps", True,
            "Flag whether or not to use receive timestamps, so that the playback timing is approximately equal to the "
            "recording"
        )
        self.propertyCollection().defineProperty(
            "silent_overwrite", False,
            "Whether or not silently overwrite existing files"
        )
        self.propertyCollection().defineProperty(
            "buffer_period", 1.0,
            "The minimum buffer period in seconds. Pass 0.0 to disable buffering based on time.\n"
            "Note that high numbers might require a lot of memory.",
            options=dict(min=0.0, max=3600.0)
        )
        self.propertyCollection().defineProperty(
            "buffer_samples", 0,
            "The minimum number of samples to buffer. Pass 0 to disable buffering based on samples.\n"
            "Note that high numbers might require a lot of memory.",
            options=dict(min=0, max=1000000)
        )
        self.propertyCollection().defineProperty(
            "use_posix_fadvise_if_available", True,
            "If available, hint the kernel with posix_fadvise(..., POSIX_FADV_DONTNEED). Might give\n"
            "better write performance on linux systems, because there are no bursts of written data.\n"
            "Note: You can also try to use echo 0 > /proc/sys/vm/dirty_writeback_centisecs to disable\n"
            "write caching."
        )
        self.propertyCollection().propertyChanged.connect(self._propertyChanged)
        # create a numpy-style dtype for the contents of a datasample
        type_content = h5py.vlen_dtype(np.dtype(np.uint8))
        type_timestamp = np.int64
        type_dataType = h5py.string_dtype()
        self.dtype = [('content', type_content),
                      ('dataType', type_dataType),
                      ('dataTimestamp', type_timestamp),
                      ('rcvTimestamp', type_timestamp),
                      ]

    def onInit(self):
        """
        Enables dynamic inter-thread queues on all dynamic input ports.

        NOTE(review): presumably required for the queue sizes set in _propertyChanged to take effect - confirm.

        :return:
        """
        for p in self.getDynamicInputPorts():
            p.setInterthreadDynamicQueue(True)

    def onStart(self):
        """
        Applies the buffering properties to the input ports and registers itself to the recording control service.

        :return:
        """
        self._propertyChanged(self.propertyCollection(), "buffer_samples")
        srv = Services.getService("RecordingControl")
        srv.setupConnections(self)
        if isMainThread():
            logger.warning("Hdf5Writer seems to run in GUI thread. Consider to move it to a seperate thread.")

    def _propertyChanged(self, propColl, name):
        """
        Forwards changes of the buffering properties to the queue sizes of all dynamic input ports.

        :param propColl: the property collection of this filter
        :param name: the name of the changed property
        :return:
        """
        if name in ["buffer_samples", "buffer_period"]:
            qss = propColl.getProperty("buffer_samples")
            qsp = propColl.getProperty("buffer_period")
            for p in self.getDynamicInputPorts():
                p.setQueueSize(qss, qsp)

    def onStop(self):
        """
        De-registers itself from the recording control service.

        :return:
        """
        srv = Services.getService("RecordingControl")
        srv.removeConnections(self)

    @handleException
    def _startRecording(self, directory):
        """
        Creates a new HDF5 file in the given directory with one (empty) dataset per dynamic input port.

        :param directory: the directory where the file is created
        :return:
        """
        # do not leak the handle of a recording that was never stopped
        if self._currentFile is not None:
            self._stopRecording()
        # reset the current file and the per-recording timestamp state
        self._currentFile = None
        self._lastDataTimestamp = None
        self._lastRcvTimestamp = None
        self._name = self.propertyCollection().getProperty("filename")
        self._useRcvTimestamps = self.propertyCollection().getProperty("use_receive_timestamps")
        # interpolate the name with optionally given variables
        dt = datetime.datetime.now()
        variables = dict(DATE=dt.date().strftime('%Y%m%d'),
                         TIME=dt.time().strftime('%H%M%S'),
                         FILTER_NAME=self.propertyCollection().objectName())
        self._name = string.Template(self._name).safe_substitute(variables)
        if not self._name.endswith((".h5", ".hdf5", ".hdf")):
            self._name += ".h5"
        # "x" fails if the file already exists; "w" truncates it
        mode = "w" if self.propertyCollection().getProperty("silent_overwrite") else "x"
        # create a new HDF5 file / truncate an existing file containing a stream for all existing input ports
        self._currentFile = h5py.File(Path(directory) / self._name, mode=mode)
        streams = self._currentFile.create_group("streams")
        for port in self.getDynamicInputPorts():
            streams.create_dataset(port.name(), (0,), chunks=(1,), maxshape=(None,), dtype=self.dtype)
        # setup variables needed during processing
        self._basetime = time.perf_counter_ns()
        # initial status update
        self.statusUpdate.emit(self._name, 0.0, 0)

    def startRecording(self, directory):
        """
        Called on a recording start event.

        :param directory: the directory where the recording is expected to be created.
        :return:
        """
        self._startRecording(directory)

    @handleException
    def _stopRecording(self):
        """
        Emits the final status update and closes the current file, if any.

        :return:
        """
        if self._currentFile is not None:
            # final status update
            self.statusUpdate.emit(self._name, -1, -1)
            # close the file
            self._currentFile.close()
            self._currentFile = None

    def stopRecording(self):
        """
        Called on a recording stop event.

        :return:
        """
        self._stopRecording()

    def onPortDataChanged(self, port):
        """
        Called when new data arrives at a port. Appends the sample to the port's dataset if a recording is active.

        :param port: the port where the new data is available.
        :return:
        """
        if self._currentFile is None:
            # recording not active -> do nothing
            return
        s = self._currentFile["streams"][port.name()]
        sample = port.getData()
        dataTimestamp = np.int64(sample.getTimestamp())
        # perform timestamp calculations; this state is shared by all streams of the recording (and reset in
        # _startRecording), so that synthesized receive timestamps keep increasing across streams
        if self._lastRcvTimestamp is None:
            # first sample of this recording
            lastDataTimestamp = dataTimestamp
            lastRcvTimestamp = 0
        else:
            lastDataTimestamp = self._lastDataTimestamp
            lastRcvTimestamp = self._lastRcvTimestamp
        if self._useRcvTimestamps:
            # time since the recording started in microseconds; integer division keeps the value an int64
            rcvTimestamp = np.int64(time.perf_counter_ns() - self._basetime) // 1000
        else:
            # advance the receive timeline by the data timestamp delta, but by at least one tick, so that the
            # stored receive timestamps are strictly increasing
            rcvTimestamp = lastRcvTimestamp + max(1, dataTimestamp - lastDataTimestamp)
        self._lastDataTimestamp = dataTimestamp
        self._lastRcvTimestamp = rcvTimestamp
        # append the new data to the existing HDF5 dataset
        s.resize((s.shape[0] + 1,))
        s[-1:] = (np.frombuffer(sample.getContent(), dtype=np.uint8),
                  sample.getDatatype(),
                  dataTimestamp,
                  rcvTimestamp)
        self._currentFile.flush()
        # status update once each second (receive timestamps are in microseconds)
        if (rcvTimestamp // 1000000) != (lastRcvTimestamp // 1000000):
            if hasattr(os, "posix_fadvise") and self.propertyCollection().getProperty("use_posix_fadvise_if_available"):
                # tell the kernel the written data is not needed in the page cache anymore
                # pylint: disable=no-member
                os.posix_fadvise(self._currentFile.id.get_vfd_handle(), 0, self._currentFile.id.get_filesize(),
                                 os.POSIX_FADV_DONTNEED)
            self.statusUpdate.emit(self._name, rcvTimestamp*1e-6, self._currentFile.id.get_filesize())
class Hdf5File(GenericReaderFile):
    """
    Adaptation of hdf5 file format
    """

    def __init__(self, filename):
        """
        Opens the given HDF5 file read-only.

        :param filename: the path of the file
        """
        self._file = h5py.File(filename, "r")

    def close(self):
        """
        Closes the file.

        :return:
        """
        self._file.close()

    def getNumberOfSamples(self, stream):
        """
        Returns the number of samples in the given stream

        :param stream: the name of the stream as a string
        :return: the number of samples in the stream
        """
        return len(self._file["streams"][stream])

    def getTimestampResolution(self):
        """
        Returns the resolution of the timestamps in ticks per second.

        :return: ticks per second as an integer
        """
        return 1000000

    def allStreams(self):
        """
        Returns the streams in this file.

        :return: a list of strings
        """
        return list(self._file["streams"].keys())

    def readSample(self, stream, streamIdx):
        """
        Returns the referenced sample as a tuple (content, dataType, dataTimestamp, rcvTimestamp).

        :param stream: the name of the stream as a string
        :param streamIdx: the index of the sample in the stream
        :return: (content: bytes, dataType: str, dataTimestamp: int, receiveTimestamp: int)
        """
        content, dataType, dataTimestamp, receiveTimestamp = self._file["streams"][stream][streamIdx]
        if isinstance(dataType, bytes):
            # this is happening now with h5py >= 3.x
            dataType = dataType.decode()
        return content.tobytes(), dataType, dataTimestamp, receiveTimestamp

    def getRcvTimestamp(self, stream, streamIdx):
        """
        Returns the receive timestamp of the given (stream, streamIdx) sample.

        Only the rcvTimestamp field is read, so the (potentially large) sample content is not loaded from disk.

        :param stream: the name of the stream as a string
        :param streamIdx: the stream index as an integer
        :return: the timestamp as an integer (see also getTimestampResolution)
        """
        # field-name selection reads just this member of the compound record
        return self._file["streams"][stream][streamIdx, "rcvTimestamp"]
class Hdf5Reader(GenericReader):
    """
    Reader for the nexxT default file format, which is based on HDF5.
    """

    def getNameFilter(self):
        """
        Lists the file name patterns this reader accepts.

        :return: a list of glob patterns, i.e. ["*.h5", "*.hdf5", "*.hdf"]
        """
        extensions = ("h5", "hdf5", "hdf")
        return ["*." + ext for ext in extensions]

    def openFile(self, filename):
        """
        Opens the given file for reading.

        :param filename: the path of the file to open
        :return: an Hdf5File instance (a GenericReaderFile)
        """
        hdf5File = Hdf5File(filename)
        return hdf5File