Source code for pathsim.blocks.fir

#########################################################################################
##
##               DISCRETE-TIME FINITE-IMPULSE-RESPONSE (FIR) FILTER BLOCK
##                                  (blocks/fir.py) 
##
##                                 Milan Rother 2025
##
#########################################################################################

# IMPORTS ===============================================================================

import numpy as np
from collections import deque

from ._block import Block    
from ..events.schedule import Schedule 


# FIR FILTER BLOCK ======================================================================

[docs] class FIR(Block): """Models a discrete-time Finite-Impulse-Response (FIR) filter. This block applies an FIR filter to an input signal sampled periodically. The output at each sample time is a weighted sum of the current and a finite number of past input samples. The operation is triggered by a scheduled event. Functionality: .. math:: y[n] = b[0] x[n] + b[1] x[n-1] + \\dots + b[N] x[n-N] where `b` are the filter coefficients and `N` is the filter order (number of coefficients - 1). 1. Samples the input `inputs[0]` at intervals of `T`, starting after delay `tau`. 2. Stores the current and past `len(coefficients) - 1` input samples in an internal buffer. 3. Computes the filter output using the dot product of the coefficients and the buffered input samples. 4. Outputs the result on `outputs[0]`. 5. Holds the output constant between updates. Parameters ---------- coeffs : array_like List or numpy array of FIR filter coefficients [b0, b1, ..., bN]. The number of coefficients determines the filter's order and memory. T : float, optional Sampling period (time between input samples and output updates). Default is 1. tau : float, optional Initial delay before the first sample is processed. Default is 0. Input Ports ----------- inputs[0] : float Input signal sample at the current time step. Output Ports ------------ outputs[0] : float Filtered output signal sample. Attributes ---------- buffer : deque Internal buffer storing the most recent input samples. events : list[Schedule] Internal scheduled event triggering the filter calculation. """ #max number of ports _n_in_max = 1 _n_out_max = 1 #maps for input and output port labels _port_map_in = {"in": 0} _port_map_out = {"out": 0} def __init__(self, coeffs=[1.0], T=1, tau=0): super().__init__() self.coeffs = np.array(coeffs) self.T = T self.tau = tau #buffer to store the last N+1 input samples (current + N past) n = len(self.coeffs) self._buffer = deque([0.0]*n, maxlen=n) def _update_fir(t): #update internal buffer self._buffer.appendleft(self.inputs[0]) #compute the FIR output: y[n] = sum(b[k] * x[n-k]) current_output = np.dot(self.coeffs, self._buffer) #update the block's output port self.outputs[0] = current_output #internal scheduled event self.events = [ Schedule( t_start=self.tau, t_period=self.T, func_act=_update_fir ) ]
[docs] def reset(self): """Resets the filter state (buffer) and output.""" super().reset() n = len(self.coeffs) self._buffer = deque([0.0]*n, maxlen=n)
def __len__(self): """This block has no direct passthrough""" return 0