forked from wxtcstt/MiChatGUI
-
Notifications
You must be signed in to change notification settings - Fork 0
/
minaservice.py
106 lines (92 loc) · 3.45 KB
/
minaservice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import json
from miaccount import MiAccount, get_random
import logging
# Module-level logger shared by everything in this file. It is named after
# ``__package__``.
# NOTE(review): when this file is imported as a top-level module (not from
# inside a package), ``__package__`` is empty and ``logging.getLogger`` then
# returns the root logger -- confirm that this is intended.
_LOGGER = logging.getLogger(__package__)
class MiNAService:
    """Thin async client for the API hosted at ``https://api2.mina.mi.com``.

    Every request is delegated to ``account.mi_request`` using the
    ``"micoapi"`` service id. Most methods are one-line wrappers around
    :meth:`ubus_request` that differ only in the ubus method, path and
    message they send.
    """

    def __init__(self, account: "MiAccount"):
        """Remember the account object whose ``mi_request`` sends the calls."""
        self.account = account

    async def mina_request(self, uri, data=None):
        """Send one request to the MiNA endpoint and return the response.

        A fresh ``requestId`` (``"app_ios_"`` plus 30 characters from
        ``get_random``) is attached to every call: it is added to the payload
        when ``data`` is given, otherwise it is appended to the query string
        of ``uri``.

        Args:
            uri: Path (optionally with a query string) relative to
                ``https://api2.mina.mi.com``.
            data: Optional mapping of payload fields. It is not modified.

        Returns:
            Whatever ``account.mi_request`` returns.
        """
        request_id = "app_ios_" + get_random(30)
        if data is not None:
            # Work on a copy so the caller's dict does not silently gain a
            # "requestId" key.
            data = dict(data)
            data["requestId"] = request_id
        else:
            # Start the query string when the URI does not have one yet;
            # blindly using "&" would produce a malformed URL.
            separator = "&" if "?" in uri else "?"
            uri += separator + "requestId=" + request_id
        headers = {
            "User-Agent": "MiHome/6.0.103 (com.xiaomi.mihome; build:6.0.103.1; iOS 14.4.0) Alamofire/6.0.103 MICO/iOSApp/appStore/6.0.103"
        }
        return await self.account.mi_request(
            "micoapi", "https://api2.mina.mi.com" + uri, data, headers
        )

    async def device_list(self, master=0):
        """Return the ``"data"`` field of the device-list response.

        Args:
            master: Value sent as the ``master`` query parameter.

        Returns:
            ``result.get("data")``, or ``None`` when the response is falsy.
        """
        result = await self.mina_request(f"/admin/v2/device_list?master={master}")
        return result.get("data") if result else None

    async def ubus_request(self, deviceId, method, path, message):
        """POST a ubus call for ``deviceId`` to ``/remote/ubus``.

        Args:
            deviceId: Identifier of the target device.
            method: ubus method name.
            path: ubus path the method belongs to.
            message: JSON-serialisable object; it is sent as a JSON string.

        Returns:
            The response of :meth:`mina_request`.
        """
        payload = {
            "deviceId": deviceId,
            "message": json.dumps(message),
            "method": method,
            "path": path,
        }
        return await self.mina_request("/remote/ubus", payload)

    async def text_to_speech(self, deviceId, text):
        """Call ubus ``mibrain/text_to_speech`` with ``text`` on the device."""
        return await self.ubus_request(
            deviceId, "text_to_speech", "mibrain", {"text": text}
        )

    async def player_set_volume(self, deviceId, volume):
        """Call ubus ``mediaplayer/player_set_volume`` with ``volume``."""
        return await self.ubus_request(
            deviceId,
            "player_set_volume",
            "mediaplayer",
            {"volume": volume, "media": "app_ios"},
        )

    async def player_pause(self, deviceId):
        """Call ubus ``mediaplayer/player_play_operation`` with action ``pause``."""
        return await self.ubus_request(
            deviceId,
            "player_play_operation",
            "mediaplayer",
            {"action": "pause", "media": "app_ios"},
        )

    async def player_play(self, deviceId):
        """Call ubus ``mediaplayer/player_play_operation`` with action ``play``."""
        return await self.ubus_request(
            deviceId,
            "player_play_operation",
            "mediaplayer",
            {"action": "play", "media": "app_ios"},
        )

    async def player_get_status(self, deviceId):
        """Call ubus ``mediaplayer/player_get_play_status`` for the device."""
        return await self.ubus_request(
            deviceId,
            "player_get_play_status",
            "mediaplayer",
            {"media": "app_ios"},
        )

    async def play_by_url(self, deviceId, url):
        """Call ubus ``mediaplayer/player_play_url`` with ``url`` (type 1)."""
        return await self.ubus_request(
            deviceId,
            "player_play_url",
            "mediaplayer",
            {"url": url, "type": 1, "media": "app_ios"},
        )

    async def send_message(self, devices, devno, message, volume=None):  # -1/0/1...
        """Set the volume and/or speak ``message`` on the selected device(s).

        Args:
            devices: Sequence of device dicts; each is read for its
                ``"deviceID"`` and ``"capabilities"`` entries.
            devno: Device selector. ``-1`` addresses every device; for any
                other value the loop stops after the first device it sends to.
            message: Text passed to :meth:`text_to_speech`; skipped if falsy.
            volume: When not ``None``, passed to :meth:`player_set_volume`
                before the message is spoken.

        Returns:
            ``False`` when no device was addressed; otherwise the result of
            the last call made (``True`` if neither a volume nor a message
            had to be sent).
        """
        result = False
        for index, device in enumerate(devices):
            # NOTE(review): ``devno != index + 1`` selects every device whose
            # 1-based position is NOT devno; the device at position devno is
            # only chosen when it reports the "yunduantts" capability. This
            # looks inverted but is kept as-is -- confirm the intended rule.
            selected = (
                devno == -1
                or devno != index + 1
                or device["capabilities"].get("yunduantts")
            )
            if not selected:
                continue

            _LOGGER.debug(
                "Send to devno=%d index=%d: %s", devno, index, message or volume
            )
            device_id = device["deviceID"]
            if volume is None:
                result = True
            else:
                result = await self.player_set_volume(device_id, volume)
            if result and message:
                result = await self.text_to_speech(device_id, message)
            if not result:
                _LOGGER.error("Send failed: %s", message or volume)
            # Only the broadcast mode (-1) keeps going, and only while the
            # calls keep succeeding.
            if devno != -1 or not result:
                break
        return result