-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathplotter.py
More file actions
355 lines (313 loc) · 13.2 KB
/
plotter.py
File metadata and controls
355 lines (313 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
"""
This module contains a Plotter class capable of plotting streaming data
Author: Justin Duan
Date: 2018-05-22
"""
from multiprocessing import Process, Queue
from matplotlib.ticker import EngFormatter
import matplotlib.pyplot as plt
import numpy as np
import math
import time
import os
class Plotter(object):
    """
    A plot engine that waits for data sent to a queue and plots it until a
    stop flag is received
    ============================================================================
    Typical usage (see selfTest): create a Plotter in the main process, run
    `plot` in a separate process, then feed dictionaries via `addPoints`.

    Control messages understood by the plot loop:
        TOGGLE_SAVE: flips the flag that decides whether the figure is saved
        STOP_FLAG: ends the plot loop once the queued data has been handled
        NEW_LINE_FLAG: a dictionary containing this key starts a new line
            on every subplot

    Public methods:
        plot: set up the figure and keep plotting the queued data
        addPoints: put data (or a control message) into the queue
        selfTest: exercise the plotter using a second process
    """
    TOGGLE_SAVE = "NO SAVE"
    STOP_FLAG = "STOP"
    NEW_LINE_FLAG = "NEW LINE"

    def __init__(self):
        # A multiprocess safe queue used to accept the plotting data from
        # the main process
        self._queue = Queue()

    def plot(self, axesNames=(('x2', 'y1'), ('x2', 'y5'),
                              ('x2', 'y2'), ('x2', 'y3')),
             labelNames=('x1', 'x1', 'x1', 'x1'),
             axisRange=(-1e-9, 1e-9, -1e-9, 1e-9),
             engineeringFormat=True, figSize=(10, 6), ncols=None,
             markersize=5, plotLabelFormat=".0f", updateInterval=0.1,
             saveFigPath=None, keepFig=False, saveFlag=True):
        """
        Wait for the data sent to the queue and plot it at a certain time
        interval
        ------------------------------------------------------------------------
        Arguments:
            axesNames: a sequence of (x name, y name) tuples, one per subplot
            labelNames: a sequence with one entry per subplot, or None.
                Each entry is a data key (or a list/tuple of data keys) used
                to label new lines, or None for no label. Passing None for
                the whole argument disables labels on every subplot
            axisRange: tuple, default=(-1e-9, 1e-9, -1e-9, 1e-9)
                The initial (xmin, xmax, ymin, ymax) of every subplot
            engineeringFormat: bool, default=True
                Whether or not the engineering format should be used for the
                axes labels
            figSize: tuple (width, height). Figure size in inches
            ncols: int, default=None.
                Number of columns of the plots. If None, the smallest value
                whose square covers all subplots is used
            markersize: int, default=5. Plot marker size
            plotLabelFormat: string.
                A format string used to control the plot label on display
            updateInterval: float, default=0.1.
                Time in seconds to sleep between two figure updates
            saveFigPath: path string, default=None.
                The path to save the figure. If None, then no figure will be
                saved. Its last component is also used as the window title
            keepFig: bool, default=False.
                Whether or not the figure window should stay open (blocking)
                after the plot loop has ended
            saveFlag: bool, default=True.
                Initial value of the flag that decides whether the figure is
                saved; a TOGGLE_SAVE message flips it
        """
        # Miscellaneous controls. The defaults are immutable tuples, so
        # private list copies are taken here
        self._axesNames = list(axesNames)
        if labelNames is None:
            self._labelNames = [None] * len(self._axesNames)
        else:
            self._labelNames = list(labelNames)
        self._style = self._styleGen()
        self._markersize = markersize
        self._plotLabelFormat = plotLabelFormat
        self._saveFlag = saveFlag

        # Create plots
        nplots = len(self._axesNames)
        if ncols is None:
            ncols = 1
            while ncols * ncols < nplots:
                ncols += 1
        # Ceiling division; at least one row so the figure can be created
        nrows = max(1, -(-nplots // ncols))
        # squeeze=False always returns a 2-D array, even for a single subplot
        self._fig, axes = plt.subplots(nrows, ncols, squeeze=False)
        self._axes = axes.flatten()

        # Set figure size
        self._fig.set_size_inches(*figSize)

        # Label the axes
        for ax, (xAxis, yAxis) in zip(self._axes, self._axesNames):
            ax.set_xlabel(xAxis)
            ax.set_ylabel(yAxis)
            ax.set_title(yAxis + ' vs. ' + xAxis, fontsize=14)
            ax.grid()
            ax.set_xlim(axisRange[0], axisRange[1])
            ax.set_ylim(axisRange[2], axisRange[3])
            if engineeringFormat:
                # One formatter per axis: a formatter is bound to the axis
                # it is installed on and should not be shared
                ax.xaxis.set_major_formatter(EngFormatter())
                ax.yaxis.set_major_formatter(EngFormatter())
        plt.tight_layout()
        self._initialPlotting = True

        # Show the figure
        plt.show(block=False)
        plt.draw()

        # Prepare the output folder and change the window title
        if saveFigPath is not None:
            folderPath, title = os.path.split(saveFigPath)
            # folderPath is '' for a bare file name; nothing to create then
            if folderPath and not os.path.isdir(folderPath):
                os.makedirs(folderPath)
            # The title is set through the figure manager because newer
            # Matplotlib releases no longer offer canvas.set_window_title
            manager = getattr(self._fig.canvas, 'manager', None)
            if manager is not None:
                manager.set_window_title(title)

        # Keep plotting the queue
        self._plotQueue(updateInterval)

        # self._saveFlag may have been toggled while plotting
        if saveFigPath is not None and self._saveFlag:
            folderPath = os.path.dirname(saveFigPath)
            if not folderPath or os.path.isdir(folderPath):
                self._fig.savefig(saveFigPath)
        if keepFig:
            plt.show(block=True)

    def _styleGen(self):
        """
        An infinite generator that produces plot style strings made of one
        color code followed by one marker code. Colors and markers advance
        together and wrap around independently
        """
        colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
        markers = [".", "o", "v", "^", "<", ">", "1", "2", "3",
                   "4", "8", "s", "p", "P", "*", "h", "H", "+",
                   "x", "X", "D", "d", "|", "_"]
        k = 0
        while True:
            yield colors[k % len(colors)] + markers[k % len(markers)]
            k += 1

    def _newLine(self, ax, style, label):
        """
        Add a new, empty line to the specific axes and return it. Note the
        old lines are still preserved. A legend is shown only when a label
        is given
        """
        kwargs = {'markersize': self._markersize}
        if label is not None:
            kwargs['label'] = label
        line, = ax.plot([], [], style, **kwargs)
        if label is not None:
            ax.legend(loc='best')
        return line

    def _updateSingleAxis(self, ax, x, y):
        """
        Add new data points to the latest line of the specific axes and grow
        the axis limits when the new points fall outside of them
        ------------------------------------------------------------------------
        Arguments:
            ax: the axes to update. It must already contain a line
            x, y: scalars or sequences with the new coordinates
        """
        # Define extra margin and scale extender
        extra_margin = 1e-9
        scale_ext = 1.2

        # Set data
        line = ax.get_lines()[-1]
        x_arr, y_arr = line.get_data()
        line.set_data(np.append(x_arr, x), np.append(y_arr, y))

        # Nothing to rescale for an empty batch
        if np.size(x) == 0 or np.size(y) == 0:
            return

        # np.min / np.max accept scalars as well as sequences
        min_x, max_x = np.min(x), np.max(x)
        min_y, max_y = np.min(y), np.max(y)

        # Automatically adjust the plotting range at the beginning of plotting
        if self._initialPlotting:
            ax.set_xlim(min_x - extra_margin, max_x + extra_margin)
            ax.set_ylim(min_y - extra_margin, max_y + extra_margin)
            return

        # Grow (never shrink) the limits around the span of the new points
        xmin, xmax = ax.get_xlim()
        center_x = (max_x + min_x) / 2
        half_x = (max_x - min_x) / 2 * scale_ext
        if max_x > xmax or min_x < xmin:
            if max_x > xmax:
                xmax = max(xmax, center_x + half_x)
            if min_x < xmin:
                xmin = min(xmin, center_x - half_x)
            ax.set_xlim(xmin, xmax)

        ymin, ymax = ax.get_ylim()
        center_y = (max_y + min_y) / 2
        half_y = (max_y - min_y) / 2 * scale_ext
        if max_y > ymax or min_y < ymin:
            if max_y > ymax:
                ymax = max(ymax, center_y + half_y)
            if min_y < ymin:
                ymin = min(ymin, center_y - half_y)
            ax.set_ylim(ymin, ymax)

    def _makeLabel(self, labelName, data):
        """
        Build the legend text "name=value" (comma separated for several
        names) from the data, or return None when labelName is None
        """
        if labelName is None:
            return None
        if isinstance(labelName, (list, tuple)):
            names = labelName
        else:
            names = [labelName]
        return ", ".join(name + '=' + self._getLabelVal(data[name])
                         for name in names)

    def _plotPoint(self, data, new_line=False):
        """
        Add the data to all subplots and redraw the figure
        ------------------------------------------------------------------------
        Arguments:
            data: a dictionary whose values are scalars or sequences. The
                entries named in axesNames are plotted; the entries named in
                labelNames are used for the label of a new line
            new_line: create a new line on every subplot or not
        """
        # All lines created by one call share the same style
        style = next(self._style) if new_line else None
        for ax, (xAxis, yAxis), labelName in zip(
                self._axes, self._axesNames, self._labelNames):
            # A line is also created when the axes has none yet, so data
            # that arrives before any new-line request can still be plotted
            if new_line or not ax.get_lines():
                if style is None:
                    style = next(self._style)
                self._newLine(ax, style, self._makeLabel(labelName, data))
            self._updateSingleAxis(ax, data[xAxis], data[yAxis])
        plt.draw()
        self._fig.canvas.flush_events()
        self._initialPlotting = False

    def _formatLabelScalar(self, value):
        """
        Format one value with the plot label format; fall back to str() when
        the format does not apply to the value
        """
        try:
            return format(value, self._plotLabelFormat)
        except (TypeError, ValueError):
            return str(value)

    def _getLabelVal(self, labelVal):
        """
        Convert a label value into the string shown in the legend
        ------------------------------------------------------------------------
        Arguments:
            labelVal: a scalar, a string or a sequence of values
        Returns:
            A string. Strings are returned unchanged and scalars are
            formatted with the plot label format. For a sequence: "" if it is
            empty, the single formatted value if all entries are equal,
            "min~max" if it holds more than 5 entries, otherwise all
            formatted entries separated by ","
        """
        if isinstance(labelVal, str):
            return labelVal
        if not hasattr(labelVal, '__iter__'):
            return self._formatLabelScalar(labelVal)
        # Flatten to a plain list of Python values
        values = np.ravel(labelVal).tolist()
        if not values:
            return ""
        labelValMin, labelValMax = min(values), max(values)
        if labelValMax == labelValMin:
            return self._formatLabelScalar(labelValMin)
        if len(values) > 5:
            return "{}~{}".format(self._formatLabelScalar(labelValMin),
                                  self._formatLabelScalar(labelValMax))
        return ",".join(self._formatLabelScalar(v) for v in values)

    def _plotQueue(self, updateInterval):
        """
        Keep on plotting the queued data until the stop flag is received
        ------------------------------------------------------------------------
        Arguments:
            updateInterval: time in seconds to sleep between two updates
        """
        continuePlotting = True
        while continuePlotting:
            self._buf = dict()
            # empty() is used instead of qsize() because qsize() may raise
            # NotImplementedError on some platforms such as macOS
            while not self._queue.empty():
                data = self._queue.get()
                if data == self.STOP_FLAG:
                    # Keep handling what is left in the queue, then stop
                    continuePlotting = False
                    continue
                if data == self.TOGGLE_SAVE:
                    self._saveFlag = not self._saveFlag
                    continue
                # In case a new line is requested, plot the buffer first
                if self.NEW_LINE_FLAG in data:
                    self._plotBuffer()
                    self._plotPoint(data, new_line=True)
                    continue
                self._fillBuffer(data)
            self._plotBuffer()
            time.sleep(updateInterval)

    def _fillBuffer(self, data):
        """
        Append the values of the data dictionary to the buffer, key by key
        """
        if not self._buf:
            self._createBufferEntries(data)
        for key, val in data.items():
            # Keys that were not in the first dictionary start empty
            self._buf[key] = np.append(self._buf.get(key, np.array([])), val)

    def _createBufferEntries(self, data):
        """
        Create an empty array in the buffer for every key of the data. Does
        nothing when the buffer already has entries
        """
        if self._buf:
            return
        for key in data:
            self._buf[key] = np.array([])

    def _plotBuffer(self):
        """
        Plot the buffered data on the current lines and reset the buffer
        """
        if self._buf:
            self._plotPoint(self._buf, new_line=False)
            self._buf = dict()
        else:
            # Flush GUI event in case there's nothing to do
            self._fig.canvas.flush_events()

    def addPoints(self, data):
        """
        Add data points (a dictionary) or a control message to the queue
        """
        self._queue.put(data)

    @classmethod
    def selfTest(cls, pause=0.1):
        """
        Perform the self-test using another process
        ------------------------------------------------------------------------
        Arguments:
            pause: time in seconds to wait between two data points
        """
        def _dummyDataGen(x1, counts=100):
            """
            Dummy data generator used for self test
            The return value is in the form of a dictionary
            """
            while counts > 0:
                x2 = counts * 36
                yield {
                    'x1': x1,
                    'x2': x2,
                    'y1': math.sin(x2 / 360 * math.pi) + x1 / 1000,
                    'y2': math.cos(x2 / 360 * math.pi) + x1 / 1000,
                    'y3': math.radians(x2 / 360 * math.pi) + x1 / 2000,
                }
                counts -= 1

        p = Plotter()
        proc = Process(target=p.plot,
                       kwargs={'ncols': 2,
                               'axesNames': [('x2', 'y1'), ('x2', 'y2'),
                                             ('x2', 'y3')],
                               'labelNames': ['x1', 'x1', 'x1'],
                               'updateInterval': 0.1,
                               'saveFigPath': None,
                               'keepFig': False})
        proc.start()
        x1_list = [0, 1000]
        for x1 in x1_list:
            dummy_gen = _dummyDataGen(x1, 40)
            for i, point in enumerate(dummy_gen):
                # The first point of every sweep starts a new line
                if i == 0:
                    point[p.NEW_LINE_FLAG] = True
                p.addPoints(point)
                time.sleep(pause)
        p.addPoints(p.STOP_FLAG)
        # Wait for the plot process to finish before reporting
        proc.join()
        print("Seft test is done!")
# Run the built-in self test when this file is executed as a script
if __name__ == "__main__":
    Plotter.selfTest()