Source code for pathsim.blocks.fmu

#########################################################################################
##
##                           FUNCTIONAL MOCK-UP UNIT (FMU) BLOCKS
##                                   (pathsim/blocks/fmu.py)
##
#########################################################################################

# IMPORTS ===============================================================================

import bisect

from ._block import Block
from .dynsys import DynamicalSystem

from ..events.schedule import Schedule, ScheduleList
from ..events.zerocrossing import ZeroCrossing
from ..utils.fmuwrapper import FMUWrapper
from ..utils.deprecation import deprecated


# BLOCKS ================================================================================

@deprecated(
    version="1.0.0",
    replacement="pathsim_fmi.CoSimulationFMU",
    reason="This block has moved to the pathsim-fmi package: pip install pathsim-fmi",
)
class CoSimulationFMU(Block):
    """Block wrapping a co-simulation FMU (FMI 2.0 / FMI 3.0) through FMPy.

    The wrapped FMU (Functional Mock-up Unit) is advanced by a periodic
    scheduled event: at every communication point the current block inputs
    are handed to the FMU, the FMU performs one step of size `dt`, and the
    block outputs are refreshed from the FMU.

    Parameters
    ----------
    fmu_path : str
        path to the FMU file (.fmu)
    instance_name : str, optional
        name for the FMU instance (default: 'fmu_instance')
    start_values : dict, optional
        dictionary of variable names and their initial values
    dt : float, optional
        communication step size for co-simulation. If None, the default
        experiment step size reported by the FMU wrapper is used.

    Attributes
    ----------
    fmu_wrapper : FMUWrapper
        version-agnostic FMU wrapper instance providing access to
        model_description, fmu, and other FMPy objects for advanced usage
    dt : float
        communication step size
    """

    def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None, dt=None):
        super().__init__()

        # kept so that `reset` can re-initialize the FMU with the same values
        self.start_values = start_values

        # instantiate the FMU in co-simulation mode and initialize it at t=0
        wrapper = FMUWrapper(fmu_path, instance_name, mode='cosimulation')
        self.fmu_wrapper = wrapper
        wrapper.initialize(start_values, start_time=0.0)

        # an explicit step size wins, otherwise fall back to the FMU default
        if dt is None:
            dt = wrapper.default_step_size
        self.dt = dt
        if dt is None:
            raise ValueError("No step size provided and FMU has no default experiment step size")

        # port registers are derived from the FMU variables
        self.inputs, self.outputs = wrapper.create_port_registers()

        # periodic event that advances the FMU by one communication step
        stepper = Schedule(t_start=0, t_period=dt, func_act=self._step_fmu)
        self.events = [stepper]

        # expose the initial FMU outputs at the block outputs
        self._pull_fmu_outputs()

    def _pull_fmu_outputs(self):
        """Copy the current FMU output values into the block output register."""
        self.outputs.update_from_array(self.fmu_wrapper.get_outputs_as_array())

    def _step_fmu(self, t):
        """Advance the FMU by one communication step starting at time `t`.

        Raises
        ------
        RuntimeError
            if the step result flags `terminate_simulation`
        """
        wrapper = self.fmu_wrapper

        # hand the current block inputs to the FMU, then step it
        wrapper.set_inputs_from_array(self.inputs.to_array())
        outcome = wrapper.do_step(current_time=t, step_size=self.dt)

        if outcome.terminate_simulation:
            raise RuntimeError("FMU requested simulation termination")

        self._pull_fmu_outputs()

    def reset(self):
        """Reset the block and bring the FMU back to its initialized state."""
        super().reset()

        wrapper = self.fmu_wrapper
        wrapper.reset()
        wrapper.initialize(self.start_values, start_time=0.0)

        self._pull_fmu_outputs()

    def __len__(self):
        """Return 0: the FMU acts as a discrete time source-like block
        without direct passthrough."""
        return 0
@deprecated(
    version="1.0.0",
    replacement="pathsim_fmi.ModelExchangeFMU",
    reason="This block has moved to the pathsim-fmi package: pip install pathsim-fmi",
)
class ModelExchangeFMU(DynamicalSystem):
    """Model Exchange FMU block using FMPy with support for FMI 2.0 and FMI 3.0.

    This block wraps an FMU (Functional Mock-up Unit) for model exchange. The FMU
    provides the right-hand side of an ODE system that is integrated by PathSim's
    numerical solvers. Internal FMU events (state events, time events, and step
    completion events) are translated to PathSim events.

    Parameters
    ----------
    fmu_path : str
        path to the FMU file (.fmu)
    instance_name : str, optional
        name for the FMU instance (default: 'fmu_instance')
    start_values : dict, optional
        dictionary of variable names and their initial values
    tolerance : float, optional
        tolerance for event detection (default: 1e-10)
    verbose : bool, optional
        enable verbose output (default: False)

    Attributes
    ----------
    fmu_wrapper : FMUWrapper
        version-agnostic FMU wrapper instance providing access to
        model_description, fmu, and other FMPy objects for advanced usage
    time_event : ScheduleList or None
        dynamic time event for FMU-scheduled events
    """

    def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None,
                 tolerance=1e-10, verbose=False):

        self.tolerance = tolerance
        self.verbose = verbose
        self.start_values = start_values

        # Create and initialize FMU wrapper
        self.fmu_wrapper = FMUWrapper(fmu_path, instance_name, mode='model_exchange')
        event_info = self.fmu_wrapper.initialize(
            start_values, start_time=0.0, tolerance=tolerance
        )

        # Store initial time event if defined (also used as fallback in `reset`)
        self._initial_time_event = (
            event_info.next_event_time
            if event_info and event_info.next_event_time_defined
            else None
        )

        # Enter continuous time mode
        self.fmu_wrapper.enter_continuous_time_mode()

        # Initialize parent DynamicalSystem with FMU dynamics
        # Use FMU's Jacobian if available (providesDirectionalDerivative=true)
        jac_func = self._get_jacobian if self.fmu_wrapper.provides_jacobian else None

        super().__init__(
            func_dyn=self._get_derivatives,
            func_alg=self._get_outputs,
            initial_value=self.fmu_wrapper.get_continuous_states(),
            jac_dyn=jac_func
        )

        # Setup block I/O from FMU variables
        self.inputs, self.outputs = self.fmu_wrapper.create_port_registers()

        # Initialize time event manager (created lazily by `_update_time_events`)
        self.time_event = None

        # Create state event (zero-crossing) for each event indicator,
        # `idx=i` binds the loop index at lambda definition time
        for i in range(self.fmu_wrapper.n_event_indicators):
            self.events.append(
                ZeroCrossing(
                    func_evt=lambda t, idx=i: self._get_event_indicator(idx),
                    func_act=self._handle_event,
                    tolerance=self.tolerance
                )
            )

        # Cache capability flag for sample() performance
        self._needs_completed_integrator_step = (
            self.fmu_wrapper.needs_completed_integrator_step
        )

        # Schedule initial time event if any
        if self._initial_time_event is not None:
            self._update_time_events(self._initial_time_event)

    def _sync_fmu(self, x, u, t):
        """Push time `t`, continuous states `x` and inputs `u` into the FMU."""
        self.fmu_wrapper.set_time(t)
        self.fmu_wrapper.set_continuous_states(x)
        self.fmu_wrapper.set_inputs_from_array(u)

    def _get_derivatives(self, x, u, t):
        """Evaluate FMU derivatives (RHS of ODE).

        Returns an empty list without touching the FMU when it has no
        continuous states.
        """
        if self.fmu_wrapper.n_states == 0:
            return []
        self._sync_fmu(x, u, t)
        return self.fmu_wrapper.get_derivatives()

    def _get_jacobian(self, x, u, t):
        """Evaluate Jacobian of FMU derivatives w.r.t. states."""
        self._sync_fmu(x, u, t)
        return self.fmu_wrapper.get_state_jacobian()

    def _get_outputs(self, x, u, t):
        """Evaluate FMU outputs (algebraic part)."""
        self._sync_fmu(x, u, t)
        return self.fmu_wrapper.get_outputs_as_array()

    def _get_event_indicator(self, idx):
        """Get value of the event indicator with index `idx`."""
        return self.fmu_wrapper.get_event_indicators()[idx]

    def _handle_event(self, t):
        """Handle FMU event with fixed-point iteration for discrete states.

        Enters event mode, repeats `update_discrete_states` until the FMU no
        longer requests another update, returns to continuous time mode, then
        re-synchronises the solver state and schedules the next time event
        if the FMU defined one.

        Parameters
        ----------
        t : float
            time at which the event is handled

        Raises
        ------
        RuntimeError
            if any discrete state update flags `terminate_simulation`
        """
        if self.verbose:
            print(f"FMU event detected at t={t}")

        self.fmu_wrapper.enter_event_mode()

        # Iterate until discrete states stabilize. The 'values changed' flag
        # is accumulated over all iterations: a change reported by an earlier
        # pass must not be lost when the final pass reports none.
        values_changed = False
        while True:
            event_info = self.fmu_wrapper.update_discrete_states()

            if event_info.terminate_simulation:
                raise RuntimeError("FMU requested simulation termination")

            if event_info.values_changed:
                values_changed = True

            if not event_info.discrete_states_need_update:
                break

        self.fmu_wrapper.enter_continuous_time_mode()

        # Update continuous states if changed during any iteration
        if values_changed:
            x_new = self.fmu_wrapper.get_continuous_states()
            self.engine.set(x_new)
            if self.verbose:
                print(f"Continuous states updated after event: {x_new}")

        # Schedule new time events (taken from the final iteration's result)
        if event_info.next_event_time_defined:
            self._update_time_events(event_info.next_event_time)
            if self.verbose:
                print(f"Next time event scheduled at t={event_info.next_event_time}")

    def _update_time_events(self, next_time):
        """Update or create time event schedule.

        The `ScheduleList` is created and registered in `self.events` on first
        use; afterwards new event times are inserted in sorted position and
        duplicates are skipped.
        """
        if self.time_event is None:
            self.time_event = ScheduleList(
                times_evt=[next_time],
                func_act=self._handle_event,
                tolerance=self.tolerance
            )
            self.events.append(self.time_event)
        elif next_time not in self.time_event.times_evt:
            bisect.insort(self.time_event.times_evt, next_time)

    def sample(self, t, dt):
        """Sample block after successful timestep and handle FMU step completion events.

        Raises
        ------
        RuntimeError
            if the FMU flags termination on step completion
        """
        super().sample(t, dt)

        if self._needs_completed_integrator_step:
            enter_event_mode, terminate_simulation = (
                self.fmu_wrapper.completed_integrator_step()
            )

            if terminate_simulation:
                raise RuntimeError("FMU requested simulation termination")

            if enter_event_mode:
                if self.verbose:
                    print(f"Step completion event at t={t}")
                self._handle_event(t)

    def reset(self):
        """Reset the FMU instance."""
        super().reset()

        self.fmu_wrapper.reset()

        # Re-initialize FMU
        event_info = self.fmu_wrapper.initialize(
            self.start_values, start_time=0.0, tolerance=self.tolerance
        )
        self.fmu_wrapper.enter_continuous_time_mode()

        # Reset to initial states
        self.engine.set(self.fmu_wrapper.get_continuous_states())

        # Reset time events (list is cleared in place, the registered
        # ScheduleList object is kept)
        if self.time_event is not None:
            self.time_event.times_evt.clear()

        # Schedule initial time event from re-initialization or cached initial
        if event_info and event_info.next_event_time_defined:
            self._update_time_events(event_info.next_event_time)
        elif self._initial_time_event is not None:
            self._update_time_events(self._initial_time_event)