-
Notifications
You must be signed in to change notification settings - Fork 4
/
qemu.py
55 lines (40 loc) · 1.08 KB
/
qemu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import json
import fcntl
import socket
from pathlib import Path
class QEMU:
    """Minimal client for a QEMU Machine Protocol (QMP) monitor on a UNIX socket.

    Intended to be used as a context manager::

        with QEMU("/path/to/qmp.sock") as vm:
            vm.qmp({"execute": "query-status"})
            vm.hmp("info status")

    Every message sent or received is echoed to stdout with a ``SEND ->`` /
    ``RECV <-`` prefix.
    """

    # Connected socket object; None while no connection is open.
    client = None
    # Filesystem path of the UNIX socket to connect to (set in __init__).
    socket = None

    def __init__(self, unixSocket):
        """Remember the UNIX socket path; no connection is made yet.

        Args:
            unixSocket: Address handed to ``connect()`` in ``__enter__``.
        """
        self.socket = unixSocket

    def __enter__(self):
        """Connect, consume the first server message and send ``qmp_capabilities``.

        Returns:
            This instance, ready for ``qmp()`` / ``hmp()`` calls.

        Raises:
            OSError: If the socket cannot be connected.
            ConnectionError: If the peer closes the socket during the handshake.
        """
        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Blocking mode with no timeout (setblocking(True) is equivalent to
        # settimeout(None)), so slow replies never raise a timeout error.
        client.setblocking(True)
        self.client = client
        try:
            client.connect(self.socket)
            # First message from the server (the QMP greeting) is read and
            # discarded before any command is issued.
            self.read(client)
            self.qmp({ "execute": "qmp_capabilities" })
        except BaseException:
            # __exit__ is not invoked when __enter__ fails, so release the
            # socket here instead of leaking the file descriptor.
            self.client = None
            client.close()
            raise
        return self

    def __exit__(self, _type, value, tb):
        """Close the connection. Exceptions from the ``with`` body propagate."""
        if self.client is not None:
            self.client.close()
            self.client = None

    def read(self, socket):
        """Read exactly one JSON value from the connection and return it.

        Bytes are consumed one at a time so that nothing belonging to the
        next message is taken off the socket. After each non-whitespace byte
        the accumulated text is parsed; the first successful parse ends the
        read.

        Args:
            socket: Socket object to read from. ``None`` means ``self.client``.
                (The name shadows the ``socket`` module inside this method.)

        Returns:
            The decoded JSON value (a ``dict`` for QMP messages).

        Raises:
            ConnectionError: If the peer closes the socket before a complete
                JSON value has arrived.
        """
        conn = self.client if socket is None else socket
        buffer = bytearray()
        while True:
            chunk = conn.recv(1)
            if not chunk:
                # recv() returning b'' means end-of-stream; without this
                # check the loop would spin forever on a closed socket.
                raise ConnectionError(
                    "QMP socket closed before a complete message was received"
                )
            buffer += chunk
            if chunk.isspace():
                # Whitespace can never complete a JSON value, so the parse
                # attempt would only repeat the previous failure.
                continue
            try:
                # Decode the whole buffer, not the single byte, so multi-byte
                # UTF-8 sequences are handled. Both UnicodeDecodeError
                # (incomplete sequence) and json.JSONDecodeError (incomplete
                # document) are ValueError subclasses.
                obj = json.loads(buffer.decode().strip())
            except ValueError:
                continue
            print('RECV <-', obj)
            return obj

    def qmp(self, command):
        """Send one QMP command and return the server's reply.

        Messages that carry an ``"event"`` key (asynchronous notifications)
        arriving before the reply are skipped, so the returned value always
        belongs to *command*. Skipped events are still echoed by ``read()``.

        Args:
            command: JSON-serialisable command, e.g.
                ``{"execute": "query-status"}``.

        Returns:
            The first non-event message received after sending *command*.

        Raises:
            ConnectionError: If the peer closes the socket before replying.
        """
        message = f"{json.dumps(command)}\n"
        print(f"SEND -> {message.strip()}")
        # sendall() retries until every byte is written; send() may stop early.
        self.client.sendall(message.encode())
        while True:
            reply = self.read(self.client)
            if isinstance(reply, dict) and "event" in reply:
                continue
            return reply

    def hmp(self, command):
        """Run a human-monitor command line through ``human-monitor-command``.

        Args:
            command: Monitor command line; it is converted to ``str``.

        Returns:
            The value under ``"return"`` when the reply has that key,
            otherwise the whole reply (for example an error object).
        """
        result = self.qmp({
            "execute": "human-monitor-command",
            "arguments": { "command-line": f"{command}" },
        })
        return result['return'] if 'return' in result else result