########################################################################################
##
## ADAPTIVE BUFFER CLASS DEFINITION
## (utils/adaptivebuffer.py)
##
## Milan Rother 2024
##
########################################################################################
# IMPORTS ==============================================================================
from collections import deque
from bisect import bisect_left
# HELPER CLASS =========================================================================
class AdaptiveBuffer:
    """A class that manages an adaptive buffer for delay modeling which is primarily
    used in the pathsim 'Delay' block.

    Time-value samples are appended with `add` and the value at time
    `t - delay` is retrieved with `get`, using linear interpolation between
    the two neighbouring samples for arbitrary time lookup.

    Samples are expected to be added with non-decreasing time, because the
    lookup performs a binary search over the stored samples.

    Parameters
    ----------
    delay : float
        time delay in seconds
    clean_every : int, optional
        cleanup interval, measured in calls to `add` (default 100)

    Attributes
    ----------
    buffer : deque
        deque that collects the (time, value) tuples for buffering
    delay : float
        time delay that is subtracted from the lookup time
    counter : int
        counts the calls to `add` since the last buffer cleanup
    clean_every : int
        interval for buffer cleanup
    """

    def __init__(self, delay, clean_every=100):

        #the buffer uses a double ended queue for cheap removal at the left end
        self.buffer = deque()
        self.delay = delay

        #the buffer is cleaned up once 'counter' exceeds 'clean_every'
        self.clean_every = clean_every
        self.counter = 0

    def __len__(self):
        """number of samples that are currently buffered"""
        return len(self.buffer)

    def add(self, t, value):
        """adding a new datapoint to the buffer

        Periodically discards samples that are too old to be needed for a
        lookup at the current time `t`. The newest sample at or before
        `t - delay` is always retained, since it is the left bracket for the
        interpolation in `get`.

        Parameters
        ----------
        t : float
            time to add
        value : float, int, complex
            numerical value to add
        """

        #add the time-value tuple
        self.buffer.append((t, value))

        #no cleanup due yet -> just count this call
        if self.counter <= self.clean_every:
            self.counter += 1
            return

        #clean up the buffer
        self.counter = 0
        cutoff = t - self.delay

        #drop the oldest sample only if its successor is also not newer than
        #the cutoff, otherwise it is still needed as interpolation bracket
        while len(self.buffer) > 1 and self.buffer[1][0] <= cutoff:
            self.buffer.popleft()

    def get(self, t):
        """lookup datapoint from buffer

        Parameters
        ----------
        t : float
            time for lookup, the buffer is evaluated at `t - delay`

        Returns
        -------
        value : float, int, complex
            linearly interpolated value at `t - delay`, the first / last
            buffered value if `t - delay` lies outside of the buffered
            time range, or 0.0 if the buffer is empty
        """

        #default 0
        if not self.buffer:
            return 0.0

        #time at which the buffer is evaluated
        target_time = t - self.delay

        #requested time too small -> return first value
        if target_time <= self.buffer[0][0]:
            return self.buffer[0][1]

        #requested time too large -> return last value
        if target_time >= self.buffer[-1][0]:
            return self.buffer[-1][1]

        #find buffer index for requested time, the 1-tuple sorts before every
        #(time, value) tuple with the same time, so values are never compared
        #and the bracket always satisfies t0 < target_time <= t1
        i = bisect_left(self.buffer, (target_time,))
        t0, y0 = self.buffer[i-1]
        t1, y1 = self.buffer[i]

        #linear interpolation
        return y0 + (y1 - y0) * (target_time - t0) / (t1 - t0)

    def clear(self):
        """clear the buffer, reset everything"""
        self.buffer = deque()
        self.counter = 0