diff --git a/arknights_mower/command.py b/arknights_mower/command.py index f45f5a6f6..1c879cb9f 100644 --- a/arknights_mower/command.py +++ b/arknights_mower/command.py @@ -26,7 +26,7 @@ def base(args: list[str] = [], device: Device = None): -f 是否使用菲亚梅塔恢复特定房间干员心情,恢复后恢复原位且工作位置不变,F、N 含义同上 """ from .data import base_room_list, agent_list - + arrange = None clue_collect = False drone_room = None @@ -54,10 +54,10 @@ def base(args: list[str] = [], device: Device = None): any_room.append(p) agents.append([]) elif p in agent_list or 'free' == p.lower(): - agents[-1].append(p) + agents[-1].append(p) except Exception: raise ParamError - + if arrange is None and any_room is not None and len(agents) > 0: arrange = dict(zip(any_room, agents)) @@ -84,17 +84,20 @@ def shop(args: list[str] = [], device: Device = None): ShopSolver(device).run(args) -def recruit(args: list[str] = [], email_config={}, maa_config={},device: Device = None): +def recruit(args: list[str] = [], email_config={}, maa_config={}, device: Device = None): """ recruit [agents ...] 自动进行公共招募 agents 优先考虑的公招干员,若不指定则使用配置文件中的优先级,默认为高稀有度优先 """ + choose = {} + result = {} if len(args) == 0: - RecruitSolver(device).run(config.RECRUIT_PRIORITY,email_config,maa_config) + choose, result = RecruitSolver(device).run(config.RECRUIT_PRIORITY, email_config, maa_config) else: - RecruitSolver(device).run(args,email_config) + choose, result = RecruitSolver(device).run(args, email_config) + return choose, result def mission(args: list[str] = [], device: Device = None): """ diff --git a/arknights_mower/solvers/base_schedule.py b/arknights_mower/solvers/base_schedule.py index 6a1ddc248..8dce2c3c7 100644 --- a/arknights_mower/solvers/base_schedule.py +++ b/arknights_mower/solvers/base_schedule.py @@ -10,6 +10,7 @@ from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart +from .skland import SKLand from ..command import recruit from ..data import agent_list from ..utils import character_recognize, detector, segment @@ -199,7 +200,7 @@ def overtake_room(self): # 修改执行时间 self.tasks[task_index].time = datetime.now() # 执行完提前换班任务再次执行本任务 - self.tasks.append(SchedulerTask(task_plan=copy.deepcopy(self.task.plan),task_type=self.task.type)) + self.tasks.append(SchedulerTask(task_plan=copy.deepcopy(self.task.plan), task_type=self.task.type)) else: # 任务全清 rooms = [] @@ -230,7 +231,7 @@ def overtake_room(self): if len(plan.keys()) > 0: self.tasks.append(SchedulerTask(task_plan=plan)) # 执行完提前换班任务再次执行本任务 - self.tasks.append(SchedulerTask(task_plan=copy.deepcopy(self.task.plan),task_type=self.task.type)) + self.tasks.append(SchedulerTask(task_plan=copy.deepcopy(self.task.plan), task_type=self.task.type)) self.skip() return @@ -814,7 +815,8 @@ def initialize_operators(self): return self.op_data.init_and_validate() def check_fia(self): - if '菲亚梅塔' in self.op_data.operators.keys() and self.op_data.operators['菲亚梅塔'].room.startswith('dormitory'): + if '菲亚梅塔' in self.op_data.operators.keys() and self.op_data.operators['菲亚梅塔'].room.startswith( + 'dormitory'): return self.op_data.operators['菲亚梅塔'].replacement, self.op_data.operators['菲亚梅塔'].room return None, None @@ -1973,6 +1975,10 @@ def maa_plan_solver(self, tasks='All', one_time=False): seconds=self.maa_config['maa_execution_gap'] * 3600) < self.maa_config['last_execution']: logger.info("间隔未超过设定时间,不启动maa") else: + """森空岛签到""" + skland = SKLand() + skland.attendance() + """测试公招用""" if 'Recruit' in tasks or tasks == 'All': recruit([], self.email_config, self.maa_config) diff --git a/arknights_mower/solvers/recruit.py b/arknights_mower/solvers/recruit.py index 0aae7a653..6c984d9f4 100644 --- a/arknights_mower/solvers/recruit.py +++ b/arknights_mower/solvers/recruit.py @@ -1,6 +1,7 @@ from __future__ import annotations from itertools import combinations +from typing import Tuple, Dict, Any from ..data import recruit_agent, recruit_tag, recruit_agent_list from ..ocr import ocr_rectify, ocrhandle @@ -47,7 +48,7 @@ def __init__(self, device: Device = None, recog: Recognizer = None) -> None: self.recruit_pos = -1 - def run(self, priority: list[str] = None, email_config={}, maa_config={}) -> None: + def run(self, priority: list[str] = None, email_config={}, maa_config={}): """ :param priority: list[str], 优先考虑的公招干员,默认为高稀有度优先 """ @@ -86,6 +87,8 @@ def run(self, priority: list[str] = None, email_config={}, maa_config={}) -> Non recruit_get_agent=self.result_agent, title_text="公招汇总"), "公招汇总通知", "html") + return self.agent_choose, self.result_agent + def add_recruit_param(self, maa_config): if not maa_config: raise Exception("招募设置为空") @@ -185,7 +188,8 @@ def recruit_tags(self) -> bool: # 刷新标签 if need_choose is False: '''稀有tag或支援,不需要选''' - self.send_email(recruit_rarity.render(recruit_results=best, title_text="稀有tag通知"), "出稀有标签辣", "html") + self.send_email(recruit_rarity.render(recruit_results=best, title_text="稀有tag通知"), "出稀有标签辣", + "html") logger.debug('稀有tag,发送邮件') self.back() return diff --git a/arknights_mower/solvers/skland.py b/arknights_mower/solvers/skland.py new file mode 100644 index 000000000..f285730a0 --- /dev/null +++ b/arknights_mower/solvers/skland.py @@ -0,0 +1,157 @@ +import json +import csv +import datetime +import requests + +from arknights_mower.utils.log import logger + + +class SKLand: + def __init__(self): + self.account = [ + { + "name": "风味手扒鸡#5916", + "phone": "", + "password": "", + "uid": "", + "cred": "" + } + ] + + self.url = { + "get_cred": "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code", + "token_by_phone_password": "https://as.hypergryph.com/user/auth/v1/token_by_phone_password", + "check_cred_url": "https://zonai.skland.com/api/v1/user/check", + "OAuth2": "https://as.hypergryph.com/user/oauth2/v2/grant", + "get_cred_url": "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code", + "attendance": "https://zonai.skland.com/api/v1/game/attendance" + } + + self.request_header = { + "user-agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 25; ) Okhttp/4.11.0", + } + + self.get_award = {} + self.record_path = "skland_record.csv" + + def respone_to_json(self, responese): + try: + responese_json = json.loads(responese.text) + return responese_json + except: + logger.info("请求返回数据存在问题") + + """登录获取cred""" + + def sign_by_phone(self, account): + data = {"phone": account['phone'], "password": account['password']} + response = requests.post(headers=self.request_header, url=self.url.get("token_by_phone_password"), data=data) + + response_json = self.respone_to_json(response) + + if response_json.get("status") == 0: + return response_json.get("data").get("token") + return "" + + def check_cred(self, account): + + if account['cred'] == "": + return False + headers = self.request_header + headers["cred"] = account['cred'] + response = requests.get(headers=headers, url=self.url.get("check_cred_url")) + response_json = self.respone_to_json(response) + + if response_json.get("code") == 0: + logger.debug("验证cred未过期") + return True + logger.debug("验证cred过期") + return False + + def get_OAuth2_token(self, account): + token = self.sign_by_phone(account) + if token == "": + raise "登陆失败" + + data = { + "token": token, + "appCode": "4ca99fa6b56cc2ba", + "type": 0 + } + response = requests.post(headers=self.request_header, url=self.url.get("OAuth2"), data=data) + response_json = self.respone_to_json(response) + if response_json.get("status") == 0: + logger.debug("OAuth2授权代码获取成功") + return response_json.get("data").get("code") + + return "" + + def get_cred(self, account): + code = self.get_OAuth2_token(account) + + if code == "": + raise ("OAuth2授权代码获取失败") + data = { + "kind": 1, + "code": code + } + response = requests.post(headers=self.request_header, url=self.url.get("get_cred_url"), data=data) + response_json = self.respone_to_json(response) + + if response_json.get("code") == 0: + return response_json.get("data").get("cred") + + return "" + + def attendance(self): + for account in self.account: + if self.get_record(account['name']): + logger.info(f"{account['name']} 今日已经签到过了") + continue + + if self.check_cred(account) is False: + account['cred'] = self.get_cred(account) + data = { + "uid": account["uid"], + "gameId": 1 + } + headers = self.request_header + headers["cred"] = account['cred'] + response = requests.post(headers=headers, url=self.url.get("attendance"), data=data) + + response_json = self.respone_to_json(response) + print(response_json) + award = [] + if response_json["code"] == 0: + + for item in response_json.get("data").get("awards"): + temp_str = str(item["count"]) + str(item["resource"]["name"]) + award.append(temp_str) + elif response_json["code"] == 10001 and response_json["message"] == "请勿重复签到!": + logger.info(f"{account['name']} 请勿重复签到!") + self.get_award[account['name']] = "请勿重复签到!" + + print(self.get_award) + self.record_attendance(account['name'], self.get_award[account['name']]) + + return self.get_award + + def record_attendance(self, name, data): + + data_row = [name, data, datetime.date.today()] + with open(self.record_path, 'a+') as f: + csv_write = csv.writer(f) + csv_write.writerow(data_row) + + def get_record(self, name): + try: + with open(self.record_path, 'r+') as f: + csv_reader = csv.reader(f) + for line in csv_reader: + if line[0] == name: + return True + except: + return True + + return False +