-
Notifications
You must be signed in to change notification settings - Fork 2
/
SignalBox.py
executable file
·168 lines (159 loc) · 6.2 KB
/
SignalBox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import time
import serial
import json
import matplotlib
import numpy as np
from matplotlib.lines import Line2D
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import random
class SignalBox():
    """Communicate with a sketch on an Arduino Uno that controls two digipots.

    Every request is a JSON object written to the serial port; the answer is
    read back and decoded from JSON.  Commands ending in "1" are exposed as
    the "pre-gain" methods, commands ending in "2" as the "gain" methods.
    """

    # Baud rate used every time the port is (re)opened.
    BAUDRATE = 115200
    # Seconds to wait after opening the port before it is used.
    STARTUP_DELAY = 2

    def __init__(self, devicePath):
        """Open the serial port at *devicePath* (e.g. 'COM3')."""
        # timeout (in seconds) for waiting for the arduino's answer
        self.timeout = 3
        # path to the port e.g. 'COM3'
        self.devicePath = devicePath
        # open the serial port
        self._openPort()

    def __del__(self):
        # closePort tolerates a missing or already closed port, so this is
        # safe even when __init__ failed before self.port was assigned.
        self.closePort()

    def _openPort(self):
        """Open self.devicePath at BAUDRATE and wait STARTUP_DELAY seconds."""
        self.port = serial.Serial(self.devicePath, self.BAUDRATE, timeout=1)
        # NOTE(review): presumably the board needs this time to become ready
        # after the port has been opened -- confirm against the sketch.
        time.sleep(self.STARTUP_DELAY)

    def __communicateTaskWithJSON(self, task):
        """Send *task* to the Arduino and return its decoded answer.

        task is a dictionary containing keys "command" and (if command is
        write) "value".

        Returns the decoded JSON answer.  If the answer cannot be decoded
        (this includes no answer at all within self.timeout seconds), the
        dictionary ``dict(value=0, error=1, errorMessage="received bad
        string")`` is returned instead.
        """
        # json.dumps emits plain ASCII by default; pyserial wants bytes,
        # not text, so encode before writing.
        payload = json.dumps(task).encode("ascii")
        self.port.write(payload)
        # wait until the arduino answers or until timeout
        deadline = time.time() + self.timeout
        while self.port.inWaiting() == 0 and time.time() <= deadline:
            # short nap instead of a busy loop that pegs one CPU core
            time.sleep(0.001)
        # wait until the arduino has finished answering (python is MUCH
        # faster than arduino sketches)
        time.sleep(0.1)
        answer = self.port.read(self.port.inWaiting())
        # convert the answer to a dictionary using json
        try:
            # UnicodeDecodeError and JSON decode errors are both ValueErrors
            result = json.loads(answer.decode("utf-8"))
        except ValueError:
            result = dict(value=0, error=1, errorMessage="received bad string")
        return result

    def reset(self):
        """Close the serial port and open it again with the same settings."""
        self.closePort()
        # Reopen exactly like __init__ does; the port used to be reopened at
        # 9600 baud here, which does not match the 115200 used initially.
        self._openPort()

    def closePort(self):
        """Close the serial port, ignoring any error while doing so."""
        try:
            self.port.close()
        except Exception:
            # best effort: the port may be missing or already closed
            pass

    def setPreGain(self, value):
        """Send command "write1" with *value*; return the Arduino's answer."""
        task = dict(command="write1", value=value)
        return self.__communicateTaskWithJSON(task)

    def getPreGain(self):
        """Send command "read1"; return the Arduino's answer."""
        task = dict(command="read1")
        return self.__communicateTaskWithJSON(task)

    def increasePreGain(self):
        """Send command "inc1"; return the Arduino's answer."""
        task = dict(command="inc1")
        return self.__communicateTaskWithJSON(task)

    def decreasePreGain(self):
        """Send command "dec1"; return the Arduino's answer."""
        task = dict(command="dec1")
        return self.__communicateTaskWithJSON(task)

    def setGain(self, value):
        """Send command "write2" with *value*; return the Arduino's answer."""
        task = dict(command="write2", value=value)
        return self.__communicateTaskWithJSON(task)

    def getGain(self):
        """Send command "read2"; return the Arduino's answer."""
        task = dict(command="read2")
        return self.__communicateTaskWithJSON(task)

    def increaseGain(self):
        """Send command "inc2"; return the Arduino's answer."""
        task = dict(command="inc2")
        return self.__communicateTaskWithJSON(task)

    def decreaseGain(self):
        """Send command "dec2"; return the Arduino's answer."""
        task = dict(command="dec2")
        return self.__communicateTaskWithJSON(task)

    def _data4oszi(self):
        """Generator function for oszi.

        Each step reads whatever is waiting on the port and yields the
        integers found in it as a (possibly empty) list.
        """
        while True:
            chunk = self.port.read(self.port.inWaiting())
            # arduino sends data as integers separated by newlines
            lines = chunk.splitlines()
            samples = []
            # the first and last entry could be incomplete, so both are
            # dropped from every chunk
            for line in lines[1:-1]:
                try:
                    samples.append(int(line))
                except ValueError:
                    # skip anything that is not a plain integer
                    pass
            yield samples

    def oszi(self, maxt, trigger=False):
        """Show a live plot of the values streamed by the Arduino.

        "maxt" is the number of plotted datapoints.

        NOTE(review): "trigger" is accepted but currently not used; the scope
        is always updated with triggering enabled (fargs=[True]).  Left
        unchanged so the default behaviour stays the same.
        """
        try:
            self.port.write(b'{"command":"adcON"}')
            fig = plt.figure()
            ax = fig.add_subplot(111)
            scope = Scope(ax, maxt)
            # pass a generator in to produce data for the update func; the
            # animation object has to stay referenced while the plot is shown
            ani = animation.FuncAnimation(fig, scope.update, self._data4oszi,
                                          fargs=[True], interval=0, blit=True)
            plt.show()
        except KeyboardInterrupt:
            # the plot application was killed by the user
            pass
        finally:
            # however the plot ended (window closed, killed, error), make
            # sure the arduino is told to stop polling
            self.port.write(b'{"command":"adcOFF"}')
            time.sleep(2)
            # empty the serial buffer (contains the arduino's json-answer)
            self.port.read(self.port.inWaiting())
# Emulate an oscilloscope. Requires the animation API introduced in
# matplotlib 1.0 SVN.
class Scope:
    """Oscilloscope-like display that plots incoming samples on an axes.

    Samples are appended to the trace until "maxt" points are shown; then a
    new trace is started, optionally aligned by a simple level trigger.
    """

    def __init__(self, ax, maxt=5000):
        """Attach a line to *ax* and set up an empty trace.

        ax   -- axes object the trace is drawn on
        maxt -- number of data points shown in one trace
        """
        self.ax = ax
        self.maxt = maxt
        self.tdata = [0]
        self.ydata = [0]
        self.line = Line2D(self.tdata, self.ydata)
        self.ax.add_line(self.line)
        self.ax.set_ylim(-.1, 1026)
        self.ax.set_xlim(0, self.maxt)
        # maximum number of data points used for triggering
        self.trigzonedefault = int(self.maxt/2)
        self.trigzone = self.trigzonedefault
        self.triglevel = 0

    def trigger(self, data):
        """Works as a trigger for the oscilloscope.

        Returns the part of *data* a new trace should start with, or None if
        no trigger point was found and more data should be awaited.
        """
        # triglevel is chosen as the maximum of the previous data set.
        self.triglevel = max(self.ydata)
        # trigzone is the 'timeout' for triggering.
        self.trigzone -= len(data)
        if self.trigzone < 0:
            # only the samples that still lie inside the trigger zone count
            zone = len(data) + self.trigzone
        else:
            zone = len(data)
        # a negative zone means "nothing left to search"
        for i, value in enumerate(data[:max(zone, 0)]):
            # if the new signal reaches the triglevel within some tolerance
            # (5) the plot is continued
            if abs(self.triglevel - value) < 5:
                self.trigzone = self.trigzonedefault
                return data[i:]
        if self.trigzone < 0:
            # if no match has been achieved within trigzone, reset the
            # variable and continue plotting
            self.trigzone = self.trigzonedefault
            return data[zone:]
        # no match but trigzone has not been reached -> wait for new data
        return None

    def update(self, data, trigger=False):
        """Add the samples in *data* to the trace and refresh the line.

        data    -- list of new samples
        trigger -- if true, a new trace only starts at a trigger point

        Returns a one-element tuple holding the updated line.
        """
        shown = len(self.tdata)
        # check if the end of the plot window has been reached
        if shown + len(data) > self.maxt:
            # fill remaining space
            room = self.maxt - shown
            self.ydata += data[:room]
            data = data[room:]
            # trigger the rest of data
            if trigger:
                data = self.trigger(data)
            # if there is something to show (trigger was successful), start
            # a new plot; otherwise the full trace stays on screen
            if data:
                self.ydata = data
        else:
            self.ydata += data
        # a real list, so tdata keeps the same type as its initial value
        self.tdata = list(range(len(self.ydata)))
        self.ax.figure.canvas.draw()
        self.line.set_data(self.tdata, self.ydata)
        return self.line,