Source code for pathsim.blocks.fmu

#########################################################################################
##
##                           FUNCTIONAL MOCK-UP UNIT (FMU) BLOCKS
##                                   (pathsim/blocks/fmu.py)
##
#########################################################################################

# IMPORTS ===============================================================================

import numpy as np

from ._block import Block
from .dynsys import DynamicalSystem

from ..events.schedule import Schedule, ScheduleList
from ..events.zerocrossing import ZeroCrossing
from ..utils.fmuwrapper import FMUWrapper


# BLOCKS ================================================================================

[docs] class CoSimulationFMU(Block): """Co-Simulation FMU block using FMPy with support for FMI 2.0 and FMI 3.0. This block wraps an FMU (Functional Mock-up Unit) for co-simulation. The FMU encapsulates a simulation model that can be executed independently and synchronized with the main simulation. Parameters ---------- fmu_path : str path to the FMU file (.fmu) instance_name : str, optional name for the FMU instance (default: 'fmu_instance') start_values : dict, optional dictionary of variable names and their initial values dt : float, optional communication step size for co-simulation. If None, uses the FMU's default experiment step size if available. Attributes ---------- fmu_wrapper : FMUWrapper version-agnostic FMU wrapper instance dt : float communication step size """ #max number of ports (will be configured based on FMU) _n_in_max = None _n_out_max = None #maps for input and output port labels _port_map_in = {} _port_map_out = {} def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None, dt=None, verbose=False): self.fmu_path = fmu_path self.instance_name = instance_name self.verbose = verbose self.start_values = start_values if start_values is not None else {} # Create FMU wrapper self.fmu_wrapper = FMUWrapper(fmu_path, instance_name, mode='cosimulation') # Expose commonly used attributes for backward compatibility self.model_description = self.fmu_wrapper.model_description self.fmi_version = self.fmu_wrapper.fmi_version self.unzipdir = self.fmu_wrapper.unzipdir self.fmu = self.fmu_wrapper.fmu self._input_refs = self.fmu_wrapper.input_refs self._output_refs = self.fmu_wrapper.output_refs # Extract metadata self._extract_fmu_metadata() # Determine step size if dt is None: if self.default_step_size is not None: self.dt = self.default_step_size else: raise ValueError("No step size provided and FMU has no default experiment step size") else: self.dt = dt # Build port maps from FMU variables self._port_map_in = {name: idx for idx, name in enumerate(self.fmu_wrapper.input_refs.keys())} self._port_map_out = {name: idx for idx, name in enumerate(self.fmu_wrapper.output_refs.keys())} # Initialize base class with proper port configuration super().__init__() # Initialize FMU self.fmu_wrapper.instantiate() self.fmu_wrapper.setup_experiment(start_time=0.0) self.fmu_wrapper.enter_initialization_mode() # Set start values for name, value in self.start_values.items(): self.fmu_wrapper.set_variable(name, value) # Exit initialization mode self.fmu_wrapper.exit_initialization_mode() # Internal scheduled event function self.events = [ Schedule( t_start=0, t_period=self.dt, func_act=self._step_fmu ) ] # Read initial outputs self._update_outputs_from_fmu() def _extract_fmu_metadata(self): """Extract metadata and capabilities from FMU.""" md = self.fmu_wrapper.model_description cs = md.coSimulation if cs is None: raise ValueError("FMU does not support Co-Simulation") # Extract capabilities self.can_interpolate_inputs = getattr(cs, 'canInterpolateInputs', False) self.can_handle_variable_step = getattr(cs, 'canHandleVariableCommunicationStepSize', False) self.max_output_derivative_order = getattr(cs, 'maxOutputDerivativeOrder', 0) # Extract default experiment settings default_experiment = md.defaultExperiment if default_experiment is not None: self.default_start_time = getattr(default_experiment, 'startTime', 0.0) self.default_stop_time = getattr(default_experiment, 'stopTime', None) self.default_step_size = getattr(default_experiment, 'stepSize', None) self.default_tolerance = getattr(default_experiment, 'tolerance', None) else: self.default_start_time = 0.0 self.default_stop_time = None self.default_step_size = None self.default_tolerance = None # Model metadata self.model_name = md.modelName self.generation_tool = getattr(md, 'generationTool', 'Unknown') self.generation_date = getattr(md, 'generationDateAndTime', 'Unknown') self.description = getattr(md, 'description', '') self.author = getattr(md, 'author', 'Unknown') self.version = getattr(md, 'version', 'Unknown') def _step_fmu(self, t): """Perform one FMU co-simulation step""" self._update_fmu_from_inputs() # Perform co-simulation step result = self.fmu_wrapper.do_step( current_time=t, step_size=self.dt ) # Handle FMI 3.0 specific results if result.terminate_simulation: raise RuntimeError("FMU requested simulation termination") self._update_outputs_from_fmu() def _update_fmu_from_inputs(self): """Read block inputs and update FMU inputs.""" if len(self.fmu_wrapper.input_refs) > 0: input_vrefs = list(self.fmu_wrapper.input_refs.values()) self.fmu_wrapper.set_real(input_vrefs, self.inputs.to_array()) def _update_outputs_from_fmu(self): """Read outputs from FMU and update block outputs.""" if len(self.fmu_wrapper.output_refs) > 0: output_vrefs = list(self.fmu_wrapper.output_refs.values()) self.outputs.update_from_array(self.fmu_wrapper.get_real(output_vrefs))
[docs] def update(self, t): """Update FMU inputs/outputs between scheduled steps if interpolation supported.""" if self.can_interpolate_inputs: self._update_fmu_from_inputs() self._update_outputs_from_fmu()
[docs] def reset(self): """Reset the FMU instance.""" super().reset() self.fmu_wrapper.reset() self.fmu_wrapper.enter_initialization_mode() self.fmu_wrapper.exit_initialization_mode() self._update_outputs_from_fmu()
def __len__(self): """FMU is a discrete time source-like block without direct passthrough""" return 0 def __del__(self): """Cleanup FMU resources.""" try: self.fmu_wrapper.terminate() self.fmu_wrapper.free_instance() except: pass
[docs] class ModelExchangeFMU(DynamicalSystem): """Model Exchange FMU block using FMPy with support for FMI 2.0 and FMI 3.0. This block wraps an FMU (Functional Mock-up Unit) for model exchange. The FMU provides the right-hand side of an ODE system that is integrated by PathSim's numerical solvers. Internal FMU events (state events, time events, and step completion events) are translated to PathSim events. Parameters ---------- fmu_path : str path to the FMU file (.fmu) instance_name : str, optional name for the FMU instance (default: 'fmu_instance') start_values : dict, optional dictionary of variable names and their initial values tolerance : float, optional tolerance for event detection (default: 1e-10) verbose : bool, optional enable verbose output (default: False) Attributes ---------- fmu_wrapper : FMUWrapper version-agnostic FMU wrapper instance time_event : ScheduleList or None dynamic time event for FMU-scheduled events """ #max number of ports (will be configured based on FMU) _n_in_max = None _n_out_max = None #maps for input and output port labels _port_map_in = {} _port_map_out = {} def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None, tolerance=1e-10, verbose=False): self.fmu_path = fmu_path self.instance_name = instance_name self.verbose = verbose self.tolerance = tolerance self.start_values = start_values if start_values is not None else {} # Create FMU wrapper self.fmu_wrapper = FMUWrapper(fmu_path, instance_name, mode='model_exchange') # Expose commonly used attributes for backward compatibility self.model_description = self.fmu_wrapper.model_description self.fmi_version = self.fmu_wrapper.fmi_version self.unzipdir = self.fmu_wrapper.unzipdir self.fmu = self.fmu_wrapper.fmu self.n_states = self.fmu_wrapper.n_states self.n_event_indicators = self.fmu_wrapper.n_event_indicators self._input_refs = self.fmu_wrapper.input_refs self._output_refs = self.fmu_wrapper.output_refs # Extract metadata self._extract_fmu_metadata() # Build port maps from FMU variables self._port_map_in = {name: idx for idx, name in enumerate(self.fmu_wrapper.input_refs.keys())} self._port_map_out = {name: idx for idx, name in enumerate(self.fmu_wrapper.output_refs.keys())} # Setup FMU self.fmu_wrapper.instantiate() self.fmu_wrapper.setup_experiment(tolerance=self.tolerance, start_time=0.0) self.fmu_wrapper.enter_initialization_mode() # Set start values for name, value in self.start_values.items(): self.fmu_wrapper.set_variable(name, value) # Exit initialization mode and check for initial events event_info = self.fmu_wrapper.exit_initialization_mode() # Store initial time event if defined self._initial_time_event = ( event_info.next_event_time if event_info and event_info.next_event_time_defined else None ) # Enter continuous time mode after initialization self.fmu_wrapper.enter_continuous_time_mode() # Get initial continuous states initial_states = self.fmu_wrapper.get_continuous_states() # Initialize parent DynamicalSystem with FMU dynamics super().__init__( func_dyn=self._get_derivatives, func_alg=self._get_outputs, initial_value=initial_states, jac_dyn=None ) # Initialize time event manager self.time_event = None # Create state event (zero-crossing) events for each event indicator for i in range(self.fmu_wrapper.n_event_indicators): event = ZeroCrossing( func_evt=lambda t, idx=i: self._get_event_indicator(idx), func_act=self._handle_event, tolerance=self.tolerance ) self.events.append(event) # Schedule initial time event if any if self._initial_time_event is not None: self._update_time_events(self._initial_time_event) def _extract_fmu_metadata(self): """Extract metadata and capabilities from FMU.""" md = self.fmu_wrapper.model_description me = md.modelExchange if me is None: raise ValueError("FMU does not support Model Exchange") # Extract capabilities self.provides_directional_derivative = getattr(me, 'providesDirectionalDerivative', False) self.completed_integrator_step_not_needed = getattr(me, 'completedIntegratorStepNotNeeded', False) # Model metadata self.model_name = md.modelName self.generation_tool = getattr(md, 'generationTool', 'Unknown') self.generation_date = getattr(md, 'generationDateAndTime', 'Unknown') self.description = getattr(md, 'description', '') self.author = getattr(md, 'author', 'Unknown') self.version = getattr(md, 'version', 'Unknown') def _get_derivatives(self, x, u, t): """Evaluate FMU derivatives (RHS of ODE). Parameters ---------- x : array continuous state vector u : array input vector t : float current time Returns ------- dx : array state derivatives """ if self.fmu_wrapper.n_states == 0: return np.array([]) # Set FMU state self.fmu_wrapper.set_time(t) self.fmu_wrapper.set_continuous_states(x) if len(self.fmu_wrapper.input_refs) > 0: input_vrefs = list(self.fmu_wrapper.input_refs.values()) self.fmu_wrapper.set_real(input_vrefs, u) return self.fmu_wrapper.get_derivatives() def _get_outputs(self, x, u, t): """Evaluate FMU outputs (algebraic part). Parameters ---------- x : array continuous state vector u : array input vector t : float current time Returns ------- y : array output vector """ # Set FMU state self.fmu_wrapper.set_time(t) self.fmu_wrapper.set_continuous_states(x) if len(self.fmu_wrapper.input_refs) > 0: input_vrefs = list(self.fmu_wrapper.input_refs.values()) self.fmu_wrapper.set_real(input_vrefs, u) if len(self.fmu_wrapper.output_refs) == 0: return np.array([]) output_vrefs = list(self.fmu_wrapper.output_refs.values()) return self.fmu_wrapper.get_real(output_vrefs)
[docs] def sample(self, t, dt): """Sample block after successful timestep and handle FMU step completion events. Parameters ---------- t : float evaluation time for sampling dt : float integration timestep """ super().sample(t, dt) # If FMU requires completedIntegratorStep, call it after successful step if not self.completed_integrator_step_not_needed: enter_event_mode, terminate_simulation = self.fmu_wrapper.completed_integrator_step() if terminate_simulation: if self.verbose: print("FMU requested termination in completedIntegratorStep") raise RuntimeError("FMU requested simulation termination") if enter_event_mode: if self.verbose: print(f"Step completion event at t={t}") self._handle_event(t)
def _get_event_indicator(self, idx): """Get value of a specific event indicator. Parameters ---------- idx : int index of the event indicator Returns ------- float event indicator value """ indicators = self.fmu_wrapper.get_event_indicators() return indicators[idx] def _handle_event(self, t): """Handle FMU event with fixed-point iteration for discrete states. Parameters ---------- t : float event time """ if self.verbose: print(f"FMU event detected at t={t}") # Enter event mode before event iteration self.fmu_wrapper.enter_event_mode() # Perform event update iteration until discrete states stabilize while True: event_info = self.fmu_wrapper.update_discrete_states() # Check if simulation should terminate if event_info.terminate_simulation: if self.verbose: print("FMU requested simulation termination") raise RuntimeError("FMU requested simulation termination") # Break if no more discrete state updates needed if not event_info.discrete_states_need_update: break # Re-enter continuous time mode after event handling self.fmu_wrapper.enter_continuous_time_mode() # Check if continuous states changed during event if event_info.values_changed: x_new = self.fmu_wrapper.get_continuous_states() self.engine.set(x_new) if self.verbose: print(f"Continuous states updated after event: {x_new}") # Update time events if FMU scheduled new ones if event_info.next_event_time_defined: self._update_time_events(event_info.next_event_time) if self.verbose: print(f"Next time event scheduled at t={event_info.next_event_time}") def _update_time_events(self, next_time): """Update or create time event schedule. Parameters ---------- next_time : float next scheduled event time """ if self.time_event is None: # Create new ScheduleList with the first time event self.time_event = ScheduleList( times_evt=[next_time], func_act=self._handle_event, tolerance=self.tolerance ) self.events.append(self.time_event) elif next_time not in self.time_event.times_evt: # Insert new time in sorted order import bisect bisect.insort(self.time_event.times_evt, next_time)
[docs] def reset(self): """Reset the FMU instance.""" super().reset() self.fmu_wrapper.reset() # Re-initialize FMU self.fmu_wrapper.setup_experiment(tolerance=self.tolerance, start_time=0.0) self.fmu_wrapper.enter_initialization_mode() for name, value in self.start_values.items(): self.fmu_wrapper.set_variable(name, value) event_info = self.fmu_wrapper.exit_initialization_mode() self.fmu_wrapper.enter_continuous_time_mode() # Reset to initial states self.engine.set(self.fmu_wrapper.get_continuous_states()) # Reset time events and re-schedule initial event if present if self.time_event is not None: self.time_event.times_evt.clear() if self._initial_time_event is not None: import bisect bisect.insort(self.time_event.times_evt, self._initial_time_event)
def __del__(self): """Cleanup FMU resources.""" try: self.fmu_wrapper.terminate() self.fmu_wrapper.free_instance() except Exception: pass