#########################################################################################
##
## REALTIME PLOTTER CLASS
## (utils/realtimeplotter.py)
##
## Milan Rother 2024
##
#########################################################################################
# IMPORTS ===============================================================================
import matplotlib.pyplot as plt
import matplotlib.style as mplstyle
mplstyle.use("fast")
import numpy as np
import time
from collections import deque
from .._constants import COLORS_ALL
# PLOTTER CLASS =========================================================================
[docs]
class RealtimePlotter:
"""Class that manages a realtime plotting window that
can stream in x-y-data and update accordingly
Parameters
----------
max_samples : int
maximum number of samples to plot
update_interval : float
time in seconds between refreshs
labels : list[str]
labels for plot traces
x_label : str
label for x-axis
y_label : str
label for y-axis
Attributes
----------
fig : matplotlib.pyplot.figure
internal figure of the realtime plotter
ax : matplotlib.pyplot.axis
internal axis of the realtime plotter
"""
def __init__(self, max_samples=None, update_interval=1, labels=[], x_label="", y_label=""):
#plotter settings
self.max_samples = max_samples
self.update_interval = update_interval
self.labels = labels
self.x_label = x_label
self.y_label = y_label
#figure initialization
self.fig, self.ax = plt.subplots(nrows=1,
ncols=1,
figsize=(8,4),
tight_layout=True,
dpi=120)
#custom colors
self.ax.set_prop_cycle(color=COLORS_ALL)
#plot settings
self.ax.set_xlabel(self.x_label)
self.ax.set_ylabel(self.y_label)
self.ax.grid(True)
#data and lines (traces) for plotting
self.lines = []
self.data = []
#tracking update time
self.last_update = time.time()
#flag for running mode
self.is_running = True
# Connect the close event to the on_close method
self.fig.canvas.mpl_connect("close_event", self.on_close)
# Initialize legend
self.legend = None
self.lined = {}
#show the plotting window
self.show()
[docs]
def update_all(self, x, y):
"""update the plot completely with new data
Parameters
----------
x : array[float]
new x values to plot
y : array[float]
new y values to plot
"""
#not running? -> quit early
if not self.is_running:
return False
#no data yet? -> initialize lines
if not self.data:
#data initialization
for i, val in enumerate(y):
self.data.append({"x": [], "y": []})
#label selection and line (trace) initialization
label = self.labels[i] if i < len(self.labels) else f"port {i}"
line, = self.ax.plot([], [], lw=1.5, label=label)
self.lines.append(line)
# Create legend
self.legend = self.ax.legend(fancybox=False, ncols=int(np.ceil(len(y)/4)), loc="lower left")
self._setup_legend_picking()
#check if new update of plot is required
current_time = time.time()
if current_time - self.last_update > self.update_interval:
#replace the data
for i, val in enumerate(y):
self.data[i]["x"] = x
self.data[i]["y"] = val
self._update_plot()
self.last_update = current_time
return True
[docs]
def update(self, x, y):
"""update the plot with new data
Parameters
----------
x : float
new x value to add
y : float
new y value to add
"""
#not running? -> quit early
if not self.is_running:
return False
#no data yet? -> initialize lines
if not self.data:
#vectorial data -> multiple traces
if np.isscalar(y):
#size of data
n = 1
#check if rolling window plot
if self.max_samples is None:
self.data.append({"x": [], "y": []})
else:
self.data.append({"x": deque(maxlen=self.max_samples),
"y": deque(maxlen=self.max_samples)})
#label selection and line (trace) initialization
label = self.labels[0] if self.labels else "port 0"
line, = self.ax.plot([], [], lw=1.5, label=label)
self.lines.append(line)
else:
#size of data
n = len(y)
for i in range(n):
#check if rolling window plot
if self.max_samples is None:
self.data.append({"x": [], "y": []})
else:
self.data.append({"x": deque(maxlen=self.max_samples),
"y": deque(maxlen=self.max_samples)})
#label selection and line (trace) initialization
label = self.labels[i] if i < len(self.labels) else f"port {i}"
line, = self.ax.plot([], [], lw=1.5, label=label)
self.lines.append(line)
# Create legend
self.legend = self.ax.legend(fancybox=False, ncols=int(np.ceil(n/4)), loc="lower left")
self._setup_legend_picking()
#add the data
if np.isscalar(y):
self.data[0]["x"].append(x)
self.data[0]["y"].append(y)
else:
for i, val in enumerate(y):
self.data[i]["x"].append(x)
self.data[i]["y"].append(val)
#check if new update of plot is required
current_time = time.time()
if current_time - self.last_update > self.update_interval:
self._update_plot()
self.last_update = current_time
return True
def _update_plot(self):
#set the data to the lines (traces) of the plot
for i, line in enumerate(self.lines):
line.set_data(self.data[i]["x"], self.data[i]["y"])
#rescale the window
self.ax.relim()
self.ax.autoscale_view()
#redraw the figure
self.fig.canvas.draw()
self.fig.canvas.flush_events()
[docs]
def show(self):
plt.show(block=False)
[docs]
def on_close(self, event):
self.is_running = False
def _setup_legend_picking(self):
#setup the picking for the legend lines
for legline, origline in zip(self.legend.get_lines(), self.lines):
legline.set_picker(5) # 5 points tolerance
self.lined[legline] = origline
def on_pick(event):
legline = event.artist
origline = self.lined[legline]
visible = not origline.get_visible()
origline.set_visible(visible)
legline.set_alpha(1.0 if visible else 0.2)
self.fig.canvas.draw()
self.fig.canvas.mpl_connect("pick_event", on_pick)