-
Notifications
You must be signed in to change notification settings - Fork 0
/
getfilters.py
80 lines (56 loc) · 2.12 KB
/
getfilters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import socket
import re
class vatt_filters:
    """Retrieve filter data from the redesigned guidebox at VATT.

    The class talks to the INDI server that controls the guidebox over a
    plain TCP socket, sends a ``getProperties`` request for the ``FILTERS``
    device and parses the reply for the filters currently in the beam.
    """

    # Defaults used when the caller does not name a server explicitly.
    DEFAULT_IP = "10.0.1.108"
    DEFAULT_PORT = 7623
    # Socket timeout in seconds; applies to connecting and to every recv.
    TIMEOUT = 0.5

    # Request sent to the INDI server.
    _QUERY = b"<getProperties version='1.7' device='FILTERS' />"
    # Extracts "<name>:<filter> <name>:<filter>" from the message attribute.
    # Raw string so that ``\w`` is a regex class, not a string escape.
    _FILTER_RE = re.compile(r'message="(\w*):(.*) (\w*):(.*)"')

    def __init__(self):
        pass

    def connect(self, ip: str = DEFAULT_IP, port: int = DEFAULT_PORT):
        """Open a TCP connection to the INDI server.

        Args:
            ip: Host name or IPv4 address of the INDI server.
            port: TCP port of the INDI server.

        Returns:
            A connected ``socket.socket`` with its timeout set to
            ``TIMEOUT`` seconds. The caller is responsible for closing it.

        Raises:
            OSError: If name resolution or the connection attempt fails
                (this includes ``socket.timeout``).
        """
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            soc.settimeout(self.TIMEOUT)
            host = socket.gethostbyname(ip)
            soc.connect((host, int(port)))
        except Exception:
            # Do not leak the descriptor when the connection cannot be made.
            soc.close()
            raise
        return soc

    def getfilters(self, ip: str = DEFAULT_IP, port: int = DEFAULT_PORT) -> dict:
        """Query the INDI server for the filters currently in the beam.

        Sends a ``getProperties`` XML tag and listens for the response,
        which is expected to contain an XML tag like::

            <message device="FILTERS" message="upper:clear lower:U">

        A regular expression then extracts the filter reported for each
        of the two wheels.

        Args:
            ip: Host name or IPv4 address of the INDI server.
            port: TCP port of the INDI server.

        Returns:
            A dictionary mapping each wheel name found in the message to
            its filter, e.g. ``{"upper": "clear", "lower": "U"}``.

        Raises:
            socket.timeout: If the server stops sending before a complete
                response (one ending in ``>``) has been received.
            ValueError: If the response does not contain the expected
                filter message, including when the server closes the
                connection early.
        """
        raw = bytearray()
        # The context manager closes the socket on every exit path.
        with self.connect(ip, port) as conn:
            conn.sendall(self._QUERY)
            # A response is considered complete once a '>' shows up in
            # its last few bytes (allowing for trailing whitespace).
            while b">" not in raw[-4:]:
                try:
                    chunk = conn.recv(100)
                except socket.timeout:
                    raise socket.timeout(
                        "Could not get a response from {} with string {}"
                        .format(str(conn), self._QUERY.decode()))
                if not chunk:
                    # Peer closed the connection; recv would return b""
                    # forever, so stop and parse whatever was received.
                    break
                raw += chunk

        # Decode once at the end so a multi-byte character split across
        # two recv() chunks cannot break decoding.
        resp = raw.decode()
        match = self._FILTER_RE.search(resp)
        if match is None:
            raise ValueError("Could not get filters from {}".format(resp))

        first_wheel, first_filter, second_wheel, second_filter = match.groups()
        return {first_wheel: first_filter, second_wheel: second_filter}