# Source code for pathsim.subsystem

#########################################################################################
##
##                                 SUBSYSTEM DEFINITION 
##                                    (subsystem.py)
##
##              This module contains the 'Subsystem' and 'Interface' classes 
##         that manage subsystems that can be embedded within a larger simulation
##
##                                  Milan Rother 2024
##
#########################################################################################

# IMPORTS ===============================================================================

import numpy as np

from .connection import Connection

from .blocks._block import Block

from .utils.utils import path_length_dfs
from .utils.register import Register
from .utils.portreference import PortReference


# IO CLASS ==============================================================================

class Interface(Block):
    """Minimal block acting as the data gateway of a 'Subsystem'.

    How it is used:

    - The internal blocks of a subsystem are wired to the inputs and 
      outputs of this block through the subsystem's internal connections.

    - It inherits everything from the base 'Block' class and can therefore 
      be treated like any other block.

    - On top of that it offers two accessors, 'set_output' and 'get_input', 
      which the 'Subsystem' uses to translate between its own external 
      ports and the ports of this block.

    Note
    ----
    Seen from the outside the roles are mirrored: values written into the 
    subsystem end up in the *outputs* of the interface (where the internal 
    blocks read them), and values leaving the subsystem are read from the 
    *inputs* of the interface (where the internal blocks wrote them).
    """

    def set_output(self, port, value):
        """Write a value directly to an output port of the interface.

        Parameters
        ----------
        port : int
            output port of the interface to write to
        value : numeric
            value to store at the port
        """
        self.outputs[port] = value


    def get_input(self, port):
        """Read the value currently stored at an input port of the interface.

        Parameters
        ----------
        port : int
            input port of the interface to read from

        Returns
        -------
        value : numeric
            value stored at the port
        """
        return self.inputs[port]
# MAIN SUBSYSTEM CLASS ==================================================================
class Subsystem(Block):
    """Subsystem class that holds its own blocks and connections and 
    can natively interface with the main simulation loop. 

    IO interface is realized by a special 'Interface' block, that has extra 
    methods for setting and getting inputs and outputs and serves as the 
    interface of the internal blocks to the outside. 

    The subsystem doesn't use its 'inputs' and 'outputs' dicts directly. 
    It exclusively handles data transfer via the 'Interface' block.

    This class can be used just like any other block during the simulation, 
    since it implements the required methods 'update' for the fixed-point 
    iteration (resolving algebraic loops with instant time blocks), 
    the 'step' method that performs timestepping (especially for dynamic 
    blocks with internal states) and the 'solve' method for solving the 
    implicit update equation for implicit solvers. 

    Example
    -------

    This is how we can wrap up multiple blocks within a subsystem. 
    In this case vanderpol system built from discrete components 
    instead of using an ODE block (in practice you should use a 
    monolithic ODE whenever possible due to performance).

    .. code-block:: python

        from pathsim import Subsystem, Interface, Connection
        from pathsim.blocks import Integrator, Function

        #van der Pol parameter
        mu = 1000

        #blocks in the subsystem
        If = Interface() # this is the interface to the outside
        I1 = Integrator(2)
        I2 = Integrator(0)
        Fn = Function(lambda x1, x2: mu*(1 - x1**2)*x2 - x1)

        sub_blocks = [If, I1, I2, Fn]

        #connections in the subsystem
        sub_connections = [
            Connection(I2, I1, Fn[1], If[1]), 
            Connection(I1, Fn, If), 
            Connection(Fn, I2)
            ]

        #the subsystem acts just like a normal block
        vdp = Subsystem(sub_blocks, sub_connections)

    Parameters
    ----------
    blocks : list[Block] 
        internal blocks of the subsystem, has to contain exactly 
        one 'Interface' block
    connections : list[Connection]
        internal connections of the subsystem

    Attributes
    ----------
    blocks : list[Block]
        internal blocks of the subsystem (without the interface)
    connections : list[Connection]
        internal connections of the subsystem
    interface : Interface
        internal interface block for data transfer to the outside

    Raises
    ------
    ValueError
        if 'blocks' contains no or more than one 'Interface' block, 
        or if two internal connections overwrite each other
    """

    def __init__(self, blocks=None, connections=None):
        super().__init__()

        #internal connections
        self.connections = [] if connections is None else connections

        #collect and organize internal blocks
        self.blocks, self.interface = [], None

        if blocks is not None:
            for block in blocks:
                if isinstance(block, Interface): 

                    if self.interface is not None:
                        #interface block is already defined
                        raise ValueError("Subsystem can only have one 'Interface' block!")

                    self.interface = block
                else: 
                    #regular blocks
                    self.blocks.append(block)

        #check if interface is defined
        if self.interface is None:
            raise ValueError("Subsystem 'blocks' list needs to contain 'Interface' block!")

        #validate the internal connections upon initialization
        self._check_connections()


    def __len__(self):
        """Recursively compute the longest signal path in the subsystem by 
        depth first search, leveraging the '__len__' methods of the blocks. 

        This enables the path length computation even for nested subsystems.

        Iterate internal blocks (including the interface) and compute the 
        longest path from each block as starting block. 

        Returns
        -------
        max_path_length : int
            longest path length found, zero if no path is longer than that
        """
        max_path_length = 0
        for block in [self.interface, *self.blocks]:
            path_length = path_length_dfs(self.connections, block)
            if path_length > max_path_length:
                max_path_length = path_length
        return max_path_length


    def __call__(self):
        """Recursively get the subsystems internal states of engines 
        (if available) of all internal blocks and nested subsystems 
        and the subsystem inputs and outputs as arrays for use outside. 

        Either for monitoring, postprocessing or event detection. 
        In any case this enables easy access to the current block state.

        Returns
        -------
        inputs : array
            subsystem inputs (these are the outputs of the interface)
        outputs : array
            subsystem outputs (these are the inputs of the interface)
        states : array
            stacked states of all internal blocks, empty array if the 
            subsystem has no internal blocks
        """

        #the interface is mirrored with respect to the outside
        _inputs  = self.interface.outputs.to_array()
        _outputs = self.interface.inputs.to_array()

        #only the states of the internal blocks are of interest here
        _states = [block()[2] for block in self.blocks]

        #'np.hstack' raises for an empty sequence -> handle no blocks case
        if not _states:
            return _inputs, _outputs, np.array([])

        return _inputs, _outputs, np.hstack(_states)


    def __contains__(self, other):
        """Check if blocks and connections are already part of the subsystem.

        Note that the interface block is not stored in the internal block 
        list and is therefore not reported as contained.

        Parameters
        ----------
        other : obj
            object to check if its part of subsystem

        Returns
        -------
        bool
        """
        if isinstance(other, Block): 
            return other in self.blocks
        elif isinstance(other, Connection): 
            return other in self.connections
        else: 
            return False


    def _check_connections(self):
        """Check if connections are valid and if there is no input port that 
        receives multiple outputs and could be overwritten unintentionally.

        If multiple outputs are assigned to the same input, an error is raised.

        Raises
        ------
        ValueError
            if one connection overwrites another one
        """

        #iterate connections and check if they are valid
        for i, conn_1 in enumerate(self.connections):

            #check if connections overwrite each other and raise exception
            for conn_2 in self.connections[(i+1):]:
                if conn_1.overwrites(conn_2):
                    _msg = f"{conn_1} overwrites {conn_2}"
                    raise ValueError(_msg)


    # visualization -------------------------------------------------------------------------

    def plot(self, *args, **kwargs):
        """Plot the simulation results by calling the 'plot' method of all 
        internal blocks. Blocks with visualization capabilities are for 
        example the 'Scope' and 'Spectrum'.

        Parameters
        ----------
        args : tuple
            args for the plot methods
        kwargs : dict
            kwargs for the plot method
        """
        for block in self.blocks:
            block.plot(*args, **kwargs)


    # system management ---------------------------------------------------------------------

    def reset(self):
        """Reset the subsystem itself, its interface and all internal blocks."""

        super().reset()

        #reset interface
        self.interface.reset()

        #reset internal blocks
        for block in self.blocks:
            block.reset()


    def on(self):
        """Activate the subsystem and all internal blocks, sets the boolean
        evaluation flag to 'True'.
        """
        self._active = True
        for block in self.blocks: 
            block.on()


    def off(self):
        """Deactivate the subsystem and all internal blocks, sets the boolean 
        evaluation flag to 'False'. Also resets the subsystem.
        """
        self._active = False
        for block in self.blocks: 
            block.off()
        self.reset()


    def linearize(self, t):
        """Linearize the algebraic and dynamic components of the internal blocks.

        This is done by linearizing the internal 'Operator' and 'DynamicOperator' 
        instances of all the internal blocks of the subsystem in the current system 
        operating point. The operators create 1st order Taylor approximations 
        internally and use them on subsequent calls after linearization.

        Recursively traverses down the hierarchy for nested subsystems and 
        linearizes all of them.

        Parameters
        ----------
        t : float 
            evaluation time
        """
        for block in self.blocks: 
            block.linearize(t)


    def delinearize(self):
        """Revert the linearization of the internal blocks."""
        for block in self.blocks: 
            block.delinearize()


    # serialization / deserialization -------------------------------------------------------

    def to_dict(self):
        """Custom serialization for Subsystem.

        Extends the dict from the base class by the serialized internal 
        blocks (with the interface as the last entry) and the serialized 
        internal connections.

        Returns
        -------
        data : dict
            serialized subsystem
        """
        data = super().to_dict()

        #serialization for internal blocks and interface
        data["params"]["blocks"] = [block.to_dict() for block in self.blocks + [self.interface]]

        #serialize connections
        data["params"]["connections"] = [conn.to_dict() for conn in self.connections]

        return data


    @classmethod
    def from_dict(cls, data):
        """Custom deserialization for Subsystem.

        The 'data' dict is only read and not modified, so the same dict 
        can be deserialized multiple times.

        Parameters
        ----------
        data : dict
            serialized subsystem with the keys 'blocks' and 'connections' 
            within 'params', as created by the 'to_dict' method

        Returns
        -------
        subsystem : Subsystem
            subsystem built from the deserialized blocks and connections

        Raises
        ------
        KeyError
            if a connection references a block id that is not part of 
            the serialized blocks
        """
        params = data["params"]

        #deserialize blocks and create block ID mapping
        blocks, id_to_block = [], {}
        for blk_data in params.get("blocks", []):
            block = Block.from_dict(blk_data)
            blocks.append(block)
            id_to_block[blk_data["id"]] = block

        #deserialize connections
        connections = []
        for conn_data in params.get("connections", []):

            #source data
            source_block = id_to_block[conn_data["source"]["block"]]
            source_ports = conn_data["source"]["ports"]
            source = PortReference(source_block, source_ports)

            #target data
            targets = [
                PortReference(id_to_block[trg["block"]], trg["ports"])
                for trg in conn_data["targets"]
                ]

            #create the connection
            connections.append(
                Connection(source, *targets)
                )

        #finally construct the subsystem
        return cls(blocks, connections)


    # methods for discrete event management -------------------------------------------------

    def get_events(self):
        """Recursively collect and return events spawned by the 
        internal blocks of the subsystem, for discrete time blocks 
        such as triggers / comparators, clocks, etc.

        Returns
        -------
        events : list
            events of all internal blocks
        """
        _events = []
        for block in self.blocks:
            _events.extend(block.get_events())
        return _events


    # methods for inter-block data transfer -------------------------------------------------

    def set(self, port, value):
        """The 'set' method of the 'Subsystem' sets the output 
        values of the 'Interface' block, where they are available 
        to the internal blocks.

        Parameters
        ----------
        port : int
            input port (of subsystem) to set value to
        value : numeric
            value to set at input port (of subsystem)
        """
        self.interface.set_output(port, value)


    def get(self, port):
        """The 'get' method of the 'Subsystem' retrieves the input 
        values of the 'Interface' block, that have been set by the 
        internal blocks.

        Parameters
        ----------
        port : int
            output port (of subsystem) to retrieve value from

        Returns
        -------
        value : numeric
            value at output port (of subsystem)
        """
        return self.interface.get_input(port)


    # methods for data recording ------------------------------------------------------------

    def sample(self, t):
        """Sample data from the internal blocks by calling 
        their 'sample' methods.

        Parameters
        ----------
        t : float
            evaluation time 
        """

        #record data if required
        for block in self.blocks:
            block.sample(t)


    # methods for block output and state updates --------------------------------------------

    def update(self, t):
        """Update the instant time components of the internal blocks 
        to evaluate the (distributed) system equation.

        The internal connections are updated first (data transfer) 
        and the internal blocks afterwards.

        Parameters
        ----------
        t : float
            evaluation time 

        Returns
        -------
        max_error : float
            maximum convergence error of the internal blocks
        """

        #update internal connections (data transfer)
        for connection in self.connections:
            connection.update()

        #update internal blocks
        max_error = 0.0
        for block in self.blocks:
            error = block.update(t)
            if error > max_error:
                max_error = error

        #return subsystem convergence error
        return max_error


    def solve(self, t, dt):
        """Advance solution of implicit update equation 
        for internal blocks.

        Parameters
        ----------
        t : float
            evaluation time 
        dt : float
            timestep

        Returns
        -------
        max_error : float
            maximum error of implicit update equation
        """
        max_error = 0.0
        for block in self.blocks:
            error = block.solve(t, dt)
            if error > max_error:
                max_error = error
        return max_error


    def step(self, t, dt):
        """Explicit component of timestep for internal blocks 
        including error propagation.

        Notes
        -----
        This is pretty much an exact copy of the '_step' method 
        from the 'Simulation' class.

        Parameters
        ----------
        t : float
            evaluation time 
        dt : float
            timestep

        Returns
        -------
        success : bool
            indicator if the timestep was successful
        max_error : float
            maximum local truncation error from integration
        scale : float
            rescale factor for timestep, smallest relevant factor 
            of the internal blocks or '1.0' if there is none
        """

        #initial timestep rescale and error estimate
        success, max_error_norm, relevant_scales = True, 0.0, []

        #step blocks and get error estimates if available
        for block in self.blocks:
            ss, err_norm, scl = block.step(t, dt)

            #check solver stepping success
            if not ss: 
                success = False

            #update error tracking
            if err_norm > max_error_norm: 
                max_error_norm = err_norm

            #update timestep rescale if relevant (0.0 and 1.0 are ignored)
            if scl not in [0.0, 1.0]: 
                relevant_scales.append(scl)

        #no relevant timestep rescale -> quit early
        if not relevant_scales: 
            return success, max_error_norm, 1.0

        #compute real timestep rescale
        return success, max_error_norm, min(relevant_scales)


    # methods for blocks with integration engines -------------------------------------------

    def set_solver(self, Solver, **solver_args):
        """Initialize all blocks with solver for numerical integration
        and additional args for the solver such as tolerances, etc.

        If blocks already have solvers, change the numerical integrator
        to the 'Solver' class.

        Parameters
        ----------
        Solver : Solver
            numerical solver definition
        solver_args : dict
            args to initialize solver with 
        """

        #iterate all blocks and set integration engines
        for block in self.blocks:
            block.set_solver(Solver, **solver_args)


    def revert(self):
        """Revert the internal blocks to the state 
        of the previous timestep.
        """
        for block in self.blocks:
            block.revert()


    def buffer(self, dt):
        """Buffer internal states of blocks with 
        internal integration engines.

        Parameters
        ----------
        dt : float
            value passed on to the 'buffer' methods of the internal 
            blocks (presumably the timestep, the original documentation 
            calls it 'evaluation time for buffering')
        """
        for block in self.blocks:
            block.buffer(dt)