#########################################################################################
##
## SOURCE BLOCKS (blocks/sources.py)
##
## This module defines blocks that serve purely as inputs / sources
## for the simulation such as the generic 'Source' block
##
## Milan Rother 2024
##
#########################################################################################
# IMPORTS ===============================================================================
import numpy as np
from ._block import Block
from ..events.schedule import Schedule
from .._constants import TOLERANCE
# GENERIC SOURCE BLOCKS =================================================================
[docs]
class Constant(Block):
"""Produces a constant output signal (SISO)
Parameters
----------
value : float
constant defining block output
"""
def __init__(self, value=1):
super().__init__()
self.value = value
def __len__(self):
"""No algebraic passthrough"""
return 0
[docs]
def update(self, t):
"""update system equation fixed point loop
Parameters
----------
t : float
evaluation time
Returns
-------
error : float
absolute error to previous iteration for convergence
control (always '0.0' because source-type)
"""
self.outputs[0] = self.value
return 0.0
[docs]
class Source(Block):
"""Source that produces an arbitrary time dependent output,
defined by the func (callable).
.. math::
y(t) = \\mathrm{func}(t)
Note
----
This block is purely algebraic and its internal function (`func`) will
be called multiple times per timestep, each time when `Simulation._update(t)`
is called in the global simulation loop.
Example
-------
For example a ramp:
.. code-block:: python
from pathsim.blocks import Source
src = Source(lambda t : t)
or a simple sinusoid with some frequency:
.. code-block:: python
import numpy as np
from pathsim.blocks import Source
#some parameter
omega = 100
#the function that gets evaluated
def f(t):
return np.sin(omega * t)
src = Source(f)
Because the `Source` block only has a single argument, it can be
used to decorate a function and make it a `PathSim` block. This might
be handy in some cases to keep definitions concise and localized
in the code:
.. code-block:: python
import numpy as np
from pathsim.blocks import Source
#does the same as the definition above
@Source
def src(t):
omega = 100
return np.sin(omega * t)
#'src' is now a PathSim block
Parameters
----------
func : callable
function defining time dependent block output
"""
def __init__(self, func=lambda t: 1):
super().__init__()
if not callable(func):
raise ValueError(f"'{func}' is not callable")
self.func = func
def __len__(self):
"""No algebraic passthrough"""
return 0
[docs]
def update(self, t):
"""update system equation fixed point loop
by evaluating the internal function 'func'
Note
----
No direct passthrough, so the `update` method
is optimized and has no convergence check
Parameters
----------
t : float
evaluation time
Returns
-------
error : float
absolute error to previous iteration for convergence
control (always `0.0` because source-type)
"""
self.outputs[0] = self.func(t)
return 0.0
# SPECIAL CONTINUOUS SOURCE BLOCKS ======================================================
[docs]
class TriangleWaveSource(Block):
"""Source block that generates an analog triangle wave
Parameters
----------
frequency : float
frequency of the triangle wave
amplitude : float
amplitude of the triangle wave
phase : float
phase of the triangle wave
"""
def __init__(self, frequency=1, amplitude=1, phase=0):
super().__init__()
self.amplitude = amplitude
self.frequency = frequency
self.phase = phase
def __len__(self):
return 0
def _triangle_wave(self, t, f):
"""triangle wave with amplitude '1' and frequency 'f'
Parameters
----------
t : float
evaluation time
f : float
trig wave frequency
Returns
-------
out : float
trig wave value
"""
return 2 * abs(t*f - np.floor(t*f + 0.5)) - 1
[docs]
def update(self, t):
tau = self.phase/(2*np.pi*self.frequency)
self.outputs[0] = self.amplitude * self._triangle_wave(t + tau, self.frequency)
return 0.0
[docs]
class SinusoidalSource(Block):
"""Source block that generates a sinusoid wave
Parameters
----------
frequency : float
frequency of the sinusoid
amplitude : float
amplitude of the sinusoid
phase : float
phase of the sinusoid
"""
def __init__(self, frequency=1, amplitude=1, phase=0):
super().__init__()
self.amplitude = amplitude
self.frequency = frequency
self.phase = phase
def __len__(self):
return 0
[docs]
def update(self, t):
omega = 2*np.pi*self.frequency
self.outputs[0] = self.amplitude * np.sin(omega*t + self.phase)
return 0.0
[docs]
class GaussianPulseSource(Block):
"""Source block that generates a gaussian pulse
Parameters
----------
amplitude : float
amplitude of the gaussian pulse
f_max : float
maximum frequency component of the gaussian pulse (steepness)
tau : float
time delay of the gaussian pulse
"""
def __init__(self, amplitude=1, f_max=1e3, tau=0.0):
super().__init__()
self.amplitude = amplitude
self.f_max = f_max
self.tau = tau
def __len__(self):
return 0
def _gaussian(self, t, f_max):
"""gaussian pulse with its maximum at t=0
Parameters
----------
t : float
evaluation time
f_max : float
maximum frequency component of gaussian
Returns
-------
out : float
gaussian value
"""
tau = 0.5 / f_max
return np.exp(-(t/tau)**2)
[docs]
def update(self, t):
self.outputs[0] = self.amplitude * self._gaussian(t-self.tau, self.f_max)
return 0.0
[docs]
class SinusoidalPhaseNoiseSource(Block):
"""Sinusoidal source with cumulative and white phase noise
Parameters
----------
frequency : float
frequency of the sinusoid
amplitude : float
amplitude of the sinusoid
phase : float
phase of the sinusoid
sig_cum : float
weight for cumulative phase noise contribution
sig_white : float
weight for white phase noise contribution
sampling_rate : float
number of samples per unit time for the internal RNG
Attributes
----------
omega : float
angular frequency of the sinusoid, derived from `frequency`
noise_1 : float
internal noise value sampled from normal distribution
noise_2 : float
internal noise value sampled from normal distribution
n_samples : int
bin counter for sampling
t_max : float
most recent sampling time, to ensure timing for sampling bins
"""
def __init__(
self,
frequency=1,
amplitude=1,
phase=0,
sig_cum=0,
sig_white=0,
sampling_rate=10
):
super().__init__()
self.amplitude = amplitude
self.frequency = frequency
self.phase = phase
self.sampling_rate = sampling_rate
self.omega = 2 * np.pi * self.frequency
#parameters for phase noise
self.sig_cum = sig_cum
self.sig_white = sig_white
#initial noise sampling
self.noise_1 = np.random.normal()
self.noise_2 = np.random.normal()
#bin counter
self.n_samples = 0
self.t_max = 0
def __len__(self):
return 0
[docs]
def set_solver(self, Solver, **solver_kwargs):
#initialize the numerical integration engine
if self.engine is None: self.engine = Solver(0.0, **solver_kwargs)
#change solver if already initialized
else: self.engine = Solver.cast(self.engine, **solver_kwargs)
[docs]
def reset(self):
super().reset()
#reset block specific attributes
self.n_samples = 0
self.t_max = 0
[docs]
def update(self, t):
"""update system equation for fixed point loop,
here just setting the outputs
Note
----
no direct passthrough, so the 'update' method
is optimized for this case
Parameters
----------
t : float
evaluation time
Returns
-------
error : float
absolute error to previous iteration for convergence
control (here '0.0' because source-type block)
"""
#compute phase error
phase_error = self.sig_white * self.noise_1 + self.sig_cum * self.engine.get()
#set output
self.outputs[0] = self.amplitude * np.sin(self.omega*t + self.phase + phase_error)
return 0.0
[docs]
def sample(self, t):
"""
Sample from a normal distribution after successful timestep.
"""
if (self.sampling_rate is None or
self.n_samples < t * self.sampling_rate):
self.noise_1 = np.random.normal()
self.noise_2 = np.random.normal()
self.n_samples += 1
[docs]
def solve(self, t, dt):
#advance solution of implicit update equation (no jacobian)
f = self.noise_2
self.engine.solve(f, None, dt)
return 0.0
[docs]
def step(self, t, dt):
#compute update step with integration engine
f = self.noise_2
self.engine.step(f, dt)
#no error control for noise source
return True, 0.0, 1.0
[docs]
class ChirpPhaseNoiseSource(Block):
"""Chirp source, sinusoid with frequency ramp up and ramp down.
This works by using a time dependent triangle wave for the frequency
and integrating it with a numerical integration engine to get a
continuous phase. This phase is then used to evaluate a sinusoid.
Additionally the chirp source can have white and cumulative phase noise.
Mathematically it looks like this for the contributions to the phase from
the triangular wave:
.. math::
\\varphi_t(t) = \\int_0^t \\mathrm{tri}_{f_0, B, T}(\\tau) \\, d\\tau
And from the white (w) and cumulative (c) noise:
.. math::
\\varphi_n(t) = \\sigma_w \\, \\mathrm{RNG}_w(t) + \\sigma_c \\int_0^t \\mathrm{RNG}_c(\\tau) \\, d\\tau
The phase contributions are then used to evaluate a sinusoid to get the final chirp signal:
.. math::
y(t) = A \\sin(\\varphi_t(t) + \\varphi_n(t) + \\varphi_0)
Parameters
----------
amplitude : float
amplitude of the chirp signal
f0 : float
start frequency of the chirp signal
BW : float
bandwidth of the frequency ramp of the chirp signal
T : float
period of the frequency ramp of the chirp signal
phase : float
phase of sinusoid (initial)
sig_cum : float
weight for cumulative phase noise contribution
sig_white : float
weight for white phase noise contribution
sampling_rate : float
number of samples per unit time for the internal random number generators
"""
def __init__(
self,
amplitude=1,
f0=1,
BW=1,
T=1,
phase=0,
sig_cum=0,
sig_white=0,
sampling_rate=10
):
super().__init__()
#parameters of chirp signal
self.amplitude = amplitude
self.phase = phase
self.f0 = f0
self.BW = BW
self.T = T
#parameters for phase noise
self.sig_cum = sig_cum
self.sig_white = sig_white
self.sampling_rate = sampling_rate
#initial noise sampling
self.noise_1 = np.random.normal()
self.noise_2 = np.random.normal()
#bin counter
self.n_samples = 0
self.t_max = 0
def __len__(self):
return 0
def _triangle_wave(self, t, f):
"""triangle wave with amplitude '1' and frequency 'f'
Parameters
----------
t : float
evaluation time
f : float
trig wave frequency
Returns
-------
out : float
trig wave value
"""
return 2 * abs(t*f - np.floor(t*f + 0.5)) - 1
[docs]
def reset(self):
super().reset()
#reset
self.n_samples = 0
self.t_max = 0
[docs]
def set_solver(self, Solver, **solver_kwargs):
if self.engine is None:
#initialize the numerical integration engine
self.engine = Solver(self.f0, **solver_kwargs)
else:
#change solver if already initialized
self.engine = Solver.cast(self.engine, **solver_kwargs)
[docs]
def sample(self, t):
"""Sample from a normal distribution after successful timestep
to update internal noise samples
"""
if (self.sampling_rate is None or
self.n_samples < t * self.sampling_rate):
self.noise_1 = np.random.normal()
self.noise_2 = np.random.normal()
self.n_samples += 1
[docs]
def update(self, t):
"""update the block output, assebble phase and evaluate the sinusoid"""
_phase = 2 * np.pi * (self.engine.get() + self.sig_white * self.noise_1) + self.phase
self.outputs[0] = self.amplitude * np.sin(_phase)
return 0.0
[docs]
def solve(self, t, dt):
"""advance implicit solver of implicit integration engine, evaluate
the triangle wave and cumulative noise RNG"""
f = self.BW * (1 + self._triangle_wave(t, 1/self.T))/2 + self.sig_cum * self.noise_2
self.engine.solve(f, None, dt)
#no error for chirp source
return 0.0
[docs]
def step(self, t, dt):
"""compute update step with integration engine, evaluate the triangle wave
and cumulative noise RNG"""
f = self.BW * (1 + self._triangle_wave(t, 1/self.T))/2 + self.sig_cum * self.noise_2
self.engine.step(f, dt)
#no error control for chirp source
return True, 0.0, 1.0
[docs]
class ChirpSource(ChirpPhaseNoiseSource):
def __init__(
self,
amplitude=1,
f0=1,
BW=1,
T=1,
phase=0,
sig_cum=0,
sig_white=0,
sampling_rate=10):
super().__init__(amplitude, f0, BW, T, phase, sig_cum, sig_white, sampling_rate)
import warnings
warnings.warn("'ChirpSource' block will be deprecated and is currently an alias, use 'ChirpPhaseNoiseSource' instead")
# SPECIAL DISCRETE SOURCE BLOCKS ========================================================
[docs]
class PulseSource(Block):
"""Generates a periodic pulse waveform with defined rise and fall times
using a hybrid approach with scheduled events and continuous updates.
Scheduled events trigger phase changes (low, rising, high, falling),
and the `update` method calculates the output value based on the
current phase, performing linear interpolation during rise and fall.
Parameters
----------
amplitude : float, optional
Peak amplitude of the pulse. Default is 1.0.
T : float, optional
Period of the pulse train. Must be positive. Default is 1.0.
t_rise : float, optional
Duration of the rising edge. Default is 0.0.
t_fall : float, optional
Duration of the falling edge. Default is 0.0.
tau : float, optional
Initial delay before the first pulse cycle begins. Default is 0.0.
duty : float, optional
Duty cycle, ratio of the pulse ON duration (plateau time only)
to the total period T (must be between 0 and 1). Default is 0.5.
The high plateau duration is `T * duty`.
Attributes
----------
events : list[Schedule]
Internal scheduled events triggering phase transitions.
_phase : str
Current phase of the pulse ('low', 'rising', 'high', 'falling').
_phase_start_time : float
Simulation time when the current phase began.
"""
def __init__(
self,
amplitude=1.0,
T=1.0,
t_rise=0.0,
t_fall=0.0,
tau=0.0,
duty=0.5
):
super().__init__()
#input validation
if not (T > 0):
raise ValueError("Period T must be positive.")
if not (0 <= t_rise):
raise ValueError("Rise time t_rise cannot be negative.")
if not (0 <= t_fall):
raise ValueError("Fall time t_fall cannot be negative.")
if not (0 <= duty <= 1):
raise ValueError("Duty cycle must be between 0 and 1.")
#ensure rise + high plateau + fall fits within a period
t_plateau = T * duty
if t_rise + t_plateau + t_fall > T:
raise ValueError("Total pulse time (rise+plateau+fall) exceeds period T")
#parameters
self.amplitude = amplitude
self.T = T
self.t_rise = max(TOLERANCE, t_rise)
self.t_fall = max(TOLERANCE, t_fall)
self.tau = tau
self.duty = duty # Duty cycle now refers to the high plateau time
#internal state
self._phase = 'low'
self._phase_start_time = self.tau
#event timings relative to start of cycle (tau)
t_start_rise = self.tau
t_start_high = t_start_rise + self.t_rise
t_start_fall = t_start_high + t_plateau
t_start_low = t_start_fall + self.t_fall
#define event actions (update phase and start time)
def _set_phase_rising(t):
self._phase = 'rising'
self._phase_start_time = t
self.outputs[0] = 0.0
def _set_phase_high(t):
self._phase = 'high'
self._phase_start_time = t
self.outputs[0] = self.amplitude
def _set_phase_falling(t):
self._phase = 'falling'
self._phase_start_time = t
self.outputs[0] = self.amplitude
def _set_phase_low(t):
self._phase = 'low'
self._phase_start_time = t
self.outputs[0] = 0.0
#start rising
_E_rising = Schedule(
t_start=max(0.0, t_start_rise),
t_period=self.T,
func_act=_set_phase_rising
)
#start high plateau (end rising)
_E_high = Schedule(
t_start=max(0.0, t_start_high),
t_period=self.T,
func_act=_set_phase_high
)
#start falling
_E_falling = Schedule(
t_start=max(0.0, t_start_fall),
t_period=self.T,
func_act=_set_phase_falling
)
#start low (end falling)
_E_low = Schedule(
t_start=max(0.0, t_start_low),
t_period=self.T,
func_act=_set_phase_low
)
#scheduled events for state transitions
self.events = [_E_rising, _E_high, _E_falling, _E_low]
[docs]
def reset(self):
"""Resets the block state."""
super().reset()
self._phase = 'low'
self._phase_start_time = self.tau
[docs]
def update(self, t):
"""Calculate the pulse output value based on the current phase.
Performs linear interpolation during 'rising' and 'falling' phases.
Parameters
----------
t : float
evaluation time
Returns
-------
error : float
deviation to previous iteration for convergence control
(always '0.0', because no passthrough)
"""
#calculate output based on phase
if self._phase == 'rising':
_val = self.amplitude * (t - self._phase_start_time) / self.t_rise
self.outputs[0] = np.clip(_val, 0.0, self.amplitude)
elif self._phase == 'high':
self.outputs[0] = self.amplitude
elif self._phase == 'falling':
_val = self.amplitude * (1.0 - (t - self._phase_start_time) / self.t_fall)
self.outputs[0] = np.clip(_val, 0.0, self.amplitude)
elif self._phase == 'low':
self.outputs[0] = 0.0
return 0.0
def __len__(self):
#no algebraic passthrough
return 0
[docs]
class Pulse(PulseSource):
def __init__(self, amplitude=1.0, T=1.0, t_rise=0.0, t_fall=0.0, tau=0.0, duty=0.5):
super().__init__(amplitude, T, t_rise, t_fall, tau, duty)
import warnings
warnings.warn("'Pulse' block will be deprecated and is currently an alias, use 'PulseSource' instead")
[docs]
class ClockSource(Block):
"""Discrete time clock source block.
Utilizes scheduled events to periodically set
the block output to 0 or 1 at discrete times.
Parameters
----------
T : float
period of the clock
tau : float
clock delay
Attributes
----------
events : list[Schedule]
internal scheduled event list
"""
def __init__(self, T=1, tau=0):
super().__init__()
self.T = T
self.tau = tau
def clk_up(t):
self.outputs[0] = 1
def clk_down(t):
self.outputs[0] = 0
#internal scheduled events
self.events = [
Schedule(
t_start=tau,
t_period=T,
func_act=clk_up
),
Schedule(
t_start=tau+T/2,
t_period=T,
func_act=clk_down
)
]
def __len__(self):
#no algebraic passthrough
return 0
[docs]
class Clock(ClockSource):
def __init__(self, T=1, tau=0):
super().__init__(T, tau)
import warnings
warnings.warn("'Clock' block will be deprecated and is currently an alias, use 'ClockSource' instead")
[docs]
class SquareWaveSource(Block):
"""Discrete time square wave source.
Utilizes scheduled events to periodically set
the block output at discrete times.
Parameters
----------
amplitude : float
amplitude of the square wave signal
frequency : float
frequency of the square wave signal
phase : float
phase of the square wave signal
Attributes
----------
events : list[Schedule]
internal scheduled events
"""
def __init__(self, amplitude=1, frequency=1, phase=0):
super().__init__()
self.amplitude = amplitude
self.frequency = frequency
self.phase = phase
def sqw_up(t):
self.outputs[0] = self.amplitude
def sqw_down(t):
self.outputs[0] = -self.amplitude
#internal scheduled events
self.events = [
Schedule(
t_start=1/frequency * phase/360,
t_period=1/frequency,
func_act=sqw_up
),
Schedule(
t_start=1/frequency * (phase/360 + 0.5),
t_period=1/frequency,
func_act=sqw_down
)
]
def __len__(self):
#no algebraic passthrough
return 0
[docs]
class StepSource(Block):
"""Discrete time unit step block.
Utilizes a scheduled event to set
the block output at the defined delay.
Parameters
----------
amplitude : float
amplitude of the step signal
tau : float
delay of the step
Attributes
----------
events : list[Schedule]
internal scheduled event
"""
def __init__(self, amplitude=1, tau=0.0):
super().__init__()
self.amplitude = amplitude
self.tau = tau
def stp_up(t):
self.outputs[0] = self.amplitude
#internal scheduled event
self.events = [
Schedule(
t_start=tau,
t_period=tau,
t_end=3*tau/2,
func_act=stp_up
)
]
def __len__(self):
#no algebraic passthrough
return 0
[docs]
class Step(StepSource):
def __init__(self, amplitude=1, tau=0.0):
super().__init__(amplitude, tau)
import warnings
warnings.warn("'Step' block will be deprecated and is currently an alias, use 'StepSource' instead")