Source code for pathsim.utils.adaptivebuffer
########################################################################################
##
## ADAPTIV BUFFER CLASS DEFINITION
## (utils/adaptivebuffer.py)
##
## Milan Rother 2024
##
########################################################################################
# IMPORTS ==============================================================================
import numpy as np
from collections import deque
from bisect import bisect_left
# HELPER CLASS =========================================================================
[docs]
class AdaptiveBuffer:
"""A class that manages an adaptive buffer for delay modeling which is primarily
used in the pathsim 'Delay' block but might have future applications aswell.
It implements a linear interpolation for arbitrary time lookup.
Parameters
----------
delay : float
time delay in seconds
Attributes
----------
buffer_t : deque
deque that collects the time data for buffering
buffer_v : deque
deque that collects the value data for buffering
ns : int
safety for buffer truncation
"""
def __init__(self, delay):
#the buffer uses a double ended queue
self.delay = delay
self.buffer_t = deque()
self.buffer_v = deque()
#safety for buffer truncation
self.ns = 5
def __len__(self):
return len(self.buffer_t)
[docs]
def add(self, t, value):
"""adding a new datapoint to the buffer
Parameters
----------
t : float
time to add
value : float, int, complex
numerical value to add
"""
#add the time-value tuple
self.buffer_t.append(t)
self.buffer_v.append(value)
#remove values after safety from buffer -> enable interpolation
if len(self.buffer_t) > self.ns:
while t - self.buffer_t[self.ns] > self.delay:
self.buffer_t.popleft()
self.buffer_v.popleft()
[docs]
def interp(self, t):
"""interpolate buffer at defined lookup time
Parameters
----------
t : float
time for interpolation
Returns
-------
out : float, array
interpolated value
"""
#empty or time too small -> return zero
if not self.buffer_t or t <= self.buffer_t[0]:
return 0.0
#requested time too large -> return last value
if t >= self.buffer_t[-1]:
return self.buffer_v[-1]
#find buffer index for requested time
i = bisect_left(self.buffer_t, t)
t0, t1 = self.buffer_t[i], self.buffer_t[i-1]
y0, y1 = self.buffer_v[i], self.buffer_v[i-1]
#linear interpolation
return y0 + (y1 - y0) * (t - t0) / (t1 - t0)
[docs]
def get(self, t):
"""lookup datapoint from buffer with
delay at `t_lookup = t - delay`
Parameters
----------
t : float
time for lookup with delay
"""
return self.interp(t - self.delay)
[docs]
def clear(self):
"""clear the buffer, reset everything"""
self.buffer_t.clear()
self.buffer_v.clear()
[docs]
def to_checkpoint(self, prefix):
"""Serialize buffer state for checkpointing.
Parameters
----------
prefix : str
NPZ key prefix
Returns
-------
npz_data : dict
numpy arrays keyed by path
"""
npz_data = {}
if self.buffer_t:
npz_data[f"{prefix}/buffer_t"] = np.array(list(self.buffer_t))
npz_data[f"{prefix}/buffer_v"] = np.array(list(self.buffer_v))
return npz_data
[docs]
def load_checkpoint(self, npz, prefix):
"""Restore buffer state from checkpoint.
Parameters
----------
npz : dict-like
numpy arrays from checkpoint NPZ
prefix : str
NPZ key prefix
"""
self.clear()
t_key = f"{prefix}/buffer_t"
v_key = f"{prefix}/buffer_v"
if t_key in npz and v_key in npz:
times = npz[t_key]
values = npz[v_key]
for t, v in zip(times, values):
self.buffer_t.append(float(t))
self.buffer_v.append(v)