-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredpitaya_scpi.py
136 lines (107 loc) · 3.82 KB
/
redpitaya_scpi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""SCPI access to Red Pitaya."""
import socket
# Module metadata: authorship and copyright attribution.
__author__ = "Luka Golinar, Iztok Jeras"
__copyright__ = "Copyright 2015, Red Pitaya"
class scpi (object):
    """SCPI class used to access Red Pitaya over an IP network.

    Text commands are encoded as UTF-8 and terminated with ``delimiter``;
    text replies are read until that same delimiter is seen.  Binary replies
    are read with :meth:`rx_arb`.

    Attributes:
        delimiter: Terminator appended to every outgoing text message and
            stripped from every incoming one.
        host, port, timeout: The values passed to the constructor.
    """

    delimiter = '\r\n'

    def __init__(self, host, timeout=None, port=5000):
        """Initialize object and open IP connection.

        Host IP should be a string in quotes, like '192.168.1.100'.

        Args:
            host: Host name or IP address of the SCPI server.
            timeout: Socket timeout in seconds, or None to leave the socket
                in its default blocking mode.
            port: TCP port of the SCPI server.

        A failed connection is reported on stdout rather than raised; the
        object is then left unconnected and every transfer method raises
        ConnectionError.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        # Define the attribute before anything can fail so that close() and
        # __del__ are always safe to run.
        self._socket = None

        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            if timeout is not None:
                sock.settimeout(timeout)
            sock.connect((host, port))
        except socket.error as e:
            # Plain '{}' fields: an exception object does not accept the
            # ':s' format spec, which made the report itself raise TypeError.
            print('SCPI >> connect({}:{}) failed: {}'.format(host, port, e))
            if sock is not None:
                sock.close()
        else:
            self._socket = sock

    def __del__(self):
        self.close()

    def close(self):
        """Close IP connection.  Safe to call more than once."""
        # getattr: the instance may not have got as far as setting _socket.
        sock = getattr(self, '_socket', None)
        self._socket = None
        if sock is not None:
            sock.close()

    def _connection(self):
        """Return the open socket or raise ConnectionError if there is none."""
        sock = getattr(self, '_socket', None)
        if sock is None:
            raise ConnectionError('SCPI >> not connected')
        return sock

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes from the socket and return them.

        Raises:
            ConnectionError: The peer closed the connection before ``count``
                bytes arrived.
        """
        sock = self._connection()
        buf = bytearray()
        while len(buf) < count:
            chunk = sock.recv(count - len(buf))
            if not chunk:
                # recv() returning b'' means the peer closed the connection;
                # without this check the loop would spin forever.
                raise ConnectionError(
                    'SCPI >> connection closed after {} of {} bytes'.format(
                        len(buf), count))
            buf += chunk
        return bytes(buf)

    def rx_txt(self, chunksize = 4096):
        """Receive text string and return it after removing the delimiter.

        Args:
            chunksize: Number of bytes requested per ``recv`` call (the
                delimiter length is added on top); a power of two is
                preferable.

        Returns:
            The decoded reply without the trailing delimiter.

        Raises:
            ConnectionError: Not connected, or the peer closed the connection
                before a delimiter arrived.
        """
        sock = self._connection()
        terminator = self.delimiter.encode('utf-8')
        received = bytearray()
        # Test the accumulated message, not the last chunk: the delimiter may
        # arrive split across two recv() calls.
        while not received.endswith(terminator):
            chunk = sock.recv(chunksize + len(terminator))
            if not chunk:
                raise ConnectionError(
                    'SCPI >> connection closed before delimiter was received')
            received += chunk
        # Decode once at the end so a multi-byte character split across
        # chunks cannot break decoding.
        return received[:len(received) - len(terminator)].decode('utf-8')

    def rx_arb(self):
        """Receive binary data from the SCPI server.

        The reply is expected in the ``#<N><length><data>`` layout (the
        SCPI / IEEE 488.2 definite-length block format): a ``#``, one ASCII
        digit N, N ASCII digits giving the payload length, then the payload.
        Only the payload bytes are read; nothing after them is consumed.

        Returns:
            The payload as ``bytes``, or False if the reply does not start
            with ``#`` or N is not greater than zero.

        Raises:
            ValueError: A header field is not a decimal number.
            ConnectionError: Not connected, or the peer closed the connection
                mid-transfer.
        """
        # recv() yields bytes, so the marker must be compared against b'#'.
        if self._recv_exact(1) != b'#':
            return False
        length_digits = int(self._recv_exact(1))
        if not (length_digits > 0):
            return False
        payload_size = int(self._recv_exact(length_digits))
        return self._recv_exact(payload_size)

    def tx_txt(self, msg):
        """Send text string with the delimiter appended.

        Returns:
            The number of bytes transmitted (message plus delimiter).

        Raises:
            ConnectionError: Not connected.
        """
        payload = (msg + self.delimiter).encode('utf-8')
        # sendall() keeps going until everything is out; plain send() may
        # transmit only part of the command.
        self._connection().sendall(payload)
        return len(payload)

    def txrx_txt(self, msg):
        """Send/receive text string."""
        self.tx_txt(msg)
        return self.rx_txt()

    # IEEE Mandated Commands

    def cls(self):
        """Clear Status Command"""
        return self.tx_txt('*CLS')

    def ese(self, value: int):
        """Standard Event Status Enable Command"""
        return self.tx_txt('*ESE {}'.format(value))

    def ese_q(self):
        """Standard Event Status Enable Query"""
        return self.txrx_txt('*ESE?')

    def esr_q(self):
        """Standard Event Status Register Query"""
        return self.txrx_txt('*ESR?')

    def idn_q(self):
        """Identification Query"""
        return self.txrx_txt('*IDN?')

    def opc(self):
        """Operation Complete Command"""
        return self.tx_txt('*OPC')

    def opc_q(self):
        """Operation Complete Query"""
        return self.txrx_txt('*OPC?')

    def rst(self):
        """Reset Command"""
        return self.tx_txt('*RST')

    def sre(self, value=None):
        """Service Request Enable Command

        Args:
            value: Optional enable mask sent as the command argument, in the
                same way as :meth:`ese`.  When omitted the bare ``*SRE`` is
                sent, which is what this method always did before.
        """
        if value is None:
            return self.tx_txt('*SRE')
        return self.tx_txt('*SRE {}'.format(value))

    def sre_q(self):
        """Service Request Enable Query"""
        return self.txrx_txt('*SRE?')

    def stb_q(self):
        """Read Status Byte Query"""
        return self.txrx_txt('*STB?')

    # :SYSTem

    def err_c(self):
        """Error count."""
        return self.txrx_txt('SYST:ERR:COUN?')

    def err_n(self):
        """Error next."""
        return self.txrx_txt('SYST:ERR:NEXT?')