diff --git a/Mower0.py b/Mower0.py index a7de230e4..a94ccf385 100644 --- a/Mower0.py +++ b/Mower0.py @@ -3,11 +3,13 @@ import ctypes import colorlog import cv2 +import hashlib +import hmac import inspect import json import os -import pystray import pathlib +import pystray import requests import smtplib import sys @@ -15,6 +17,7 @@ import time import warnings import yaml +from ctypes import CFUNCTYPE, c_int, c_char_p, c_void_p from datetime import datetime, timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -23,6 +26,7 @@ from typing import Optional from PIL import Image from pystray import MenuItem, Menu +from urllib import parse from arknights_mower.data import agent_list from arknights_mower.utils import (character_recognize, config, detector, segment) from arknights_mower.utils import typealias as tp @@ -37,7 +41,6 @@ from arknights_mower.utils.scheduler_task import SchedulerTask from arknights_mower.utils.solver import BaseSolver from arknights_mower.utils.recognize import Recognizer, RecognizeError -from ctypes import CFUNCTYPE, c_int, c_char_p, c_void_p def warn(*args, **kwargs): @@ -72,7 +75,7 @@ def warn(*args, **kwargs): 窗口 = Tk() 窗口宽度 = 窗口.winfo_screenwidth() 窗口高度 = 窗口.winfo_screenheight() -字幕字号 = 窗口.winfo_screenheight() // 22 +字幕字号 = 窗口.winfo_screenheight() // 23 if 用户配置['字幕字号'] != '默认': 字幕字号 = int(用户配置['字幕字号']) 字幕颜色 = 用户配置['字幕颜色'] @@ -256,18 +259,72 @@ def 森空岛签到(): 'Connection': 'close' } + # 签名请求头一定要这个顺序,否则失败 + # timestamp是必填的,其它三个随便填,不要为none即可 + header_for_sign = { + 'platform': '', + 'timestamp': '', + 'dId': '', + 'vName': '' + } + + def generate_signature(token: str, path, body_or_query): + """ + 获得签名头 + 接口地址+方法为Get请求?用query否则用body+时间戳+ 请求头的四个重要参数(dId,platform,timestamp,vName).toJSON() + 将此字符串做HMAC加密,算法为SHA-256,密钥token为请求cred接口会返回的一个token值 + 再将加密后的字符串做MD5即得到sign + :param token: 拿cred时候的token + :param path: 请求路径(不包括网址) + :param body_or_query: 如果是GET,则是它的query。POST则为它的body + :return: 计算完毕的sign + """ + # 总是说请勿修改设备时间,怕不是yj你的服务器有问题吧,所以这里特地-2 + t = str(int(time.time()) - 2) + token = token.encode('utf-8') + header_ca = json.loads(json.dumps(header_for_sign)) + header_ca['timestamp'] = t + header_ca_str = json.dumps(header_ca, separators=(',', ':')) + s = path + body_or_query + t + header_ca_str + hex_s = hmac.new(token, s.encode('utf-8'), hashlib.sha256).hexdigest() + md5 = hashlib.md5(hex_s.encode('utf-8')).hexdigest().encode('utf-8').decode('utf-8') # 算出签名 + return md5, header_ca + + def get_sign_header(url: str, method, body, old_header): + h = json.loads(json.dumps(old_header)) + p = parse.urlparse(url) + if method.lower() == 'get': + h['sign'], header_ca = generate_signature(sign_token, p.path, p.query) + else: + h['sign'], header_ca = generate_signature(sign_token, p.path, json.dumps(body)) + for i in header_ca: + h[i] = header_ca[i] + return h + + def login_by_password(): + r = requests.post("https://as.hypergryph.com/user/auth/v1/token_by_phone_password", + json={"phone": 用户配置['手机号'], "password": 用户配置['密码']}, headers=header_login).json() + return get_token(r) + + def get_cred_by_token(token): + grant_code = get_grant_code(token) + return get_cred(grant_code) + def get_token(resp): - if resp['status'] != 0: + if resp.get('status') != 0: raise Exception(f'获得token失败:{resp["msg"]}') return resp['data']['token'] def get_grant_code(token): - resp = requests.post("https://as.hypergryph.com/user/oauth2/v2/grant", json={ + response = requests.post("https://as.hypergryph.com/user/oauth2/v2/grant", json={ 'appCode': '4ca99fa6b56cc2ba', 'token': token, 'type': 0 - }, headers=header_login).json() - if resp['status'] != 0: + }, headers=header_login) + resp = response.json() + if response.status_code != 200: + raise Exception(f'获得认证代码失败:{resp}') + if resp.get('status') != 0: raise Exception(f'获得认证代码失败:{resp["msg"]}') return resp['data']['code'] @@ -277,16 +334,19 @@ def get_cred(grant): 'kind': 1 }, headers=header_login).json() if resp['code'] != 0: - raise Exception(f'获得cred失败:{resp["messgae"]}') - return resp['data']['cred'] + raise Exception(f'获得cred失败:{resp["message"]}') + return resp['data'] def get_binding_list(): v = [] - resp = requests.get(url="https://zonai.skland.com/api/v1/game/player/binding", headers=header).json() + resp = requests.get("https://zonai.skland.com/api/v1/game/player/binding", + headers=get_sign_header("https://zonai.skland.com/api/v1/game/player/binding", + 'get', None, header)).json() + if resp['code'] != 0: - print(f"请求角色列表出现问题:{resp['message']}") + logger.warning(f"请求角色列表出现问题:{resp['message']}") if resp.get('message') == '用户未登录': - logger.warning(f'用户登录可能失效了,请重新运行签到程序!') + logger.warning(f'用户登录可能失效了,请重新运行此程序!') return [] for i in resp['data']['list']: if i.get('appCode') != 'arknights': @@ -295,36 +355,39 @@ def get_binding_list(): return v try: - header['cred'] = get_cred(get_grant_code(get_token(requests.post( - "https://as.hypergryph.com/user/auth/v1/token_by_phone_password", - json={"phone": 用户配置['手机号'], "password": 用户配置['密码']}, headers=header_login).json()))) + sign_token = get_cred_by_token(login_by_password())['token'] + header['cred'] = get_cred_by_token(login_by_password())['cred'] characters = get_binding_list() - for 角色 in characters: + + for i in characters: body = { - 'uid': 角色.get('uid'), - 'gameId': 角色.get("channelMasterId") + 'gameId': 1, + 'uid': i.get('uid') } - 响应 = requests.post( - "https://zonai.skland.com/api/v1/game/attendance", headers=header, json=body).json() - if 响应['code'] == 10001: - logger.info("森空岛今天已经签到") - 已签到日期 = datetime.now().strftime('%Y-%m-%d') - elif 响应['code'] == 0: - for 奖励 in 响应['data']['awards']: - 资源 = 奖励['resource'] - logger.warning( - f'{角色.get("nickName")}({角色.get("channelName")})签到成功,获得了{资源["name"]} × {奖励.get("count") or 1}' - ) + resp = requests.post("https://zonai.skland.com/api/v1/game/attendance", + headers=get_sign_header("https://zonai.skland.com/api/v1/game/attendance", + 'post', body, header), json=body).json() + + if resp['code'] == 0: + 已签到日期 = datetime.now().strftime('%Y年%m月%d日') + logger.warning(f'今天是{已签到日期},{i.get("nickName")}({i.get("channelName")})在森空岛签到成功!') + for j in resp['data']['awards']: + res = j['resource'] + logger.warning(f'获得了{res["name"]} × {j.get("count") or 1}') if 弹窗提醒开关: - 托盘图标.notify( - f'森空岛签到成功!\n本次签到获得了 {资源["name"]} × {奖励.get("count") or 1}', "森空岛签到") - 已签到日期 = datetime.now().strftime('%Y-%m-%d') + 托盘图标.notify(f'{i.get("nickName")}({i.get("channelName")})在森空岛签到成功!\n获得了{res["name"]} × {j.get("count") or 1}', "森空岛签到") + elif resp['code'] == 10001: + 已签到日期 = datetime.now().strftime('%Y年%m月%d日') + logger.info(f'今天是{已签到日期},{i.get("nickName")}({i.get("channelName")})今天在森空岛已经签过到了') else: - logger.warning( - f'{角色.get("nickName")}({角色.get("channelName")})签到失败了!原因:{响应.get("message")}') + logger.warning(f'{i.get("nickName")}({i.get("channelName")})签到失败了!原因:{resp.get("message")}') + if 弹窗提醒开关: + 托盘图标.notify( + f'{i.get("nickName")}({i.get("channelName")})签到失败了!原因:{resp.get("message")}', "森空岛签到") continue except Exception as ex: - logger.warning(f'签到失败,原因:{str(ex)}') + logger.warning(f'森空岛签到失败,原因:{str(ex)}') + logger.error('', exc_info=ex) class 项目经理(BaseSolver): @@ -343,15 +406,14 @@ def __init__(self, device: 设备控制 = None, recog: Recognizer = None) -> Non self.任务列表 = [] self.run_order_rooms = {} - def 返回基主界面(self): + def 返回基建主界面(self): logger.info('返回基建主界面') + 返回计数 = 0 while self.get_infra_scene() != 201: - if self.find('index_infrastructure') is not None: - self.tap_element('index_infrastructure') - elif self.find('12cadpa') is not None: - self.device.tap((self.recog.w // 2, self.recog.h // 2)) - else: - self.back() + self.back() + 返回计数 += 1 + if 返回计数 > 10: + self.处理报错() self.recog.update() def run(self) -> None: @@ -359,7 +421,7 @@ def run(self) -> None: if len(self.任务列表) == 0: self.recog.update() time.sleep(1) - self.handle_error(True) + self.处理报错(True) if len(self.任务列表) > 0: # 找到时间最近的一次单个任务 self.任务 = self.任务列表[0] @@ -407,23 +469,18 @@ def find_next_task(self, compare_time=None, task_type='', compare_type='<'): return next((e for e in self.任务列表 if (True if compare_time is None else e.time < compare_time) and ( True if task_type == '' else task_type in e.type)), None) - def handle_error(self, force=False): + def 处理报错(self, force=False): + 报错计时 = datetime.now() + 循环次数 = 0 while self.scene() == -1: self.recog.update() - logger.info('返回基建主界面') - unknown_count = 0 - while self.get_infra_scene() != 201 and unknown_count < 5: - logger.warning(f'未知界面{unknown_count}') - if self.find('index_infrastructure') is not None: - self.tap_element('index_infrastructure') - elif self.find('12cadpa') is not None: - self.device.tap((self.recog.w // 2, self.recog.h // 2)) - else: - self.back() - self.recog.update() - time.sleep(1) - unknown_count += 1 - self.device.exit(self.服务器) + logger.info('处理报错') + self.返回基建主界面() + 循环次数 += 1 + if (datetime.now() - 报错计时).total_seconds() > self.跑单提前运行时间 // 3: + logger.info(f'报错次数达{循环次数},时间长达{(datetime.now() - 报错计时).total_seconds()}') + 循环次数 = 0 + 重新运行Mower0() if self.error or force: # 如果没有任何时间小于当前时间的任务才生成空任务 if self.find_next_task(datetime.now()) is None: @@ -479,7 +536,7 @@ def infra_main(self): self.tap(notification) self.collect_notification = True else: - return self.handle_error() + return self.处理报错() def plan_solver(self): plan = self.plan @@ -531,7 +588,7 @@ def 读取接单时间(self, room): logger.info(f'贸易站 B{room[5]}0{room[7]} 接单时间为 {execute_time.strftime("%H:%M:%S")}') execute_time = execute_time - timedelta(seconds=(self.跑单提前运行时间)) self.recog.update() - self.返回基主界面() + self.返回基建主界面() return execute_time def double_read_time(self, cord, upperLimit=None, use_digit_reader=False): @@ -547,7 +604,12 @@ def double_read_time(self, cord, upperLimit=None, use_digit_reader=False): def initialize_paddle(self): global ocr if ocr is None: - ocr = PaddleOCR(enable_mkldnn=False, use_angle_cls=False, cls=False, show_log=False) + # mac 平台不支持 mkldnn 加速,关闭以修复 mac 运行时错误 + if sys.platform == 'darwin': + ocr = PaddleOCR(enable_mkldnn=False, use_angle_cls=False, cls=False, show_log=False) + else: + ocr = PaddleOCR(enable_mkldnn=True, use_angle_cls=False, cls=False, show_log=False) + # ocr = PaddleOCR(enable_mkldnn=False, use_angle_cls=False, cls=False, show_log=False) def read_screen(self, img, type="mood", limit=24, cord=None): if cord is not None: @@ -680,7 +742,7 @@ def 无人机加速(self, room: str, not_customize=False, not_return=False): while self.find('factory_accelerate') is None and self.find('bill_accelerate') is None: if error_count > 5: raise Exception('未成功进入无人机界面') - self.tap((self.recog.w // 20, self.recog.h * 19 //20), interval=3) + self.tap((self.recog.w // 20, self.recog.h * 19 // 20), interval=3) error_count += 1 accelerate = self.find('bill_accelerate') if accelerate: @@ -813,8 +875,10 @@ def 换上干员(self, agents: list[str], room: str) -> None: if not 换上干员名单 == '': 换上干员名单 += '、' 换上干员名单 += 干员名 if room.startswith('room') and ('但书' in agent or '龙舌兰' in agent): - logger.info(f'{换上干员名单} 进驻房间 B{room[5]}0{room[7]} 时间为 {(self.任务列表[0].time + timedelta(seconds=(self.跑单提前运行时间 - self.更换干员前缓冲时间))).strftime("%H:%M:%S")}') - else: logger.info(f'换上 {换上干员名单}') + logger.info( + f'{换上干员名单} 进驻房间 B{room[5]}0{room[7]} 时间为 {(self.任务列表[0].time + timedelta(seconds=(self.跑单提前运行时间 - self.更换干员前缓冲时间))).strftime("%H:%M:%S")}') + else: + logger.info(f'换上 {换上干员名单}') h, w = self.recog.h, self.recog.w first_time = True right_swipe = 0 @@ -1008,7 +1072,7 @@ def 跑单(self, plan: tp.BasePlan, get_time=False): self.recog.update() if room.startswith('room'): 龙舌兰_但书进驻前的等待时间 = round(((self.任务列表[0].time - datetime.now()).total_seconds() + - self.跑单提前运行时间 - self.更换干员前缓冲时间), 1) + self.跑单提前运行时间 - self.更换干员前缓冲时间), 1) if 龙舌兰_但书进驻前的等待时间 > 0: logger.info(f'龙舌兰、但书进驻前等待 {str(龙舌兰_但书进驻前的等待时间)} 秒') time.sleep(龙舌兰_但书进驻前的等待时间) @@ -1038,11 +1102,12 @@ def 跑单(self, plan: tp.BasePlan, get_time=False): error_count += 1 修正后的接单时间 = self.double_read_time( (self.recog.w * 650 // 2496, self.recog.h * 660 // 1404, - self.recog.w * 815 // 2496, self.recog.h * 710 //1404), + self.recog.w * 815 // 2496, self.recog.h * 710 // 1404), use_digit_reader=True) 截图等待时间 = round((修正后的接单时间 - datetime.now()).total_seconds(), 1) if (截图等待时间 > 0) and (截图等待时间 < 1000): - logger.info(f'房间 B{room[5]}0{room[7]} 修正后的接单时间为 {修正后的接单时间.strftime("%H:%M:%S")}') + logger.info( + f'房间 B{room[5]}0{room[7]} 修正后的接单时间为 {修正后的接单时间.strftime("%H:%M:%S")}') logger.info(f'等待截图时间为 {str(截图等待时间)} 秒') time.sleep(截图等待时间) logger.info('保存截图') @@ -1074,7 +1139,7 @@ def 跑单(self, plan: tp.BasePlan, get_time=False): new_plan[run_order_room] = [data["agent"] for data in self.plan[room]] # 返回基建主界面 self.recog.update() - self.返回基主界面() + self.返回基建主界面() self.任务列表.append(SchedulerTask(time=self.任务列表[0].time, task_plan=new_plan)) if 龙舌兰和但书休息: 宿舍 = {} @@ -1092,7 +1157,6 @@ def skip(self, task_names='All'): if 'collect_notification': self.collect_notification = True - @CFUNCTYPE(None, c_int, c_char_p, c_void_p) def log_maa(msg, details, arg): m = Message(msg) @@ -1101,7 +1165,6 @@ def log_maa(msg, details, arg): logger.debug(m) logger.debug(arg) - def MAA初始化(self): asst_path = os.path.dirname(pathlib.Path(self.MAA设置['MAA路径']) / "Python" / "asst") if asst_path not in sys.path: @@ -1120,8 +1183,7 @@ def MAA初始化(self): logger.info("MAA 连接失败") raise Exception("MAA 连接失败") - - def append_maa_task(self, type): + def 添加MAA任务(self, type): if type in ['StartUp', 'Visit', 'Award']: self.MAA.append_task(type) elif type == 'Fight': @@ -1171,7 +1233,7 @@ def maa_plan_solver(self, 任务列表=['Fight'], one_time=False): if 任务列表 == 'All': 任务列表 = ['StartUp', 'Fight', 'Recruit', 'Visit', 'Mall', 'Award'] for maa_task in 任务列表: - self.append_maa_task(maa_task) + self.添加MAA任务(maa_task) # asst.append_task('Copilot', { # 'stage_name': '千层蛋糕', # 'filename': './GA-EX8-raid.json', @@ -1356,7 +1418,7 @@ def 初始化(任务列表, scheduler=None): else: scheduler.device = cli.device scheduler.recog = cli.recog - scheduler.handle_error(True) + scheduler.处理报错(True) return scheduler @@ -1394,7 +1456,7 @@ def Mower0(self): 当前项目.无人机加速(任务列表[0].type, True, True) 下个任务开始时间 = 任务列表[0].time 当前项目.recog.update() - 当前项目.返回基主界面() + 当前项目.返回基建主界面() 任务间隔 = (当前项目.任务列表[0].time - datetime.now()).total_seconds() if 任务间隔 > 0: 当前项目.send_email() @@ -1402,13 +1464,14 @@ def Mower0(self): for i in range(len(任务列表)): logger.warning( f'房间 B{任务列表[i].type[5]}0{任务列表[i].type[7]} 开始跑单的时间为 {任务列表[i].time.strftime("%H:%M:%S")}') - 无人机数量 = 当前项目.digit_reader.get_drone(当前项目.recog.gray, 当前项目.recog.h, 当前项目.recog.w) + 无人机数量 = 当前项目.digit_reader.get_drone(当前项目.recog.gray, 当前项目.recog.h, + 当前项目.recog.w) if 无人机数量 > 168: logger.warning(f'现在有 {无人机数量} 个无人机,请尽快使用,避免溢出!') 任务提示 += f'现在有 {无人机数量} 个无人机,请尽快使用!\n' for i in range(len(任务列表)): 任务提示 += f'房间 B{任务列表[i].type[5]}0{任务列表[i].type[7]} 开始跑单的时间为 {任务列表[i].time.strftime("%H:%M:%S")}\n' - if 弹窗提醒开关: 托盘图标.notify(任务提示, "Mower跑单提醒") + if 弹窗提醒开关: 托盘图标.notify(任务提示, "Mower0跑单提醒") # 如果有高强度重复MAA任务,任务间隔超过10分钟则启动MAA if MAA设置['作战开关'] == '开' and (任务间隔 > 600): @@ -1416,13 +1479,14 @@ def Mower0(self): elif 任务间隔 > 0: if 用户配置['任务结束后退出游戏'] == '是' and 任务间隔 > 跑单提前运行时间: 当前项目.device.exit(当前项目.服务器) - if 用户配置['森空岛签到开关'] == '开' and 已签到日期 != datetime.now().strftime('%Y-%m-%d'): + if 用户配置['森空岛签到开关'] == '开' and 已签到日期 != datetime.now().strftime('%Y年%m月%d日'): 森空岛签到() time.sleep(任务间隔 - 跑单提前运行时间) if 弹窗提醒开关: - 托盘图标.notify("跑单时间快到了喔,请放下游戏中正在做的事,或者手动关闭Mower", "Mower跑单提醒") + 托盘图标.notify("跑单时间快到了喔,请放下游戏中正在做的事,或者手动关闭Mower0", + "Mower0跑单提醒") time.sleep(跑单提前运行时间) - if 弹窗提醒开关: 托盘图标.notify("开始跑单!", "Mower跑单提醒") + if 弹窗提醒开关: 托盘图标.notify("开始跑单!", "Mower0跑单提醒") else: time.sleep(任务间隔) 当前项目.back_to_index() @@ -1497,39 +1561,39 @@ def 缩放字幕(event): def 跑单任务查询(icon: pystray.Icon): - icon.notify(任务提示, "Mower跑单任务列表") + icon.notify(任务提示, "Mower0跑单任务列表") -def 重新运行Mower(): - global Mower +def 重新运行Mower0(): + global Mower0 try: - Mower._stop_event.set() - 终止线程报错(Mower.ident, SystemExit) - logger.warning('Mower已停止,准备重新运行') + Mower0._stop_event.set() + 终止线程报错(Mower0.ident, SystemExit) + logger.warning('Mower0已停止,准备重新运行') except: pass - Mower = 线程() - Mower.start() + Mower0 = 线程() + Mower0.start() -def 停止运行Mower(): - Mower._stop_event.set() - 终止线程报错(Mower.ident, SystemExit) - logger.warning('Mower已停止') +def 停止运行Mower0(): + Mower0._stop_event.set() + 终止线程报错(Mower0.ident, SystemExit) + logger.warning('Mower0已停止') def 退出程序(): pid = os.getpid() # 获取当前进程ID - logger.error('退出 Mower') + logger.error('退出 Mower0') os.system('taskkill -f -pid %s' % pid) def 更新字幕(): global 字幕 任务倒计时 = (下个任务开始时间 - datetime.now()).total_seconds() - 字幕 = 'Mower的回合!' + 字幕 = 'Mower0的回合!' if 任务倒计时 > 0: - 字幕 = f'Mower将在{int(任务倒计时/60)}分钟后开始跑单' + 字幕 = f'Mower0将在{int(任务倒计时 / 60)}分钟后开始跑单' if 任务倒计时 <= 跑单提前运行时间: 字幕 += '\n跑单即将开始!' label.config(text=字幕, font=(用户配置['字幕字体'], 字幕字号, 'bold'), bg=字幕颜色, @@ -1537,13 +1601,12 @@ def 更新字幕(): 窗口.after(100, 更新字幕) -托盘菜单 = (MenuItem('森空岛签到', 森空岛签到, visible=True), - MenuItem(任务提示, 跑单任务查询, default=True, visible=False), +托盘菜单 = (MenuItem(任务提示, 跑单任务查询, default=True, visible=False), MenuItem('显示字幕', 显示字幕, visible=悬浮字幕开关), - MenuItem('重新运行Mower', 重新运行Mower, visible=True), - MenuItem('停止运行Mower', 停止运行Mower, visible=True), + MenuItem('重新运行Mower0', 重新运行Mower0, visible=True), + MenuItem('停止运行Mower0', 停止运行Mower0, visible=True), Menu.SEPARATOR, MenuItem('退出', 退出程序)) -托盘图标 = pystray.Icon("Mower 纯跑单", Image.open("logo.png"), "Mower 纯跑单", 托盘菜单) +托盘图标 = pystray.Icon("Mower0 纯跑单", Image.open("logo.png"), "Mower0 纯跑单", 托盘菜单) if 悬浮字幕开关: 窗口.geometry("%dx%d+%d+%d" % (窗口宽度, 窗口高度, (窗口.winfo_screenwidth() - 窗口宽度) / 2, @@ -1563,8 +1626,8 @@ def 更新字幕(): if __name__ == "__main__": threading.Thread(target=托盘图标.run, daemon=False).start() - Mower = 线程() - Mower.start() + Mower0 = 线程() + Mower0.start() if 悬浮字幕开关: 窗口.after(100, 更新字幕) 窗口.mainloop()