Source code for pathsim.utils.adaptivebuffer


########################################################################################
##
##                         ADAPTIV BUFFER CLASS DEFINITION 
##                            (utils/adaptivebuffer.py)
##
##                                Milan Rother 2024
##
########################################################################################

# IMPORTS ==============================================================================

from collections import deque
from bisect import bisect_left


# HELPER CLASS =========================================================================

[docs] class AdaptiveBuffer: """A class that manages an adaptive buffer for delay modeling which is primarily used in the pathsim 'Delay' block but might have future applications aswell. It implements a linear interpolation for arbitrary time lookup. Parameters ---------- delay : float time delay in seconds Attributes ---------- buffer_t : deque deque that collects the time data for buffering buffer_v : deque deque that collects the value data for buffering ns : int savety for buffer truncation """ def __init__(self, delay): #the buffer uses a double ended queue self.delay = delay self.buffer_t = deque() self.buffer_v = deque() #savety for buffer truncation self.ns = 5 def __len__(self): return len(self.buffer_t)
[docs] def add(self, t, value): """adding a new datapoint to the buffer Parameters ---------- t : float time to add value : float, int, complex numerical value to add """ #add the time-value tuple self.buffer_t.append(t) self.buffer_v.append(value) #remove values after savety from buffer -> enable interpolation if len(self.buffer_t) > self.ns: while t - self.buffer_t[self.ns] > self.delay: self.buffer_t.popleft() self.buffer_v.popleft()
[docs] def interp(self, t): """interpolate buffer at defined lookup time Parameters ---------- t : float time for interpolation Returns ------- out : float, array interpolated value """ #empty or time too small -> return zero if not self.buffer_t or t <= self.buffer_t[0]: return 0.0 #requested time too large -> return last value if t >= self.buffer_t[-1]: return self.buffer_v[-1] #find buffer index for requested time i = bisect_left(self.buffer_t, t) t0, t1 = self.buffer_t[i], self.buffer_t[i-1] y0, y1 = self.buffer_v[i], self.buffer_v[i-1] #linear interpolation return y0 + (y1 - y0) * (t - t0) / (t1 - t0)
[docs] def get(self, t): """lookup datapoint from buffer with delay at `t_lookup = t - delay` Parameters ---------- t : float time for lookup with delay """ return self.interp(t - self.delay)
[docs] def clear(self): """clear the buffer, reset everything""" self.buffer_t.clear() self.buffer_v.clear()