#########################################################################################
##
## SUBSYSTEM DEFINITION
## (subsystem.py)
##
## This module contains the 'Subsystem' and 'Interface' classes
## that manage subsystems that can be embedded within a larger simulation
##
#########################################################################################
# IMPORTS ===============================================================================
import numpy as np
from collections import defaultdict
from .connection import Connection
from .blocks._block import Block
from .optim.booster import ConnectionBooster
from .utils.graph import Graph
from .utils.register import Register
from .utils.portreference import PortReference
from ._constants import (
SIM_ITERATIONS_MAX,
SIM_TOLERANCE_FPI,
LOG_ENABLE
)
# IO CLASS ==============================================================================
[docs]
class Interface(Block):
"""Bare-bone block that serves as a data interface for the 'Subsystem' class.
It works like this:
- Internal blocks of the subsystem are connected to the inputs and outputs
of this Interface block via the internal connections.
- It behaves like a normal block (inherits the main 'Block' class methods).
- It implements some special methods to get and set the inputs and outputs
of the blocks, that are used to translate between the internal blocks of the
subsystem and the inputs and outputs of the subsystem.
- Handles data transfer to and from the internal subsystem blocks
to and from the inputs and outputs of the subsystem.
"""
def __len__(self):
return 0
[docs]
def register_port_map(self, port_map_in, port_map_out):
"""Update the input and output registers of the interface block with port mappings
Parameters
----------
port_map_in : dict[str: int]
port alias mapping for block inputs
port_map_out : dict[str: int]
port alias mapping for block outputs
"""
self._port_map_in = port_map_in
self._port_map_out = port_map_out
#reinitialize to build registers with mappings
self.__init__()
# MAIN SUBSYSTEM CLASS ==================================================================
[docs]
class Subsystem(Block):
"""Subsystem class that holds its own blocks and connecions and
can natively interface with the main simulation loop.
IO interface is realized by a special 'Interface' block, that has extra
methods for setting and getting inputs and outputs and serves
as the interface of the internal blocks to the outside.
The subsystem doesnt use its 'inputs' and 'outputs' dicts directly.
It exclusively handles data transfer via the 'Interface' block.
This class can be used just like any other block during the simulation,
since it implements the required methods 'update' for the fixed-point
iteration (resolving algebraic loops with instant time blocks),
the 'step' method that performs timestepping (especially for dynamic
blocks with internal states) and the 'solve' method for solving the
implicit update equation for implicit solvers.
Example
-------
This is how we can wrap up multiple blocks within a subsystem.
In this case vanderpol system built from discrete components
instead of using an ODE block (in practice you should use
a monolithic ODE whenever possible due to performance).
.. code-block:: python
from pathsim import Subsystem, Interface, Connection
from pathsim.blocks import Integrator, Function
#van der Pol parameter
mu = 1000
#blocks in the subsystem
If = Interface() # this is the interface to the outside
I1 = Integrator(2)
I2 = Integrator(0)
Fn = Function(lambda x1, x2: mu*(1 - x1**2)*x2 - x1)
sub_blocks = [If, I1, I2, Fn]
#connections in the subsystem
sub_connections = [
Connection(I2, I1, Fn[1], If[1]),
Connection(I1, Fn, If),
Connection(Fn, I2)
]
#the subsystem acts just like a normal block
vdp = Subsystem(sub_blocks, sub_connections)
Parameters
----------
blocks : list[Block]
internal blocks of the subsystem
connections : list[Connection]
internal connections of the subsystem
tolerance_fpi : float
absolute tolerance for convergence of algebraic loops
default see ´SIM_TOLERANCE_FPI´ in ´_constants.py´
iterations_max : int
maximum allowed number of iterations for algebraic loop
solver, default see ´SIM_ITERATIONS_MAX´ in ´_constants.py´
Attributes
----------
interface : Interface
internal interface block for data transfer to the outside
graph : Graph
internal graph representation for fast system funcion
evluations using DAG with algebraic depths
boosters : None | list[ConnectionBooster]
list of boosters (fixed point accelerators) that wrap
algebraic loop closing connections assembled from the
system graph
"""
def __init__(self,
blocks=None,
connections=None,
tolerance_fpi=SIM_TOLERANCE_FPI,
iterations_max=SIM_ITERATIONS_MAX
):
#internal integration engine -> initialized later
self.engine = None
#flag to set block (subsystem) active
self._active = True
#error tolerance for alg. loop solver
self.tolerance_fpi = tolerance_fpi
#max iterations for internal alg. loop solver
self.iterations_max = iterations_max
#operators for algebraic and dynamic components (not here)
self.op_alg = None
self.op_dyn = None
#internal graph representation -> initialized later
self.graph = None
#internal algebraic loop solvers -> initialized later
self.boosters = None
#internal connecions
self.connections = set()
if connections:
self.connections.update(connections)
#collect and organize internal blocks
self.blocks = set()
self.interface = None
if blocks:
for block in blocks:
if isinstance(block, Interface):
if self.interface is not None:
#interface block is already defined
raise ValueError("Subsystem can only have one 'Interface' block!")
self.interface = block
else:
#regular blocks
self.blocks.add(block)
#check if interface is defined
if self.interface is None:
raise ValueError("Subsystem 'blocks' list needs to contain 'Interface' block!")
#update port mappings of interface (reverse)
self.interface.register_port_map(self._port_map_out, self._port_map_in)
#assemble internal graph
self._assemble_graph()
def __len__(self):
"""Check if the Subsystem has algebraic passthrough by quering
the graph for an algebraic path from the interface to itself.
"""
is_alg = self.graph.is_algebraic_path(self.interface, self.interface)
return int(is_alg)
def __call__(self):
"""Recursively get the subsystems internal states of engines
(if available) of all internal blocks and nested subsystems
and the subsystem inputs and outputs as arrays for use outside.
Either for monitoring, postprocessing or event detection.
In any case this enables easy access to the current block state.
"""
_inputs = self.interface.outputs.to_array()
_outputs = self.interface.inputs.to_array()
_states = []
for block in self.blocks:
_i, _o, _s = block()
_states.append(_s)
return _inputs, _outputs, np.hstack(_states)
def __contains__(self, other):
"""Check if blocks and connections are already part of the subsystem
Paramters
---------
other : obj
object to check if its part of subsystem
Returns
-------
bool
"""
return other in self.blocks or other in self.connections
# subsystem graph assembly --------------------------------------------------------------
def _assemble_graph(self):
"""Assemble internal graph of subsystem for fast
algebraic evaluation during simulation.
"""
self.graph = Graph({*self.blocks, self.interface}, self.connections)
#create boosters for loop closing connections
if self.graph.has_loops:
self.boosters = [
ConnectionBooster(conn) for conn in self.graph.loop_closing_connections()
]
# methods for access to metadata --------------------------------------------------------
@property
def size(self):
"""Get size information from subsystem, recursively assembled
from internal blocks, including nested subsystems.
Returns
-------
sizes : tuple[int]
size of block (number of all internal blocks)
and number of internal states (from internal engines)
"""
total_n, total_nx = 0, 0
for block in self.blocks:
n, nx = block.size
total_n += n
total_nx += nx
return total_n, total_nx
# visualization -------------------------------------------------------------------------
[docs]
def plot(self, *args, **kwargs):
"""Plot the simulation results by calling all the blocks
that have visualization capabilities such as the 'Scope'
and 'Spectrum'.
Parameters
----------
args : tuple
args for the plot methods
kwargs : dict
kwargs for the plot method
"""
for block in self.blocks:
block.plot(*args, **kwargs)
# system management ---------------------------------------------------------------------
[docs]
def reset(self):
"""Reset the subsystem interface and all internal blocks"""
#reset interface
self.interface.reset()
#reset internal blocks
for block in self.blocks:
block.reset()
[docs]
def on(self):
"""Activate the subsystem and all internal blocks, sets the boolean
evaluation flag to 'True'.
"""
self._active = True
for block in self.blocks:
block.on()
[docs]
def off(self):
"""Deactivate the subsystem and all internal blocks, sets the boolean
evaluation flag to 'False'. Also resets the subsystem.
"""
self._active = False
for block in self.blocks:
block.off()
self.reset()
[docs]
def linearize(self, t):
"""Linearize the algebraic and dynamic components of the internal blocks.
This is done by linearizing the internal 'Operator' and 'DynamicOperator'
instances of all the internal blocks of the subsystem in the current system
operating point. The operators create 1st order tayler approximations
internally and use them on subsequent calls after linarization.
Recursively traverses down the hierarchy for nested subsystems and linearizes
all of them.
Parameters
----------
t : float
evaluation time
"""
for block in self.blocks:
block.linearize(t)
[docs]
def delinearize(self):
"""Revert the linearization of the internal blocks."""
for block in self.blocks:
block.delinearize()
# serialization / deserialization -------------------------------------------------------
[docs]
def to_dict(self):
"""Custom serialization for Subsystem"""
data = super().to_dict()
#serialization for internal blocks and interface
data["params"]["blocks"] = [block.to_dict() for block in {*self.blocks, self.interface}]
#serialize connections
data["params"]["connections"] = [conn.to_dict() for conn in self.connections]
return data
[docs]
@classmethod
def from_dict(cls, data):
"""Custom deserialization for Subsystem"""
from .connection import Connection
#deserialize blocks and create block ID mapping
blocks, id_to_block = [], {}
for blk_data in data["params"].pop("blocks", []):
block = Block.from_dict(blk_data)
blocks.append(block)
id_to_block[blk_data["id"]] = block
#deserialize connections
connections = []
for conn_data in data["params"].pop("connections", []):
#source data
source_block = id_to_block[conn_data["source"]["block"]]
source_ports = conn_data["source"]["ports"]
source = PortReference(source_block, source_ports)
#target data
targets = []
for trg in conn_data["targets"]:
target_block = id_to_block[trg["block"]]
target_ports = trg["ports"]
targets.append(
PortReference(target_block, target_ports)
)
#create the connection
connections.append(
Connection(source, *targets)
)
#finally construct the subsystem
return cls(blocks, connections)
# methods for discrete event management -------------------------------------------------
@property
def events(self):
"""Recursively collect and return events spawned by the
internal blocks of the subsystem, for discrete time
blocks such as triggers / comparators, clocks, etc.
"""
_events = []
for block in self.blocks:
_events.extend(block.events)
return _events
# methods for inter-block data transfer -------------------------------------------------
@property
def inputs(self):
return self.interface.outputs
@property
def outputs(self):
return self.interface.inputs
# methods for data recording ------------------------------------------------------------
[docs]
def sample(self, t, dt):
"""Update the internal connections again and sample data from
the internal blocks that implement the 'sample' method.
Parameters
----------
t : float
evaluation time
dt : float
integration timestep
"""
#record data if required
for block in self.blocks:
block.sample(t, dt)
# methods for block output and state updates --------------------------------------------
[docs]
def update(self, t):
"""Update the instant time components of the internal blocks
to evaluate the (distributed) system equation.
Parameters
----------
t : float
evaluation time
"""
#evaluate DAG
self._dag(t)
#algebraic loops -> solve them
if self.graph.has_loops:
self._loops(t)
def _dag(self, t):
"""Update the directed acyclic graph components of the system.
Parameters
----------
t : float
evaluation time for system function
"""
#update interface outgoing connections
for connection in self.graph.outgoing_connections(self.interface):
if connection: connection.update()
#perform gauss-seidel iterations without error checking
for _, blocks_dag, connections_dag in self.graph.dag():
#update blocks at algebraic depth
for block in blocks_dag:
if block: block.update(t)
#update connenctions at algebraic depth (data transfer)
for connection in connections_dag:
if connection: connection.update()
def _loops(self, t):
"""Perform the algebraic loop solve of the subsystem using accelerated
fixed-point iterations on the broken loop directed graph.
Parameters
----------
t : float
evaluation time for system function
"""
#reset accelerators of loop closing connections
for con_booster in self.boosters:
con_booster.reset()
#perform solver iterations on algebraic loops
for iteration in range(1, self.iterations_max):
#iterate DAG depths of broken loops
for depth, blocks_loop, connections_loop in self.graph.loop():
#update blocks at algebraic depth
for block in blocks_loop:
if block: block.update(t)
#step accelerated connenctions at algebraic depth (data transfer)
for connection in connections_loop:
if connection: connection.update()
#step boosters of loop closing connections
max_err = 0.0
for con_booster in self.boosters:
err = con_booster.update()
if err > max_err:
max_err = err
#check convergence after first iteration
if max_err <= self.tolerance_fpi:
return
#not converged -> error
raise RuntimeError(
"algebraic loop in 'Subsystem' not converged (iters: {}, err: {})".format(
self.iterations_max, max_err)
)
# methods for blocks with integration engines -------------------------------------------
[docs]
def solve(self, t, dt):
"""Advance solution of implicit update equation
for internal blocks.
Parameters
----------
t : float
evaluation time
dt : float
timestep
Returns
-------
max_error : float
maximum error of implicit update equaiton
"""
max_error = 0.0
for block in self._blocks_dyn:
if not block: continue
err = block.solve(t, dt)
if err > max_error:
max_error = err
return max_error
[docs]
def step(self, t, dt):
"""Explicit component of timestep for internal blocks
including error propagation.
Notes
-----
This is pretty much an exact copy of the '_step' method
from the 'Simulation' class.
Parameters
----------
t : float
evaluation time
dt : float
timestep
Returns
-------
success : bool
indicator if the timestep was successful
max_error : float
maximum local truncation error from integration
scale : float
rescale factor for timestep
"""
#initial timestep rescale and error estimate
success, max_error_norm, relevant_scales = True, 0.0, []
#step blocks and get error estimates if available
for block in self._blocks_dyn:
#skip inactive internal blocks
if not block: continue
suc, err_norm, scl = block.step(t, dt)
#check solver stepping success
if not suc:
success = False
#update error tracking
if err_norm > max_error_norm:
max_error_norm = err_norm
#update timestep rescale if relevant
if scl != 1.0 and scl > 0.0:
relevant_scales.append(scl)
#no relevant timestep rescale -> quit early
if not relevant_scales:
return success, max_error_norm, 1.0
#compute real timestep rescale
return success, max_error_norm, min(relevant_scales)
[docs]
def set_solver(self, Solver, parent, **solver_args):
"""Initialize all blocks with solver for numerical integration
and additional args for the solver such as tolerances, etc.
If blocks already have solvers, change the numerical integrator
to the 'Solver' class.
Parameters
----------
Solver : Solver
numerical solver definition
parent : Solver
numerical solver instance as parent
solver_args : dict
args to initialize solver with
"""
#set internal dummy engine -> marks block as dynamic
self.engine = Solver(parent=parent, **solver_args)
#set integration engines and assemble list of dynamic blocks
self._blocks_dyn = []
for block in self.blocks:
block.set_solver(Solver, parent, **solver_args)
if block.engine:
self._blocks_dyn.append(block)
[docs]
def revert(self):
"""revert the internal blocks to the state
of the previous timestep
"""
for block in self._blocks_dyn:
if block: block.revert()
[docs]
def buffer(self, dt):
"""buffer internal states of blocks with
internal integration engines
Parameters
----------
dt : float
evaluation time for buffering
"""
for block in self._blocks_dyn:
if block: block.buffer(dt)