Source code for pathsim.blocks.fmu

#########################################################################################
##
##                           FUNCTIONAL MOCK-UP UNIT (FMU) BLOCKS
##                                   (pathsim/blocks/fmu.py)
##
#########################################################################################

# IMPORTS ===============================================================================

import numpy as np

from ._block import Block
from .dynsys import DynamicalSystem

from ..events.schedule import Schedule, ScheduleList
from ..events.zerocrossing import ZeroCrossing


# BLOCKS ================================================================================

class CoSimulationFMU(Block):
    """Co-Simulation FMU block using FMPy with support for FMI 2.0 and FMI 3.0.

    This block wraps an FMU (Functional Mock-up Unit) for co-simulation. The FMU
    encapsulates a simulation model that can be executed independently and is
    synchronized with the main simulation through a periodic scheduled event
    that advances the FMU by one communication step.

    Parameters
    ----------
    fmu_path : str
        path to the FMU file (.fmu)
    instance_name : str, optional
        name for the FMU instance (default: 'fmu_instance')
    start_values : dict, optional
        dictionary of variable names and their initial values
    dt : float, optional
        communication step size for co-simulation. If None, uses the FMU's
        default experiment step size if available.
    verbose : bool, optional
        verbosity flag, stored on the instance (default: False)

    Attributes
    ----------
    fmu_path : str
        path to the FMU file
    instance_name : str
        name of the FMU instance
    start_values : dict
        initial values for FMU variables
    model_description : ModelDescription
        FMI model description from FMPy
    fmu : FMU2Slave or FMU3Slave
        FMPy FMU instance for co-simulation
    fmi_version : str
        FMI version ('2.0' or '3.0')
    unzipdir : str
        directory the FMU archive was extracted to (removed on deletion)
    _input_refs : dict
        reference map for input variables
    _output_refs : dict
        reference map for output variables

    FMU Capabilities
    ----------------
    can_interpolate_inputs : bool
        whether FMU can interpolate inputs between communication points
    can_handle_variable_step : bool
        whether FMU supports variable communication step sizes
    default_step_size : float or None
        recommended default step size from FMU
    max_output_derivative_order : int
        maximum order of output derivatives available
    """

    #max number of ports (will be configured based on FMU)
    _n_in_max = None
    _n_out_max = None

    #maps for input and output port labels (class-level defaults only,
    #every instance builds its own maps in '__init__')
    _port_map_in = {}
    _port_map_out = {}

    def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None, dt=None, verbose=False):

        # Import FMPy here to avoid requiring it as a dependency if not used
        try:
            from fmpy import read_model_description, extract
            from fmpy.fmi2 import FMU2Slave
            from fmpy.fmi3 import FMU3Slave
        except ImportError as err:
            raise ImportError("FMPy is required for FMU blocks. Install with: pip install fmpy") from err

        self.fmu_path = fmu_path
        self.instance_name = instance_name
        self.start_values = start_values if start_values is not None else {}
        self.verbose = verbose

        # Read model description
        self.model_description = read_model_description(fmu_path)

        # Detect FMI version
        self.fmi_version = self.model_description.fmiVersion

        # Extract FMU
        self.unzipdir = extract(fmu_path)

        # Extract metadata and capabilities
        self._extract_fmu_metadata()

        # Use provided dt or fall back to FMU default
        if dt is None:
            if self.default_step_size is None:
                raise ValueError("No step size provided and FMU has no default experiment step size")
            self.dt = self.default_step_size
        else:
            self.dt = dt

        # Per-instance port label maps. Writing into the class-level dicts
        # would leak the labels of one FMU into every other instance.
        self._port_map_in = {}
        self._port_map_out = {}

        # Get input and output variable references
        self._input_refs = {}
        self._output_refs = {}
        for variable in self.model_description.modelVariables:
            if variable.causality == 'input':
                self._port_map_in[variable.name] = len(self._input_refs)
                self._input_refs[variable.name] = variable.valueReference
            elif variable.causality == 'output':
                self._port_map_out[variable.name] = len(self._output_refs)
                self._output_refs[variable.name] = variable.valueReference

        # Initialize base class with proper port configuration
        super().__init__()

        # Instantiate FMU based on version
        if self.fmi_version.startswith('2.'):
            slave_cls = FMU2Slave
        elif self.fmi_version.startswith('3.'):
            slave_cls = FMU3Slave
        else:
            raise ValueError(f"Unsupported FMI version: {self.fmi_version}")

        self.fmu = slave_cls(
            guid=self.model_description.guid,
            unzipDirectory=self.unzipdir,
            modelIdentifier=self.model_description.coSimulation.modelIdentifier,
            instanceName=self.instance_name
            )

        # Setup experiment
        self.fmu.instantiate()

        # Run the initialization sequence (start values are applied inside)
        self._initialize_fmu()

        # Internal scheduled event function
        self.events = [
            Schedule(
                t_start=0,
                t_period=self.dt,
                func_act=self._step_fmu
                )
            ]

        # Read initial outputs
        self._update_outputs_from_fmu()


    def _initialize_fmu(self):
        """Enter initialization mode, apply the start values and leave it again.

        Shared by '__init__' and 'reset' so both run the same sequence.
        """

        # FMI 3.0 has different initialization sequence
        if self.fmi_version.startswith('3.'):
            self.fmu.enterInitializationMode(
                tolerance=None,
                startTime=0.0,
                stopTime=None
                )
        else:
            self.fmu.setupExperiment(startTime=0.0)
            self.fmu.enterInitializationMode()

        # Set start values
        self._set_start_values()

        # Exit initialization mode
        self.fmu.exitInitializationMode()


    def _extract_fmu_metadata(self):
        """Extract metadata and capabilities from FMU.

        Raises
        ------
        ValueError
            if the model description has no co-simulation interface
        """

        cs = self.model_description.coSimulation
        if cs is None:
            raise ValueError("FMU does not support Co-Simulation")

        # Extract capabilities (consistent across FMI 2.0 and 3.0)
        self.can_interpolate_inputs = getattr(cs, 'canInterpolateInputs', False)
        self.can_handle_variable_step = getattr(cs, 'canHandleVariableCommunicationStepSize', False)
        self.max_output_derivative_order = getattr(cs, 'maxOutputDerivativeOrder', 0)

        # FMI 3.0 specific capabilities
        if self.fmi_version.startswith('3.'):
            self.can_return_early = getattr(cs, 'canReturnEarlyAfterIntermediateUpdate', False)
            self.has_event_mode = getattr(cs, 'hasEventMode', False)
            self.recommended_intermediate_input_smoothness = getattr(cs, 'recommendedIntermediateInputSmoothness', 0)
        else:
            self.can_return_early = False
            self.has_event_mode = False
            self.recommended_intermediate_input_smoothness = 0

        # Extract default experiment settings
        default_experiment = self.model_description.defaultExperiment
        if default_experiment is not None:
            self.default_start_time = getattr(default_experiment, 'startTime', 0.0)
            self.default_stop_time = getattr(default_experiment, 'stopTime', None)
            self.default_step_size = getattr(default_experiment, 'stepSize', None)
            self.default_tolerance = getattr(default_experiment, 'tolerance', None)
        else:
            self.default_start_time = 0.0
            self.default_stop_time = None
            self.default_step_size = None
            self.default_tolerance = None

        # Model metadata
        self.model_name = self.model_description.modelName
        self.generation_tool = getattr(self.model_description, 'generationTool', 'Unknown')
        self.generation_date = getattr(self.model_description, 'generationDateAndTime', 'Unknown')
        self.description = getattr(self.model_description, 'description', '')
        self.author = getattr(self.model_description, 'author', 'Unknown')
        self.version = getattr(self.model_description, 'version', 'Unknown')


    def _set_start_values(self):
        """Set initial values for FMU variables.

        Names in 'start_values' that do not match a model variable are
        ignored, as are variables whose type is not real, integer or boolean.
        """

        if not self.start_values:
            return

        is_fmi3 = self.fmi_version.startswith('3.')

        # Build variable lookup once instead of scanning per start value
        var_map = {v.name: v for v in self.model_description.modelVariables}

        for name, value in self.start_values.items():

            variable = var_map.get(name)
            if variable is None:
                continue

            vr = variable.valueReference

            if variable.type in ['Real', 'Float64', 'Float32']:
                self._set_real_values([vr], [value])

            elif variable.type in ['Integer', 'Int64', 'Int32', 'Int16', 'Int8']:
                # NOTE(review): same simplification as 'ModelExchangeFMU',
                # all FMI 3.0 integer kinds go through 'setInt64' - confirm
                if is_fmi3:
                    self.fmu.setInt64([vr], [int(value)])
                else:
                    self.fmu.setInteger([vr], [int(value)])

            elif variable.type == 'Boolean':
                self.fmu.setBoolean([vr], [bool(value)])


    def _set_real_values(self, vrefs, values):
        """Write floating point values using the setter of the detected FMI version."""
        if self.fmi_version.startswith('3.'):
            self.fmu.setFloat64(vrefs, values)
        else:
            self.fmu.setReal(vrefs, values)


    def _get_real_values(self, vrefs):
        """Read floating point values using the getter of the detected FMI version."""
        if self.fmi_version.startswith('3.'):
            return self.fmu.getFloat64(vrefs)
        return self.fmu.getReal(vrefs)


    def _step_fmu(self, t):
        """Perform one FMU co-simulation step.

        Parameters
        ----------
        t : float
            current communication point
        """

        self._update_fmu_from_inputs()

        # Perform co-simulation step
        self.fmu.doStep(
            currentCommunicationPoint=t,
            communicationStepSize=self.dt
            )

        self._update_outputs_from_fmu()


    def _update_fmu_from_inputs(self):
        """Read block inputs and update FMU inputs."""
        if self._input_refs:
            input_vrefs = list(self._input_refs.values())
            self._set_real_values(input_vrefs, self.inputs.to_array())


    def _update_outputs_from_fmu(self):
        """Read outputs from FMU and update block outputs."""
        if self._output_refs:
            output_vrefs = list(self._output_refs.values())
            self.outputs.update_from_array(self._get_real_values(output_vrefs))


    def update(self, t):
        """Update FMU inputs/outputs between scheduled steps if interpolation supported.

        Parameters
        ----------
        t : float
            evaluation time (not used by this block)
        """
        if self.can_interpolate_inputs:
            self._update_fmu_from_inputs()
            self._update_outputs_from_fmu()


    def reset(self):
        """Reset the FMU instance and bring it back to its initialized state."""

        super().reset()

        self.fmu.reset()

        # Same sequence as in '__init__', including the experiment setup
        # for FMI 2.0 and the user supplied start values
        self._initialize_fmu()

        self._update_outputs_from_fmu()


    def __len__(self):
        """FMU is a discrete time source-like block without direct passthrough"""
        return 0


    def __del__(self):
        """Cleanup FMU resources (best effort, never raises)."""

        # 'fmu' is missing when '__init__' failed before instantiation
        fmu = getattr(self, 'fmu', None)
        if fmu is not None:
            try:
                fmu.terminate()
            except Exception:
                pass
            try:
                fmu.freeInstance()
            except Exception:
                pass

        # Remove the directory created by 'extract' so it does not pile up
        unzipdir = getattr(self, 'unzipdir', None)
        if unzipdir is not None:
            try:
                import shutil
                shutil.rmtree(unzipdir, ignore_errors=True)
            except Exception:
                pass
class ModelExchangeFMU(DynamicalSystem):
    """Model Exchange FMU block using FMPy with support for FMI 2.0 and FMI 3.0.

    This block wraps an FMU (Functional Mock-up Unit) for model exchange. The FMU
    provides the right-hand side of an ODE system that is integrated by PathSim's
    numerical solvers. Internal FMU events (state events, time events, and step
    completion events) are translated to PathSim events.

    Parameters
    ----------
    fmu_path : str
        path to the FMU file (.fmu)
    instance_name : str, optional
        name for the FMU instance (default: 'fmu_instance')
    start_values : dict, optional
        dictionary of variable names and their initial values
    tolerance : float, optional
        tolerance for event detection (default: 1e-10)
    verbose : bool, optional
        enable verbose output (default: False)

    Attributes
    ----------
    fmu_path : str
        path to the FMU file
    instance_name : str
        name of the FMU instance
    start_values : dict
        initial values for FMU variables
    model_description : ModelDescription
        FMI model description from FMPy
    fmu : FMU2Model or FMU3Model
        FMPy FMU instance for model exchange
    fmi_version : str
        FMI version ('2.0' or '3.0')
    unzipdir : str
        directory the FMU archive was extracted to (removed on deletion)
    _input_refs : dict
        reference map for input variables
    _output_refs : dict
        reference map for output variables
    n_states : int
        number of continuous states
    n_event_indicators : int
        number of event indicators
    time_event : ScheduleList or None
        dynamic time event for FMU-scheduled events
    """

    #max number of ports (will be configured based on FMU)
    _n_in_max = None
    _n_out_max = None

    #maps for input and output port labels (class-level defaults only,
    #every instance builds its own maps in '__init__')
    _port_map_in = {}
    _port_map_out = {}

    def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None, tolerance=1e-10, verbose=False):

        # Import FMPy here to avoid requiring it as a dependency if not used
        try:
            from fmpy import read_model_description, extract
            from fmpy.fmi2 import FMU2Model
            from fmpy.fmi3 import FMU3Model
        except ImportError as err:
            raise ImportError("FMPy is required for FMU blocks. Install with: pip install fmpy") from err

        self.fmu_path = fmu_path
        self.instance_name = instance_name
        self.start_values = start_values if start_values is not None else {}
        self.verbose = verbose
        self.tolerance = tolerance

        # Read model description
        self.model_description = read_model_description(fmu_path)

        # Detect FMI version
        self.fmi_version = self.model_description.fmiVersion

        # Extract FMU
        self.unzipdir = extract(fmu_path)

        # Extract metadata and capabilities
        self._extract_fmu_metadata()

        # Per-instance port label maps. Writing into the class-level dicts
        # would leak the labels of one FMU into every other instance.
        self._port_map_in = {}
        self._port_map_out = {}

        # Get input and output variable references
        self._input_refs = {}
        self._output_refs = {}
        for variable in self.model_description.modelVariables:
            if variable.causality == 'input':
                self._port_map_in[variable.name] = len(self._input_refs)
                self._input_refs[variable.name] = variable.valueReference
            elif variable.causality == 'output':
                self._port_map_out[variable.name] = len(self._output_refs)
                self._output_refs[variable.name] = variable.valueReference

        # Get continuous states and derivatives
        self.n_states = self.model_description.numberOfContinuousStates
        self.n_event_indicators = self.model_description.numberOfEventIndicators

        # Instantiate FMU based on version
        if self.fmi_version.startswith('2.'):
            model_cls = FMU2Model
        elif self.fmi_version.startswith('3.'):
            model_cls = FMU3Model
        else:
            raise ValueError(f"Unsupported FMI version: {self.fmi_version}")

        self.fmu = model_cls(
            guid=self.model_description.guid,
            unzipDirectory=self.unzipdir,
            modelIdentifier=self.model_description.modelExchange.modelIdentifier,
            instanceName=self.instance_name
            )

        # Setup FMU
        self.fmu.instantiate()

        # FMI 3.0 has different initialization sequence
        if self.fmi_version.startswith('3.'):
            self.fmu.enterInitializationMode(
                tolerance=self.tolerance,
                startTime=0.0,
                stopTime=None
                )
        else:
            self.fmu.setupExperiment(tolerance=self.tolerance, startTime=0.0)
            self.fmu.enterInitializationMode()

        # Set start values
        self._set_start_values()

        # Exit initialization mode and check for initial events
        event_info = self.fmu.exitInitializationMode()

        # Store initial time event if defined
        self._initial_time_event = (
            event_info.nextEventTime
            if event_info and getattr(event_info, 'nextEventTimeDefined', False)
            else None
            )

        # Enter continuous time mode after initialization
        self.fmu.enterContinuousTimeMode()

        # Get initial continuous states
        initial_states = self._get_continuous_states()

        # Initialize parent DynamicalSystem with FMU dynamics
        super().__init__(
            func_dyn=self._get_derivatives,
            func_alg=self._get_outputs,
            initial_value=initial_states,
            jac_dyn=None  # Could add Jacobian support later via directional derivatives
            )

        # Initialize time event manager (will be populated dynamically)
        self.time_event = None

        # Create state event (zero-crossing) events for each event indicator,
        # the index is bound as default argument to avoid late binding
        for i in range(self.n_event_indicators):
            event = ZeroCrossing(
                func_evt=lambda t, idx=i: self._get_event_indicator(idx),
                func_act=self._handle_event,
                tolerance=self.tolerance
                )
            self.events.append(event)

        # Schedule initial time event if any
        if self._initial_time_event is not None:
            self._update_time_events(self._initial_time_event)


    def _extract_fmu_metadata(self):
        """Extract metadata and capabilities from FMU.

        Raises
        ------
        ValueError
            if the model description has no model exchange interface
        """

        me = self.model_description.modelExchange
        if me is None:
            raise ValueError("FMU does not support Model Exchange")

        # Extract capabilities
        self.provides_directional_derivative = getattr(me, 'providesDirectionalDerivative', False)
        self.completed_integrator_step_not_needed = getattr(me, 'completedIntegratorStepNotNeeded', False)

        # Model metadata
        self.model_name = self.model_description.modelName
        self.generation_tool = getattr(self.model_description, 'generationTool', 'Unknown')
        self.generation_date = getattr(self.model_description, 'generationDateAndTime', 'Unknown')
        self.description = getattr(self.model_description, 'description', '')
        self.author = getattr(self.model_description, 'author', 'Unknown')
        self.version = getattr(self.model_description, 'version', 'Unknown')


    def _set_start_values(self):
        """Set initial values for FMU variables.

        Names in 'start_values' that do not match a model variable are
        ignored, as are variables whose type is not real, integer or boolean.
        """

        if not self.start_values:
            return

        is_fmi2 = self.fmi_version.startswith('2')

        # Build variable lookup dict for efficient access
        var_map = {v.name: v for v in self.model_description.modelVariables}

        for name, value in self.start_values.items():

            variable = var_map.get(name)
            if variable is None:
                continue

            vr = variable.valueReference

            if variable.type in ['Real', 'Float64', 'Float32']:
                if is_fmi2:
                    self.fmu.setReal([vr], [value])
                else:
                    self.fmu.setFloat64([vr], [value])

            elif variable.type in ['Integer', 'Int64', 'Int32', 'Int16', 'Int8']:
                if is_fmi2:
                    self.fmu.setInteger([vr], [int(value)])
                else:
                    self.fmu.setInt64([vr], [int(value)])

            elif variable.type == 'Boolean':
                self.fmu.setBoolean([vr], [bool(value)])


    def _get_continuous_states(self):
        """Get continuous states from FMU (handles FMI 2.0 and 3.0 API differences).

        Returns
        -------
        states : array
            continuous state vector
        """

        if self.n_states == 0:
            return np.array([])

        if self.fmi_version.startswith('2'):
            return self.fmu.getContinuousStates()

        # FMI 3.0 fills a caller provided buffer
        import ctypes
        states = (ctypes.c_double * self.n_states)()
        self.fmu.getContinuousStates(states, self.n_states)
        return np.array(states)


    def _set_fmu_state(self, x, u, t):
        """Set FMU state (time, continuous states, inputs).

        Parameters
        ----------
        x : array
            continuous state vector
        u : array
            input vector
        t : float
            current time
        """

        is_fmi2 = self.fmi_version.startswith('2')

        self.fmu.setTime(t)

        if self.n_states > 0:
            if is_fmi2:
                self.fmu.setContinuousStates(x)
            else:
                # FMI 3.0
                import ctypes
                x_ctypes = (ctypes.c_double * self.n_states)(*x)
                self.fmu.setContinuousStates(x_ctypes, self.n_states)

        if self._input_refs:
            input_vrefs = list(self._input_refs.values())
            if is_fmi2:
                self.fmu.setReal(input_vrefs, u)
            else:
                # FMI 3.0
                self.fmu.setFloat64(input_vrefs, u)


    def _get_derivatives(self, x, u, t):
        """Evaluate FMU derivatives (RHS of ODE).

        Parameters
        ----------
        x : array
            continuous state vector
        u : array
            input vector
        t : float
            current time

        Returns
        -------
        dx : array
            state derivatives
        """

        if self.n_states == 0:
            return np.array([])

        self._set_fmu_state(x, u, t)

        if self.fmi_version.startswith('2'):
            return self.fmu.getDerivatives()

        # FMI 3.0 fills a caller provided buffer
        import ctypes
        derivatives = (ctypes.c_double * self.n_states)()
        self.fmu.getContinuousStateDerivatives(derivatives, self.n_states)
        return np.array(derivatives)


    def _get_outputs(self, x, u, t):
        """Evaluate FMU outputs (algebraic part).

        Parameters
        ----------
        x : array
            continuous state vector
        u : array
            input vector
        t : float
            current time

        Returns
        -------
        y : array
            output vector
        """

        self._set_fmu_state(x, u, t)

        if not self._output_refs:
            return np.array([])

        output_vrefs = list(self._output_refs.values())

        if self.fmi_version.startswith('2'):
            return self.fmu.getReal(output_vrefs)

        # FMI 3.0
        return np.array(self.fmu.getFloat64(output_vrefs))


    def sample(self, t, dt):
        """Sample block after successful timestep and handle FMU step completion events.

        Unless the FMU declares 'completedIntegratorStepNotNeeded', this
        notifies it via 'completedIntegratorStep' and runs the event
        handling when the FMU asks for event mode.

        Parameters
        ----------
        t : float
            evaluation time for sampling
        dt : float
            integration timestep

        Raises
        ------
        RuntimeError
            if the FMU requests termination of the simulation
        """

        # Call parent's sample method (if any)
        super().sample(t, dt)

        # Nothing to do if FMU does not need the notification
        if self.completed_integrator_step_not_needed:
            return

        # Notify FMU that integration step completed
        enter_event_mode, terminate_simulation = self.fmu.completedIntegratorStep()

        if terminate_simulation:
            if self.verbose:
                print("FMU requested termination in completedIntegratorStep")
            raise RuntimeError("FMU requested simulation termination")

        # If FMU signals a step event
        if enter_event_mode:
            if self.verbose:
                print(f"Step completion event at t={t}")

            # Handle the step event
            self._handle_event(t)


    def _get_event_indicator(self, idx):
        """Get value of a specific event indicator.

        Parameters
        ----------
        idx : int
            index of the event indicator

        Returns
        -------
        float
            event indicator value
        """

        if self.fmi_version.startswith('2'):
            return self.fmu.getEventIndicators()[idx]

        # FMI 3.0 fills a caller provided buffer
        import ctypes
        indicators = (ctypes.c_double * self.n_event_indicators)()
        self.fmu.getEventIndicators(indicators, self.n_event_indicators)
        return indicators[idx]


    def _handle_event(self, t):
        """Handle FMU event with fixed-point iteration for discrete states.

        The 'continuous states changed' flag is accumulated over all passes
        of the event iteration, the next event time is taken from the last
        pass.

        Parameters
        ----------
        t : float
            event time

        Raises
        ------
        RuntimeError
            if the FMU requests termination of the simulation
        """

        if self.verbose:
            print(f"FMU event detected at t={t}")

        # Enter event mode before event iteration
        self.fmu.enterEventMode()

        # A state change reported in an early pass must not be lost when a
        # later pass reports none, hence the flag is OR-ed over all passes
        states_changed = False
        next_time_defined = False
        next_time = None

        # Perform event update iteration until discrete states stabilize
        while True:

            if self.fmi_version.startswith('2'):
                # FMI 2.0 API - returns EventInfo object
                event_info = self.fmu.eventUpdate()
                needs_update = event_info.newDiscreteStatesNeeded
                terminate = event_info.terminateSimulation
                changed = event_info.valuesOfContinuousStatesChanged
                next_time_defined = event_info.nextEventTimeDefined
                next_time = event_info.nextEventTime
            else:
                # FMI 3.0 API - returns tuple
                # (discreteStatesNeedUpdate, terminateSimulation, nominalsChanged,
                #  valuesChanged, nextEventTimeDefined, nextEventTime)
                result = self.fmu.updateDiscreteStates()
                needs_update = result[0]
                terminate = result[1]
                changed = result[3]
                next_time_defined = result[4]
                next_time = result[5]

            # Check if simulation should terminate
            if terminate:
                if self.verbose:
                    print("FMU requested simulation termination")
                raise RuntimeError("FMU requested simulation termination")

            states_changed = states_changed or bool(changed)

            # Break if no more discrete state updates needed
            if not needs_update:
                break

        # Re-enter continuous time mode after event handling
        self.fmu.enterContinuousTimeMode()

        # Check if continuous states changed during event
        if states_changed:
            x_new = self._get_continuous_states()
            self.engine.set(x_new)
            if self.verbose:
                print(f"Continuous states updated after event: {x_new}")

        # Update time events if FMU scheduled new ones
        if next_time_defined:
            self._update_time_events(next_time)
            if self.verbose:
                print(f"Next time event scheduled at t={next_time}")


    def _update_time_events(self, next_time):
        """Update or create time event schedule.

        Parameters
        ----------
        next_time : float
            next scheduled event time
        """

        if self.time_event is None:
            # Create new ScheduleList with the first time event
            self.time_event = ScheduleList(
                times_evt=[next_time],
                func_act=self._handle_event,
                tolerance=self.tolerance
                )
            self.events.append(self.time_event)

        elif next_time not in self.time_event.times_evt:
            # Insert new time in sorted order
            import bisect
            bisect.insort(self.time_event.times_evt, next_time)


    def reset(self):
        """Reset the FMU instance and restore the initial states and time events."""

        super().reset()

        self.fmu.reset()

        # Re-initialize FMU
        if self.fmi_version.startswith('3.'):
            self.fmu.enterInitializationMode(
                tolerance=self.tolerance,
                startTime=0.0,
                stopTime=None
                )
        else:
            self.fmu.setupExperiment(tolerance=self.tolerance, startTime=0.0)
            self.fmu.enterInitializationMode()

        self._set_start_values()
        self.fmu.exitInitializationMode()
        self.fmu.enterContinuousTimeMode()

        # Reset to initial states
        self.engine.set(self._get_continuous_states())

        # Reset time events and re-schedule initial event if present
        if self.time_event is not None:
            self.time_event.times_evt.clear()
            if self._initial_time_event is not None:
                # list was just emptied, appending keeps it sorted
                self.time_event.times_evt.append(self._initial_time_event)


    def __del__(self):
        """Cleanup FMU resources (best effort, never raises)."""

        # 'fmu' is missing when '__init__' failed before instantiation
        fmu = getattr(self, 'fmu', None)
        if fmu is not None:
            try:
                fmu.terminate()
            except Exception:
                pass
            try:
                fmu.freeInstance()
            except Exception:
                pass

        # Remove the directory created by 'extract' so it does not pile up
        unzipdir = getattr(self, 'unzipdir', None)
        if unzipdir is not None:
            try:
                import shutil
                shutil.rmtree(unzipdir, ignore_errors=True)
            except Exception:
                pass