This repository has been archived by the owner on Sep 17, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
HyperDeck.py
147 lines (124 loc) · 5.09 KB
/
HyperDeck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import json
import socket
from time import sleep
class HyperDeck:
    """Small TCP client that sends text commands to a HyperDeck and parses replies.

    Commands are sent as CRLF-terminated lines. Each reply is parsed into a
    dict holding ``response_code`` (int), ``response_subject`` (str) and one
    string entry per ``name: value`` parameter line.

    When a command cannot be sent, or its reply cannot be parsed, the command
    methods return ``{'error': True, 'response_code': 999}`` instead of
    raising.
    """

    # Sentinel code reported when a command fails locally ('bad' error).
    _ERROR_RESPONSE_CODE = 999
    # Maximum number of bytes requested from the socket per recv() call.
    _RECV_BUFFER_SIZE = 4096

    def __init__(self, ip, port):
        """Create an unconnected client for the device at ``ip``:``port``."""
        self.ip = ip
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # _receive() treats a recv() timeout as "end of reply", so this value
        # is also the minimum latency of every command.
        self.sock.settimeout(0.5)

    class HyperDeckException(Exception):
        """Raised when the HyperDeck cannot be reached."""

        def __init__(self, message):
            super().__init__(message)
            self.message = message

    def connect(self):
        """Connect to the HyperDeck and return its parsed greeting.

        Raises:
            HyperDeck.HyperDeckException: if the address cannot be resolved.
                Other socket errors (e.g. timeout, connection refused) are
                not converted and propagate unchanged.
        """
        try:
            self.sock.connect((self.ip, self.port))
            # An empty command sends nothing and just reads what the device
            # wrote after the connection was opened.
            return self._run_command('')
        except socket.gaierror as err:
            raise self.HyperDeckException(
                'Network problem connecting to HyperDeck') from err

    def disconnect(self):
        """Close the socket."""
        self.sock.close()

    def get_transport_info(self):
        """Get a live snapshot of what the HyperDeck is doing."""
        return self._run_command('transport info')

    def stop(self):
        """Stop playback / recording."""
        return self._run_command('stop')

    def play(self, loop=False):
        """Play the current clip or playrange, optionally looping.

        ``loop`` may be ``None``, which is treated as ``False``.
        """
        loop_flag = 'true' if loop else 'false'
        return self._run_command('play: loop: {}'.format(loop_flag))

    def select_slot(self, slot_id):
        """Choose the disk slot (usually 1 or 2)."""
        return self._run_command('slot select: slot id: {}'.format(slot_id))

    def select_clip_with_index(self, clip_index):
        """Make the clip with the given zero-based index the current clip.

        Negative indexes are clamped to 0; the device is sent ``index + 1``.
        """
        clip_id = 1 + max(clip_index, 0)
        return self._run_command('goto: clip id: {}'.format(clip_id))

    def set_playrange(self, from_timecode, to_timecode):
        """Set a play-range between two timecodes in 'HH:MM:SS:FF' format."""
        return self._run_command(
            'playrange set: in: {} out: {}'.format(from_timecode, to_timecode))

    def get_clips_list(self):
        """Fetch the clip list and return it as a list of dicts.

        Each dict has the keys ``hyperDeckClipIndex`` (1-based int), ``name``,
        ``timecode`` and ``duration``.

        Raises:
            KeyError: if the reply has no ``clip count`` entry (for example
                when the command failed) or a numbered clip entry is missing.
        """
        clip_list = self._run_command('clips get')
        formatted_clip_list = []
        for clip_index in range(1, int(clip_list['clip count']) + 1):
            # Each entry reads "<name> <timecode> <duration>"; the name may
            # itself contain spaces, so split from the right.
            name, timecode, duration = clip_list[str(clip_index)].rsplit(' ', 2)
            formatted_clip_list.append({'hyperDeckClipIndex': clip_index,
                                        'name': name,
                                        'timecode': timecode,
                                        'duration': duration})
        return formatted_clip_list

    def _run_command(self, command):
        """Send ``command`` (if non-empty) and return the parsed reply.

        Returns ``{'error': True, 'response_code': 999}`` when the command
        could not be sent or the reply could not be parsed.
        """
        try:
            if command and not self._send(command):
                # A failed send must not look like an empty, successful reply.
                return self._error_response()
            return self._objectify_response(self._receive())
        except Exception:
            # Deliberately best-effort: callers inspect the dict rather than
            # handle exceptions. KeyboardInterrupt/SystemExit still propagate.
            return self._error_response()

    def _error_response(self):
        """Build the dict returned for locally detected failures."""
        return {'error': True, 'response_code': self._ERROR_RESPONSE_CODE}

    def _send(self, msg):
        """Send one CRLF-terminated command; return True on success."""
        msg += "\r\n"
        try:
            self.sock.sendall(msg.encode('utf-8'))
            return True
        except (OSError, UnicodeError) as e:
            print("Error during send to HyperDeck:", e)
            return False

    def _receive(self):
        """Read everything the HyperDeck sends until the socket goes quiet.

        Reading stops when recv() times out, fails, or reports that the peer
        closed the connection. The collected bytes are decoded once, as
        UTF-8, so multi-chunk replies are neither duplicated nor mangled.
        """
        chunks = []
        while True:
            try:
                chunk = self.sock.recv(self._RECV_BUFFER_SIZE)
            except OSError:
                # Includes socket.timeout, which marks the end of the reply.
                break
            if not chunk:
                # Peer closed the connection.
                break
            chunks.append(chunk)
        return b''.join(chunks).decode('utf-8', errors='replace')

    def _objectify_response(self, hyperdeck_response):
        """Convert the text reply from the HyperDeck into a dict.

        The first line is parsed as ``<code> <subject>[:]``; every following
        ``name: value`` line becomes a string entry. Lines without a ``': '``
        separator are ignored.

        Raises:
            ValueError: if the first line does not start with an integer code.
        """
        data = {}
        lines = hyperdeck_response.split('\r\n')
        header = lines[0].strip()
        if header:
            code, _, subject = header.partition(' ')
            data['response_code'] = int(code)
            subject = subject.strip()
            # Multi-line replies end their header with a colon; single-line
            # replies such as "200 ok" do not.
            if subject.endswith(':'):
                subject = subject[:-1]
            data['response_subject'] = subject
        for line in lines[1:]:
            # Split on the first separator only, so values that themselves
            # contain ': ' (e.g. clip names) are kept intact.
            name, separator, value = line.partition(': ')
            if separator:
                data[name] = value
        return data