# SPDX-License-Identifier: Apache-2.0
# Copyright (C) 2020 ifm electronic gmbh
#
# THE PROGRAM IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
#
"""
This module defines the filter ImageBlur as a little showcase for a long-running algorithm
"""
import numpy as np
from nexxT.interface import Filter, DataSample
from nexxT.examples.framework.ImageData import byteArrayToNumpy, numpyToByteArray
[docs]
class ImageBlur(Filter):
"""
A filter providing the parameter kernelSize. A box filter of this size is applied to the input image
and the output is transferred over the output port.
"""
[docs]
def __init__(self, env):
super().__init__(False, False, env)
# add the input port for the filter
self.inPort = self.addStaticInputPort("video_in")
# add the output port for the filter
self.outPort = self.addStaticOutputPort("video_out")
# define the kernelSize property
pc = self.propertyCollection()
pc.defineProperty("kernelSize", 3, "Kernel size of a simple blurring kernel", options=dict(min=1, max=99))
[docs]
def onPortDataChanged(self, port):
"""
Overloaded from Filter base class. The method is called whenever new data arrives at an input port.
:param port: the port which has new data.
:return:
"""
if port.getData().getDatatype() == "example/image":
# we don't (want to) have scipy or opencv in the dependency list, so we do that by hand, it is just a
# showcase
ks = self.propertyCollection().getProperty("kernelSize")
# assert odd kernel size
ks = (ks//2)*2 + 1
if ks > 1: # non-trivial size ?
# efficient (zero-copy) conversion
in_img = byteArrayToNumpy(port.getData().getContent())
# apply the filter
res = boxFilter(in_img, ks)
# create a DataSample instance to be transferred over the port
sample = DataSample(numpyToByteArray(res), "example/image", port.getData().getTimestamp())
else:
# filter is no-op, we reuse the input data in this case
sample = port.getData()
# finally transmit the result
self.outPort.transmit(sample)
[docs]
def boxFilter(img, kernelSize):
"""
2D box filter operating on single or multichannel input images
:param img: the image as a numpy array
:param kernel_size: the size of the filter (must be odd)
:return: the filtered image (same type as input image)
"""
kernel = np.ones(kernelSize, np.float32)/kernelSize
kd2 = kernelSize//2
res = np.zeros(img.shape, np.float32)
h, w = res.shape[:2] # pylint: disable=unsubscriptable-object
# filter in y direction
res[kd2:h-kd2, ...] = (img[kd2:h-kd2, ...]*kernel[kd2])
for y in range(1, kd2+1):
res[kd2:h-kd2, ...] += img[kd2-y:h-kd2-y, ...]*kernel[kd2-y] + img[kd2+y:h-kd2+y, ...]*kernel[kd2+y]
# result becomes input now
img = res.astype(img.dtype)
res = np.zeros(img.shape, np.float32)
# filter in x direction
res[:, kd2:w-kd2, ...] = (img[:, kd2:w-kd2, ...]*kernel[kd2])
for x in range(1, kd2+1):
res[:, kd2:w-kd2, ...] += img[:, kd2-x:w-kd2-x, ...]*kernel[kd2-x] + img[:, kd2+x:w-kd2+x, ...]*kernel[kd2+x]
res = res.astype(img.dtype)
return res