Source code for pathsim.blocks.spectrum

#########################################################################################
##
##                   SPECTRUM ANALYZER BLOCK (blocks/spectrum.py)
##
##                                Milan Rother 2024
##
#########################################################################################

# IMPORTS ===============================================================================

import csv

import warnings

import numpy as np
import matplotlib.pyplot as plt

from ._block import Block

from ..utils.realtimeplotter import RealtimePlotter

from .._constants import COLORS_ALL


# BLOCKS FOR DATA RECORDING =============================================================

[docs] class Spectrum(Block): """Block for fourier spectrum analysis (basically a spectrum analyzer), computes continuous time running fourier transform (RFT) of the incoming signal. A time threshold can be set by 't_wait' to start recording data only after the simulation time is larger then the specified waiting time, i.e. 't - t_wait > dt'. This is useful for recording the steady state after all the transients have settled. An exponential forgetting factor 'alpha' can be specified for realtime spectral analysis. It biases the spectral components exponentially to the most recent signal values by applying a single sided exponential window like this: .. math:: \\int_0^t u(\\tau) \\exp(\\alpha (t-\\tau)) \\exp(-j \\omega \\tau)\\ d \\tau It is also known as the 'exponentially forgetting transform' (EFT) and a form of short time fourier transform (STFT). It is implemented as a 1st order statespace model .. math:: \\dot{x} = - \\alpha x + \\exp(-j \\omega t) u where 'u' is the input signal and 'x' is the state variable that represents the complex fourier coefficient to the frequency 'omega'. The ODE is integrated using the numerical integration engine of the block. Example ------- This is how to initialize it: .. code-block:: python import numpy as np #linear frequencies (0Hz, DC -> 1kHz) sp1 = Spectrum( freq=np.linspace(0, 1e3, 100), labels=['x1', 'x2'] #labels for two inputs ) #log frequencies (1Hz -> 1kHz) sp2 = Spectrum( freq=np.logspace(0, 3, 100) ) #log frequencies including DC (0Hz, DC + 1Hz -> 1kHz) sp3 = Spectrum( freq=np.hstack([0.0, np.logspace(0, 3, 100)]) ) #arbitrary frequencies sp4 = Spectrum( freq=np.array([0, 0.5, 20, 1e3]) ) Note ---- This block is relatively slow! But it is valuable for long running simulations with few evaluation frequencies, where just FFT'ing the time series data wouldnt be efficient OR if only the evaluation at weirdly spaced frequencies is required. Otherwise its more efficient to just do an FFT on the time series recording after the simulation has finished. Parameters ---------- freq : array[float] list of evaluation frequencies for RFT, can be arbitrarily spaced t_wait : float wait time before starting RFT alpha : float exponential forgetting factor for realtime spectrum labels : list[str] labels for the inputs """ #max number of ports _n_in_max = None _n_out_max = 0 def __init__(self, freq=[], t_wait=0.0, alpha=0.0, labels=[]): super().__init__() #time delay until start recording self.t_wait = t_wait #last valid timestep sample for waiting self.t_sample = 0.0 #local integration time self.time = 0.0 #forgetting factor self.alpha = alpha #labels for plotting and saving data self.labels = labels #frequency self.freq = np.array(freq) self.omega = 2.0 * np.pi * self.freq def __len__(self): return 0 def _kernel(self, x, u, t): """Helper method that defines the kernel for the internal running fourier transform (RFT) """ if self.alpha == 0: return np.kron(u, np.exp(-1j * self.omega * t)) return np.kron(u, np.exp(-1j * self.omega * t)) - self.alpha * x
[docs] def set_solver(self, Solver, parent, **solver_kwargs): """set the internal numerical integrator for the RFT Parameters ---------- Solver : Solver numerical integration solver class solver_kwargs : dict parameters for solver initialization """ if self.engine is None: self.engine = Solver(0.0, parent, **solver_kwargs) else: self.engine = Solver.cast(self.engine, parent, **solver_kwargs)
[docs] def reset(self): super().reset() #local integration time self.time = 0.0
[docs] def read(self): """Read the recorded spectrum Example ------- This is how to get the recorded spectrum: .. code-block:: python import numpy as np #linear frequencies (0Hz, DC -> 1kHz) spc = Spectrum( freq=np.linspace(0, 1e3, 100), labels=['x1', 'x2'] #labels for two inputs ) #... run the simulation ... #read the complex spectra of x1 and x2 freq, [X1, X2] = spc.read() Returns ------- freq : array[float] evaluation frequencies spec : array[complex] complex spectrum """ #just return zeros if no engine initialized if self.engine is None: return self.freq, [np.zeros_like(self.freq)]*len(self.inputs) #catch case where time is still zero if self.time == 0.0: return self.freq, [np.zeros_like(self.freq)]*len(self.inputs) #get state from engine state = self.engine.get() #catch case where state has not been updated if np.all(state == self.engine.initial_value): return self.freq, [np.zeros_like(self.freq)]*len(self.inputs) #reshape state into spectra spec = np.reshape(state, (-1, len(self.freq))) #rescale spectrum and return it if self.alpha != 0.0: return self.freq, spec * self.alpha / (1.0 - np.exp(-self.alpha*self.time)) #return spectrum from RFT return self.freq, spec/self.time
[docs] def solve(self, t, dt): """advance solution of implicit update equation of the solver Parameters ---------- t : float evaluation time dt : float integration timestep Returns ------- error : float solver residual norm """ if self.t_sample >= self.t_wait: #effective time for integration self.time = t - self.t_wait #advance solution of implicit update equation (no jacobian) f = self._kernel(self.engine.get(), self.inputs.to_array(), self.time) return self.engine.solve(f, None, dt) #no error return 0.0
[docs] def step(self, t, dt): """compute timestep update with integration engine Parameters ---------- t : float evaluation time dt : float integration timestep Returns ------- success : bool step was successful error : float local truncation error from adaptive integrators scale : float timestep rescale from adaptive integrators """ if self.t_sample >= self.t_wait: #effective time for integration self.time = t - self.t_wait #compute update step with integration engine f = self._kernel(self.engine.get(), self.inputs.to_array(), self.time) return self.engine.step(f, dt) #no error estimate return True, 0.0, 1.0
[docs] def sample(self, t, dt): """sample time of successfull timestep for waiting period Parameters ---------- t : float sampling time dt : float integration timestep """ self.t_sample = t
[docs] def plot(self, *args, **kwargs): """Directly create a plot of the recorded data for visualization. The 'fig' and 'ax' objects are accessible as attributes of the 'Spectrum' instance from the outside for saving, or modification, etc. Parameters ---------- args : tuple args for ax.plot kwargs : dict kwargs for ax.plot Returns ------- fig : matplotlib.figure internal figure instance ax : matplotlib.axis internal axis instance """ #just return 'None' if no engine initialized if self.engine is None: return None #get data freq, data = self.read() #initialize figure self.fig, self.ax = plt.subplots(nrows=1, ncols=1, figsize=(8,4), tight_layout=True, dpi=120) #custom colors self.ax.set_prop_cycle(color=COLORS_ALL) #plot magnitude in dB and add label for p, d in enumerate(data): lb = self.labels[p] if p < len(self.labels) else f"port {p}" self.ax.plot(freq, abs(d), *args, **kwargs, label=lb) #legend labels from ports self.ax.legend(fancybox=False) #other plot settings self.ax.set_xlabel("freq [Hz]") self.ax.set_ylabel("magnitude") self.ax.grid() # Legend picking functionality lines = self.ax.get_lines() # Get the lines from the plot leg = self.ax.get_legend() # Get the legend # Map legend lines to original plot lines lined = dict() for legline, origline in zip(leg.get_lines(), lines): # Enable picking within 5 points tolerance legline.set_picker(5) lined[legline] = origline def on_pick(event): legline = event.artist origline = lined[legline] visible = not origline.get_visible() origline.set_visible(visible) legline.set_alpha(1.0 if visible else 0.2) # Redraw the figure self.fig.canvas.draw() #enable picking self.fig.canvas.mpl_connect("pick_event", on_pick) #show the plot without blocking following code plt.show(block=False) #return figure and axis for outside manipulation return self.fig, self.ax
[docs] def save(self, path="spectrum.csv"): """Save the recording of the spectrum to a csv file Parameters ---------- path : str path where to save the recording as a csv file """ #check path ending if not path.lower().endswith(".csv"): path += ".csv" #get data freq, data = self.read() #number of ports and labels P, L = len(data), len(self.labels) #construct port labels port_labels = [self.labels[p] if p < L else f"port {p}" for p in range(P)] #make csv header header = ["freq [Hz]"] for l in port_labels: header.extend([f"Re({l})", f"Im({l})"]) #write to csv file with open(path, "w", newline="") as file: wrt = csv.writer(file) #write the header to csv file wrt.writerow(header) #write each sample to the csv file for f, *dta in zip(freq, *data): sample = [f] for d in dta: sample.extend([np.real(d), np.imag(d)]) wrt.writerow(sample)
[docs] def update(self, t): """update system equation for fixed point loop, here just setting the outputs Note ---- Spectrum block has no passthrough, so the 'update' method is optimized for this case Parameters ---------- t : float evaluation time """ pass
[docs] class RealtimeSpectrum(Spectrum): """An extension of the 'Spectrum' block that also initializes a realtime plotter that creates an interactive plotting window while the simulation is running. Otherwise implements the same functionality as the regular 'Spectrum' block. Note ---- Due to the plotting being relatively expensive, including this block slows down the simulation significantly but may still be valuable for debugging and testing. Parameters ---------- freq : array[float] list of evaluation frequencies for RFT, can be arbitrarily spaced t_wait : float wait time before starting RFT alpha : float exponential forgetting factor for realtime spectrum labels : list[str] labels for the inputs """ def __init__(self, freq=[], t_wait=0.0, alpha=0.0, labels=[]): super().__init__(freq, t_wait, alpha, labels) #initialize realtime plotter self.plotter = RealtimePlotter( update_interval=0.1, labels=labels, x_label="freq [Hz]", y_label="magnitude" ) warnings.warn("'RealtimeSpectrum' block will be deprecated with release version 1.0.0")
[docs] def step(self, t, dt): """compute timestep update with integration engine Parameters ---------- t : float evaluation time dt : float integration timestep Returns ------- success : bool step was successful error : float local truncation error from adaptive integrators scale : float timestep rescale from adaptive integrators """ #effective time for integration _t = t - self.t_wait if _t > dt: #update local integtration time self.time = _t if self.time > 2*dt: #update realtime plotter _, data = self.read() self.plotter.update_all(self.freq, abs(data)) #compute update step with integration engine f = self._kernel(self.engine.get(), self.inputs.to_array(), _t) return self.engine.step(f, dt) #no error estimate return True, 0.0, 1.0