diff --git a/arknights_mower/__init__.py b/arknights_mower/__init__.py index 1e73d26da..5831b6fbd 100644 --- a/arknights_mower/__init__.py +++ b/arknights_mower/__init__.py @@ -15,4 +15,4 @@ __cli__ = not (__pyinstall__ and not sys.argv[1:]) __system__ = platform.system().lower() -__version__ = 'v3.5.0-alpha-17' +__version__ = 'v3.5.0-alpha-20' diff --git a/arknights_mower/solvers/base_construct.py b/arknights_mower/solvers/base_construct.py index 2a3b2e151..874c07864 100644 --- a/arknights_mower/solvers/base_construct.py +++ b/arknights_mower/solvers/base_construct.py @@ -1,7 +1,5 @@ from __future__ import annotations -from enum import Enum - import numpy as np from ..data import base_room_list @@ -11,29 +9,22 @@ from ..utils.log import logger from ..utils.recognize import RecognizeError, Recognizer, Scene from ..utils.solver import BaseSolver +from arknights_mower.utils.digit_reader import DigitReader +import time +import copy -class ArrangeOrder(Enum): - STATUS = 1 - SKILL = 2 - FEELING = 3 - TRUST = 4 - -arrange_order_res = { - ArrangeOrder.STATUS: ('arrange_status', 0.1), - ArrangeOrder.SKILL: ('arrange_skill', 0.35), - ArrangeOrder.FEELING: ('arrange_feeling', 0.65), - ArrangeOrder.TRUST: ('arrange_trust', 0.9), -} +from arknights_mower.solvers.base_mixin import ArrangeOrder, arrange_order_res, BaseMixin -class BaseConstructSolver(BaseSolver): +class BaseConstructSolver(BaseSolver, BaseMixin): """ 收集基建的产物:物资、赤金、信赖 """ def __init__(self, device: Device = None, recog: Recognizer = None) -> None: super().__init__(device, recog) + self.digit_reader = DigitReader() def run(self, arrange: dict[str, tp.BasePlan] = None, clue_collect: bool = False, drone_room: str = None, fia_room: str = None) -> None: """ @@ -42,6 +33,7 @@ def run(self, arrange: dict[str, tp.BasePlan] = None, clue_collect: bool = False :param drone_room: str, 是否使用无人机加速 :param fia_room: str, 是否使用菲亚梅塔恢复心情 """ + self.last_room = "" self.arrange = arrange self.clue_collect = clue_collect self.drone_room = drone_room @@ -351,25 +343,6 @@ def ori_clue(self): logger.debug(clues) return clues - def enter_room(self, room: str) -> tp.Rectangle: - """ 获取房间的位置并进入 """ - - # 获取基建各个房间的位置 - base_room = segment.base(self.recog.img, self.find('control_central', strict=True)) - - # 将画面外的部分删去 - room = base_room[room] - for i in range(4): - room[i, 0] = max(room[i, 0], 0) - room[i, 0] = min(room[i, 0], self.recog.w) - room[i, 1] = max(room[i, 1], 0) - room[i, 1] = min(room[i, 1], self.recog.h) - - # 点击进入 - self.tap(room[0], interval=3) - while self.find('control_central') is not None: - self.tap(room[0], interval=3) - def drone(self, room: str): logger.info('基建:无人机加速') @@ -404,171 +377,113 @@ def drone(self, room: str): self.back(interval=2, rebuild=False) self.back(interval=2) - def get_arrange_order(self) -> ArrangeOrder: - best_score, best_order = 0, None - for order in ArrangeOrder: - score = self.recog.score(arrange_order_res[order][0]) - if score is not None and score[0] > best_score: - best_score, best_order = score[0], order - # if best_score < 0.6: - # raise RecognizeError - logger.debug((best_score, best_order)) - return best_order - - def switch_arrange_order(self, order: ArrangeOrder) -> None: - self.tap_element(arrange_order_res[order][0], x_rate=arrange_order_res[order][1], judge=False) - - def arrange_order(self, order: ArrangeOrder) -> None: - if self.get_arrange_order() != order: - self.switch_arrange_order(order) - - def choose_agent(self, agent: list[str], skip_free: int = 0, order: ArrangeOrder = None) -> None: + + def get_order(self, name): + return False, [2, "false"] + + + def choose_agent(self, agents: list[str], room: str) -> None: """ :param order: ArrangeOrder, 选择干员时右上角的排序功能 """ - logger.info(f'安排干员:{agent}') - logger.debug(f'skip_free: {skip_free}') + first_name = '' + max_swipe = 50 + position = [(0.35, 0.35), (0.35, 0.75), (0.45, 0.35), (0.45, 0.75), (0.55, 0.35)] + for idx, n in enumerate(agents): + if n == '': + agents[idx] = 'Free' + agent = copy.deepcopy(agents) + exists = [] + logger.info(f'安排干员 :{agent}') + # 若不是空房间,则清空工作中的干员 + is_dorm = room.startswith("dorm") h, w = self.recog.h, self.recog.w first_time = True - # 在 agent 中 'Free' 表示任意空闲干员 free_num = agent.count('Free') - agent = set(agent) - set(['Free']) - - # 安排指定干员 - if len(agent): - - if not first_time: - # 滑动到最左边 - self.sleep(interval=0.5, rebuild=False) - for _ in range(9): - self.swipe_only((w//2, h//2), (w//2, 0), interval=0.5) - self.swipe((w//2, h//2), (w//2, 0), interval=3, rebuild=False) - else: - # 第一次进入按技能排序 - if order is not None: - self.arrange_order(order) + for i in range(agent.count("Free")): + agent.remove("Free") + index_change = False + pre_order = [2, False] + right_swipe = 0 + retry_count = 0 + # 如果重复进入宿舍则需要排序 + selected = [] + logger.info(f'上次进入房间为:{self.last_room},本次房间为:{room}') + if self.last_room.startswith('dorm') and is_dorm: + self.detail_filter(False) + while len(agent) > 0: + if retry_count > 1: raise Exception(f"到达最大尝试次数 1次") + if right_swipe > max_swipe: + # 到底了则返回再来一次 + for _ in range(right_swipe): + self.swipe_only((w // 2, h // 2), (w // 2, 0), interval=0.5) + right_swipe = 0 + max_swipe = 50 + retry_count += 1 + self.detail_filter(False) + if first_time: + # 清空 + if is_dorm: + self.switch_arrange_order(3, "true") + pre_order = [3, 'true'] + self.tap((self.recog.w * 0.38, self.recog.h * 0.95), interval=0.5) + changed, ret = self.scan_agent(agent) + if changed: + selected.extend(changed) + if len(agent) == 0: break + index_change = True + + # 如果选中了人,则可能需要重新排序 + if index_change or first_time: + # 第一次则调整 + is_custom, arrange_type = self.get_order(agent[0]) + arrange_type = (3, 'true') + # 如果重新排序则滑到最左边 + if pre_order[0] != arrange_type[0] or pre_order[1] != arrange_type[1]: + self.switch_arrange_order(arrange_type[0], arrange_type[1]) + # 滑倒最左边 + self.sleep(interval=0.5, rebuild=True) + right_swipe = self.swipe_left(right_swipe, w, h) + pre_order = arrange_type first_time = False - checked = set() # 已经识别过的干员 - pre = set() # 上次识别出的干员 - error_count = 0 - - while len(agent): - try: - # 识别干员 - ret = character_recognize.agent(self.recog.img) # 返回的顺序是从左往右从上往下 - except RecognizeError as e: - error_count += 1 - if error_count < 3: - logger.debug(e) - self.sleep(3) - else: - raise e - continue - - # 提取识别出来的干员的名字 - agent_name = set([x[0] for x in ret]) - if agent_name == pre: - error_count += 1 - if error_count >= 3: - logger.warning(f'未找到干员:{list(agent)}') - break - else: - pre = agent_name - - # 更新已经识别过的干员 - checked |= agent_name - - # 如果出现了需要的干员则选择 - # 优先安排菲亚梅塔 - if '菲亚梅塔' in agent: - if '菲亚梅塔' in agent_name: - for y in ret: - if y[0] == '菲亚梅塔': - self.tap((y[1][0]), interval=0, rebuild=False) - break - agent.remove('菲亚梅塔') - - # 如果菲亚梅塔是 the only one - if len(agent) == 0: - break - # 否则滑动到最左边 - self.sleep(interval=0.5, rebuild=False) - for _ in range(9): - self.swipe_only((w//2, h//2), (w//2, 0), interval=0.5) - self.swipe((w//2, h//2), (w//2, 0), interval=3, rebuild=False) - - # reset the statuses and cancel the rightward-swiping - checked = set() - pre = set() - error_count = 0 - continue - + changed, ret = self.scan_agent(agent) + if changed: + selected.extend(changed) + # 如果找到了 + index_change = True + else: + # 如果没找到 而且右移次数大于5 + if ret[0][0] == first_name and right_swipe > 5: + max_swipe = right_swipe else: - for y in ret: - name = y[0] - if name in agent_name & agent: - self.tap((y[1][0]), interval=0, rebuild=False) - agent.remove(name) - # for name in agent_name & agent: - # for y in ret: - # if y[0] == name: - # self.tap((y[1][0]), interval=0, rebuild=False) - # break - # agent.remove(name) - - # 如果已经完成选择则退出 - if len(agent) == 0: - break - + first_name = ret[0][0] + index_change = False st = ret[-2][1][2] # 起点 - ed = ret[0][1][1] # 终点 - self.swipe_noinertia(st, (ed[0]-st[0], 0)) - - # 安排空闲干员 - if free_num: - - if not first_time: - # 滑动到最左边 - self.sleep(interval=0.5, rebuild=False) - for _ in range(9): - self.swipe_only((w//2, h//2), (w//2, 0), interval=0.5) - self.swipe((w//2, h//2), (w//2, 0), interval=3, rebuild=False) - else: - # 第一次进入按技能排序 - if order is not None: - self.arrange_order(order) - first_time = False + ed = ret[0][1][1] # 终点 + self.swipe_noinertia(st, (ed[0] - st[0], 0)) + right_swipe += 1 + if len(agent) == 0: break; + # 排序 + if len(agents) != 1: + # 左移 + self.swipe_left(right_swipe, w, h) + self.tap((self.recog.w * arrange_order_res[ArrangeOrder.SKILL][0], + self.recog.h * arrange_order_res[ArrangeOrder.SKILL][1]), interval=0.5, rebuild=False) + not_match = False + exists.extend(selected) + for idx, item in enumerate(agents): + if agents[idx] != exists[idx] or not_match: + not_match = True + p_idx = exists.index(agents[idx]) + self.tap((self.recog.w * position[p_idx][0], self.recog.h * position[p_idx][1]), interval=0, + rebuild=False) + self.tap((self.recog.w * position[p_idx][0], self.recog.h * position[p_idx][1]), interval=0, + rebuild=False) + self.last_room = room + logger.info(f"设置上次房间为{self.last_room}") - error_count = 0 - - while free_num: - try: - # 识别空闲干员 - ret, st, ed = segment.free_agent(self.recog.img) # 返回的顺序是从左往右从上往下 - except RecognizeError as e: - error_count += 1 - if error_count < 3: - logger.debug(e) - self.sleep(3) - else: - raise e - continue - - while free_num and len(ret): - if skip_free > 0: - skip_free -= 1 - else: - self.tap(ret[0], interval=0, rebuild=False) - free_num -= 1 - ret = ret[1:] - - # 如果已经完成选择则退出 - if free_num == 0: - break - - self.swipe_noinertia(st, (ed[0]-st[0], 0)) def agent_arrange(self, plan: tp.BasePlan) -> None: """ 基建排班 """ @@ -621,7 +536,7 @@ def agent_arrange(self, plan: tp.BasePlan) -> None: else: default_order = ArrangeOrder.SKILL self.choose_agent( - plan[base_room_list[idx]], skip_free, default_order) + plan[base_room_list[idx]], base_room_list[idx]) except RecognizeError as e: error_count += 1 if error_count >= 3: @@ -995,3 +910,56 @@ def fia(self, room: str): # clues['own'][i] = count # return clues + + def get_agent_from_room(self, room, read_time_index=None, length=3): + if read_time_index is None: + read_time_index = [] + error_count = 0 + if room == 'meeting': + time.sleep(3) + self.recog.update() + clue_res = self.read_screen(self.recog.img, limit=10, cord=(645, 977, 755, 1018)) + if clue_res != 11: + self.clue_count = clue_res + logger.info(f'当前拥有线索数量为{self.clue_count}') + while self.find('room_detail') is None: + if error_count > 3: + self.reset_room_time(room) + raise Exception('未成功进入房间') + self.tap((self.recog.w * 0.05, self.recog.h * 0.4), interval=0.5) + error_count += 1 + if length > 3: + self.swipe((self.recog.w * 0.8, self.recog.h * 0.5), (0, self.recog.h * 0.45), duration=500, + interval=1, + rebuild=True) + name_p = [((1460, 160), (1800, 215)), ((1460, 365), (1800, 425)), ((1460, 576), (1800, 633)), + ((1460, 555), (1800, 613)), ((1460, 765), (1800, 823))] + time_p = [((1650, 270, 1780, 305)), ((1650, 480, 1780, 515)), ((1650, 690, 1780, 725)), + ((1650, 668, 1780, 703)), ((1650, 877, 1780, 912))] + mood_p = [((1470, 219, 1780, 221)), ((1470, 428, 1780, 430)), ((1470, 637, 1780, 639)), + ((1470, 615, 1780, 617)), ((1470, 823, 1780, 825))] + result = [] + swiped = False + for i in range(length): + if i >= 3 and not swiped: + self.swipe((self.recog.w * 0.8, self.recog.h * 0.5), (0, -self.recog.h * 0.45), duration=500, + interval=1, rebuild=True) + swiped = True + _name = self.read_screen(self.recog.img[name_p[i][0][1]:name_p[i][1][1], name_p[i][0][0]:name_p[i][1][0]], + type="name") + error_count = 0 + while i >= 3 and _name in result: + logger.warning("检测到滑动可能失败") + self.swipe((self.recog.w * 0.8, self.recog.h * 0.5), (0, -self.recog.h * 0.45), duration=500, + interval=1, rebuild=True) + _name = self.read_screen( + self.recog.img[name_p[i][0][1]:name_p[i][1][1], name_p[i][0][0]:name_p[i][1][0]], type="name") + error_count += 1 + if error_count > 1: + raise Exception("超过出错上限") + if room.startswith('dorm'): + extra = self.read_time(time_p[i], upperlimit=43200, error_count=4) + else: + extra = self.read_accurate_mood(self.recog.img, cord=mood_p[i]) + result.append((_name, extra)) + return result diff --git a/arknights_mower/solvers/base_mixin.py b/arknights_mower/solvers/base_mixin.py new file mode 100644 index 000000000..353546ea4 --- /dev/null +++ b/arknights_mower/solvers/base_mixin.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +from datetime import timedelta, datetime +from enum import Enum + +import cv2 + +from arknights_mower.utils import character_recognize, segment +from arknights_mower.utils.log import logger +from arknights_mower.utils import typealias as tp +from arknights_mower.utils.recognize import Scene +from arknights_mower.utils import rapidocr +from arknights_mower.data import agent_list, ocr_error + + +class ArrangeOrder(Enum): + STATUS = 1 + SKILL = 2 + FEELING = 3 + TRUST = 4 + + +arrange_order_res = { + ArrangeOrder.STATUS: (1560 / 2496, 96 / 1404), + ArrangeOrder.SKILL: (1720 / 2496, 96 / 1404), + ArrangeOrder.FEELING: (1880 / 2496, 96 / 1404), + ArrangeOrder.TRUST: (2050 / 2496, 96 / 1404), +} + + +class BaseMixin: + def switch_arrange_order(self, index: int, asc="false") -> None: + self.tap( + ( + self.recog.w * arrange_order_res[ArrangeOrder(index)][0], + self.recog.h * arrange_order_res[ArrangeOrder(index)][1], + ), + interval=0, + rebuild=False, + ) + # 点个不需要的 + if index < 4: + self.tap( + ( + self.recog.w * arrange_order_res[ArrangeOrder(index + 1)][0], + self.recog.h * arrange_order_res[ArrangeOrder(index)][1], + ), + interval=0, + rebuild=False, + ) + else: + self.tap( + ( + self.recog.w * arrange_order_res[ArrangeOrder(index - 1)][0], + self.recog.h * arrange_order_res[ArrangeOrder(index)][1], + ), + interval=0, + rebuild=False, + ) + # 切回来 + self.tap( + ( + self.recog.w * arrange_order_res[ArrangeOrder(index)][0], + self.recog.h * arrange_order_res[ArrangeOrder(index)][1], + ), + interval=0.2, + rebuild=True, + ) + # 倒序 + if asc != "false": + self.tap( + ( + self.recog.w * arrange_order_res[ArrangeOrder(index)][0], + self.recog.h * arrange_order_res[ArrangeOrder(index)][1], + ), + interval=0.2, + rebuild=True, + ) + + def scan_agent(self, agent: list[str], error_count=0, max_agent_count=-1): + try: + # 识别干员 + self.recog.update() + ret = character_recognize.agent(self.recog.img) # 返回的顺序是从左往右从上往下 + # 提取识别出来的干员的名字 + select_name = [] + for y in ret: + name = y[0] + if name in agent: + select_name.append(name) + # self.get_agent_detail((y[1][0])) + self.tap((y[1][0]), interval=0) + agent.remove(name) + # 如果是按照个数选择 Free + if max_agent_count != -1: + if len(select_name) >= max_agent_count: + return select_name, ret + return select_name, ret + except Exception as e: + error_count += 1 + if error_count < 3: + logger.exception(e) + self.sleep(3) + return self.scan_agent(agent, error_count, max_agent_count) + else: + raise e + + def swipe_left(self, right_swipe, w, h): + for _ in range(right_swipe): + self.swipe_only((w // 2, h // 2), (w // 2, 0), interval=0.5) + return 0 + + def detail_filter(self, turn_on, type="not_in_dorm"): + logger.info(f'开始 {("打开" if turn_on else "关闭")} {type} 筛选') + self.tap((self.recog.w * 0.95, self.recog.h * 0.05), interval=1) + if type == "not_in_dorm": + not_in_dorm = self.find("arrange_non_check_in", score=0.9) + if turn_on ^ (not_in_dorm is None): + self.tap((self.recog.w * 0.3, self.recog.h * 0.5), interval=0.5) + # 确认 + self.tap((self.recog.w * 0.8, self.recog.h * 0.8), interval=0.5) + + def enter_room(self, room: str) -> tp.Rectangle: + """获取房间的位置并进入""" + success = False + retry = 3 + while not success: + try: + # 获取基建各个房间的位置 + base_room = segment.base( + self.recog.img, self.find("control_central", strict=True) + ) + # 将画面外的部分删去 + _room = base_room[room] + + for i in range(4): + _room[i, 0] = max(_room[i, 0], 0) + _room[i, 0] = min(_room[i, 0], self.recog.w) + _room[i, 1] = max(_room[i, 1], 0) + _room[i, 1] = min(_room[i, 1], self.recog.h) + + # 点击进入 + self.tap(_room[0], interval=3) + while self.find("control_central") is not None: + self.tap(_room[0], interval=3) + success = True + except Exception as e: + retry -= 1 + self.back_to_infrastructure() + self.wait_for_scene(Scene.INFRA_MAIN, "get_infra_scene") + if retry <= 0: + raise e + + def double_read_time(self, cord, upperLimit=None, use_digit_reader=False): + self.recog.update() + time_in_seconds = self.read_time(cord, upperLimit, use_digit_reader) + if time_in_seconds is None: + return datetime.now() + execute_time = datetime.now() + timedelta(seconds=(time_in_seconds)) + return execute_time + + def read_accurate_mood(self, img, cord): + try: + img = img[cord[1] : cord[3], cord[0] : cord[2]] + # Convert the image to grayscale + gray_image = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + + blurred_image = cv2.GaussianBlur(gray_image, (5, 5), 0) + + # Threshold the image to isolate the progress bar region + contours, hierarchy = cv2.findContours( + blurred_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE + ) + + # Calculate the bounding box of the progress bar + x, y, w, h = cv2.boundingRect(contours[0]) + + # Crop the progress bar region + progress_bar = img[y : y + h, x : x + w] + + # Convert the progress bar to grayscale + gray_pb = cv2.cvtColor(progress_bar, cv2.COLOR_BGR2GRAY) + + # Threshold the progress bar to isolate the gray fill + ret, thresh_pb = cv2.threshold(gray_pb, 137, 255, cv2.THRESH_BINARY) + + # Calculate the ratio of colored pixels to the total number of pixels in the progress bar region + total_pixels = w * h + colored_pixels = cv2.countNonZero(thresh_pb) + return colored_pixels / total_pixels * 24 + + except Exception: + return 24 + + def read_screen(self, img, type="mood", limit=24, cord=None): + if cord is not None: + img = img[cord[1] : cord[3], cord[0] : cord[2]] + try: + ret = rapidocr.engine(img, use_det=False, use_cls=False, use_rec=True)[0] + logger.debug(ret) + if not ret or not ret[0][0]: + if "name" in type: + return character_recognize.agent_name(img, self.recog.h) + raise Exception("识别失败") + ret = ret[0][0] + if "赤金完成" in ret: + raise Exception("读取到赤金收取提示") + elif "心情" in ret: + raise Exception("识别区域错误") + if "mood" in type: + if (f"/{limit}") in ret: + ret = ret.replace(f"/{limit}", "") + if len(ret) > 0: + if "." in ret: + ret = ret.replace(".", "") + return int(ret) + else: + return -1 + elif "time" in type: + if "." in ret: + ret = ret.replace(".", ":") + return ret.strip() + elif "name" in type: + if ret in agent_list: + return ret + if ret in ocr_error: + name = ocr_error[ret] + logger.debug(f"{ret} =====> {name}") + return name + return character_recognize.agent_name(img, self.recog.h) + else: + return ret + except Exception as e: + logger.exception(e) + return limit + 1 + + def read_time(self, cord, upperlimit, error_count=0, use_digit_reader=False): + # 刷新图片 + self.recog.update() + try: + if use_digit_reader: + time_str = self.digit_reader.get_time(self.recog.gray) + else: + time_str = self.read_screen(self.recog.img, type="time", cord=cord) + h, m, s = str(time_str).split(":") + if int(m) > 60 or int(s) > 60: + raise Exception(f"读取错误") + res = int(h) * 3600 + int(m) * 60 + int(s) + if upperlimit is not None and res > upperlimit: + raise Exception(f"超过读取上限") + else: + return res + except: + logger.error("读取失败") + if error_count > 3: + logger.exception(f"读取失败{error_count}次超过上限") + return None + else: + return self.read_time( + cord, upperlimit, error_count + 1, use_digit_reader + ) diff --git a/arknights_mower/solvers/base_schedule.py b/arknights_mower/solvers/base_schedule.py index 1c8a0967f..d39df46ec 100644 --- a/arknights_mower/solvers/base_schedule.py +++ b/arknights_mower/solvers/base_schedule.py @@ -6,7 +6,6 @@ import sys import pathlib import urllib.request -from enum import Enum from datetime import datetime, timedelta import numpy as np import smtplib @@ -43,26 +42,13 @@ from arknights_mower.utils.email import maa_template - -class ArrangeOrder(Enum): - STATUS = 1 - SKILL = 2 - FEELING = 3 - TRUST = 4 - - -arrange_order_res = { - ArrangeOrder.STATUS: (1560 / 2496, 96 / 1404), - ArrangeOrder.SKILL: (1720 / 2496, 96 / 1404), - ArrangeOrder.FEELING: (1880 / 2496, 96 / 1404), - ArrangeOrder.TRUST: (2050 / 2496, 96 / 1404), -} +from arknights_mower.solvers.base_mixin import ArrangeOrder, arrange_order_res, BaseMixin stage_drop = {} -class BaseSchedulerSolver(BaseSolver): +class BaseSchedulerSolver(BaseSolver, BaseMixin): """ 收集基建的产物:物资、赤金、信赖 """ @@ -85,17 +71,26 @@ def __init__(self, device: Device = None, recog: Recognizer = None) -> None: self.maa_config = {} self.free_clue = None self.credit_fight = None - self.maa_depot_enable = False + self.task_count = 0 self.exit_game_when_idle = False self.simulator = None self.close_simulator_when_idle = False self.refresh_connecting = False self.recruit_config = {} self.skland_config = {} - self.recruit_time = None - self.daily_mission = False + + @property + def party_time(self): + return self._party_time + + @party_time.setter + def party_time(self, value): + self._party_time = value + if self.op_data is not None: + self.op_data.party_time = value + def run(self) -> None: """ :param clue_collect: bool, 是否收取线索 @@ -906,80 +901,6 @@ def get_run_roder_time(self, room): self.back(interval=2) return execute_time - def double_read_time(self, cord, upperLimit=None, use_digit_reader=False): - self.recog.update() - time_in_seconds = self.read_time(cord, upperLimit, use_digit_reader) - if time_in_seconds is None: - return datetime.now() - execute_time = datetime.now() + timedelta(seconds=(time_in_seconds)) - return execute_time - - def read_screen(self, img, type="mood", limit=24, cord=None): - if cord is not None: - img = img[cord[1]:cord[3], cord[0]:cord[2]] - try: - ret = rapidocr.engine(img, use_det=False, use_cls=False, use_rec=True)[0] - logger.debug(ret) - if not ret or not ret[0][0]: - if 'name' in type: - return character_recognize.agent_name(img, self.recog.h) - raise Exception("识别失败") - ret = ret[0][0] - if "赤金完成" in ret: - raise Exception("读取到赤金收取提示") - elif "心情" in ret: - raise Exception("识别区域错误") - if 'mood' in type: - if (f"/{limit}") in ret: - ret = ret.replace(f"/{limit}", '') - if len(ret) > 0: - if '.' in ret: - ret = ret.replace(".", "") - return int(ret) - else: - return -1 - elif 'time' in type: - if '.' in ret: - ret = ret.replace(".", ":") - return ret.strip() - elif 'name' in type: - if ret in agent_list: - return ret - if ret in ocr_error: - name = ocr_error[ret] - logger.debug(f"{ret} =====> {name}") - return name - return character_recognize.agent_name(img, self.recog.h) - else: - return ret - except Exception as e: - logger.exception(e) - return limit + 1 - - def read_time(self, cord, upperlimit, error_count=0, use_digit_reader=False): - # 刷新图片 - self.recog.update() - try: - if use_digit_reader: - time_str = self.digit_reader.get_time(self.recog.gray) - else: - time_str = self.read_screen(self.recog.img, type='time', cord=cord) - h, m, s = str(time_str).split(':') - if int(m) > 60 or int(s) > 60: - raise Exception(f"读取错误") - res = int(h) * 3600 + int(m) * 60 + int(s) - if upperlimit is not None and res > upperlimit: - raise Exception(f"超过读取上限") - else: - return res - except: - logger.error("读取失败") - if error_count > 3: - logger.exception(f"读取失败{error_count}次超过上限") - return None - else: - return self.read_time(cord, upperlimit, error_count + 1, use_digit_reader) - def todo_list(self) -> None: """ 处理基建 Todo 列表 """ tapped = False @@ -1317,35 +1238,6 @@ def ori_clue(self): logger.debug(clues) return clues - def enter_room(self, room: str) -> tp.Rectangle: - """ 获取房间的位置并进入 """ - success = False - retry = 3 - while not success: - try: - # 获取基建各个房间的位置 - base_room = segment.base(self.recog.img, self.find('control_central', strict=True)) - # 将画面外的部分删去 - _room = base_room[room] - - for i in range(4): - _room[i, 0] = max(_room[i, 0], 0) - _room[i, 0] = min(_room[i, 0], self.recog.w) - _room[i, 1] = max(_room[i, 1], 0) - _room[i, 1] = min(_room[i, 1], self.recog.h) - - # 点击进入 - self.tap(_room[0], interval=3) - while self.find('control_central') is not None: - self.tap(_room[0], interval=3) - success = True - except Exception as e: - retry -= 1 - self.back_to_infrastructure() - self.wait_for_scene(Scene.INFRA_MAIN, "get_infra_scene") - if retry <= 0: - raise e - def adjust_order_time(self, accelerate, room): error_count = 0 while scheduling(self.tasks) is not None: @@ -1565,68 +1457,12 @@ def get_arrange_order(self) -> ArrangeOrder: logger.debug((best_score, best_order)) return best_order - def switch_arrange_order(self, index: int, asc="false") -> None: - self.tap((self.recog.w * arrange_order_res[ArrangeOrder(index)][0], - self.recog.h * arrange_order_res[ArrangeOrder(index)][1]), interval=0, rebuild=False) - # 点个不需要的 - if index < 4: - self.tap((self.recog.w * arrange_order_res[ArrangeOrder(index + 1)][0], - self.recog.h * arrange_order_res[ArrangeOrder(index)][1]), interval=0, rebuild=False) - else: - self.tap((self.recog.w * arrange_order_res[ArrangeOrder(index - 1)][0], - self.recog.h * arrange_order_res[ArrangeOrder(index)][1]), interval=0, rebuild=False) - # 切回来 - self.tap((self.recog.w * arrange_order_res[ArrangeOrder(index)][0], - self.recog.h * arrange_order_res[ArrangeOrder(index)][1]), interval=0.2, rebuild=True) - # 倒序 - if asc != "false": - self.tap((self.recog.w * arrange_order_res[ArrangeOrder(index)][0], - self.recog.h * arrange_order_res[ArrangeOrder(index)][1]), interval=0.2, rebuild=True) - - def scan_agant(self, agent: list[str], error_count=0, max_agent_count=-1): - try: - # 识别干员 - self.recog.update() - ret = character_recognize.agent(self.recog.img) # 返回的顺序是从左往右从上往下 - # 提取识别出来的干员的名字 - select_name = [] - for y in ret: - name = y[0] - if name in agent: - select_name.append(name) - # self.get_agent_detail((y[1][0])) - self.tap((y[1][0]), interval=0) - agent.remove(name) - # 如果是按照个数选择 Free - if max_agent_count != -1: - if len(select_name) >= max_agent_count: - return select_name, ret - return select_name, ret - except Exception as e: - error_count += 1 - if error_count < 3: - logger.exception(e) - self.sleep(3) - return self.scan_agant(agent, error_count, max_agent_count) - else: - raise e - def get_order(self, name): if name in self.op_data.operators: return True, self.op_data.operators[name].arrange_order else: return False, [2, "false"] - def detail_filter(self, turn_on, type="not_in_dorm"): - logger.info(f'开始 {("打开" if turn_on else "关闭")} {type} 筛选') - self.tap((self.recog.w * 0.95, self.recog.h * 0.05), interval=1) - if type == "not_in_dorm": - not_in_dorm = self.find('arrange_non_check_in', score=0.9) - if turn_on ^ (not_in_dorm is None): - self.tap((self.recog.w * 0.3, self.recog.h * 0.5), interval=0.5) - # 确认 - self.tap((self.recog.w * 0.8, self.recog.h * 0.8), interval=0.5) - def choose_agent(self, agents: list[str], room: str, fast_mode=True) -> None: """ :param order: ArrangeOrder, 选择干员时右上角的排序功能 @@ -1697,7 +1533,7 @@ def choose_agent(self, agents: list[str], room: str, fast_mode=True) -> None: pre_order = [3, 'true'] if not fast_mode: self.tap((self.recog.w * 0.38, self.recog.h * 0.95), interval=0.5) - changed, ret = self.scan_agant(agent) + changed, ret = self.scan_agent(agent) if changed: selected.extend(changed) if len(agent) == 0: break @@ -1720,7 +1556,7 @@ def choose_agent(self, agents: list[str], room: str, fast_mode=True) -> None: pre_order = arrange_type first_time = False - changed, ret = self.scan_agant(agent) + changed, ret = self.scan_agent(agent) if changed: selected.extend(changed) # 如果找到了 @@ -1755,7 +1591,7 @@ def choose_agent(self, agents: list[str], room: str, fast_mode=True) -> None: free_list.extend([_name for _name in agent_list if _name not in self.op_data.operators.keys()]) free_list = list(set(free_list) - set(self.op_data.config.free_blacklist)) while free_num: - selected_name, ret = self.scan_agant(free_list, max_agent_count=free_num) + selected_name, ret = self.scan_agent(free_list, max_agent_count=free_num) selected.extend(selected_name) free_num -= len(selected_name) while len(selected_name) > 0: @@ -1787,42 +1623,6 @@ def choose_agent(self, agents: list[str], room: str, fast_mode=True) -> None: self.last_room = room logger.info(f"设置上次房间为{self.last_room}") - def swipe_left(self, right_swipe, w, h): - for _ in range(right_swipe): - self.swipe_only((w // 2, h // 2), (w // 2, 0), interval=0.5) - return 0 - - def read_accurate_mood(self, img, cord): - try: - img = img[cord[1]:cord[3], cord[0]:cord[2]] - # Convert the image to grayscale - gray_image = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - blurred_image = cv2.GaussianBlur(gray_image, (5, 5), 0) - - # Threshold the image to isolate the progress bar region - contours, hierarchy = cv2.findContours(blurred_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - - # Calculate the bounding box of the progress bar - x, y, w, h = cv2.boundingRect(contours[0]) - - # Crop the progress bar region - progress_bar = img[y:y + h, x:x + w] - - # Convert the progress bar to grayscale - gray_pb = cv2.cvtColor(progress_bar, cv2.COLOR_BGR2GRAY) - - # Threshold the progress bar to isolate the gray fill - ret, thresh_pb = cv2.threshold(gray_pb, 137, 255, cv2.THRESH_BINARY) - - # Calculate the ratio of colored pixels to the total number of pixels in the progress bar region - total_pixels = w * h - colored_pixels = cv2.countNonZero(thresh_pb) - return colored_pixels / total_pixels * 24 - - except Exception: - return 24 - def reset_room_time(self, room): for _operator in self.op_data.operators.keys(): if self.op_data.operators[_operator].room == room: @@ -2224,19 +2024,25 @@ def initialize_maa(self): if asst_path not in sys.path: sys.path.append(asst_path) global Message - from asst.asst import Asst - from asst.utils import Message, Version, InstanceOptionType - from asst.updater import Updater + + try: + from asst.asst import Asst + from asst.utils import Message, InstanceOptionType + logger.info("Maa Python模块导入成功") + except Exception as e: + logger.error(f'Maa Python模块导入失败:{str(e)}') + raise Exception("Maa Python模块导入失败") try: logger.debug(f'开始更新Maa活动关卡导航……') ota_tasks_url = 'https://ota.maa.plus/MaaAssistantArknights/api/resource/tasks.json' ota_tasks_path = path / 'cache' / 'resource' / 'tasks.json' ota_tasks_path.parent.mkdir(parents=True, exist_ok=True) + with urllib.request.urlopen(ota_tasks_url) as u: + res = u.read().decode('utf-8') with open(ota_tasks_path, 'w', encoding='utf-8') as f: - with urllib.request.urlopen(ota_tasks_url) as u: - f.write(u.read().decode('utf-8')) - logger.info(f'Maa活动关卡导航更新成功!') + f.write(res) + logger.info(f'Maa活动关卡导航更新成功') except Exception as e: logger.error(f'Maa活动关卡导航更新失败:{str(e)}') diff --git a/arknights_mower/solvers/mail.py b/arknights_mower/solvers/mail.py index 1cf817030..5a1999807 100644 --- a/arknights_mower/solvers/mail.py +++ b/arknights_mower/solvers/mail.py @@ -16,7 +16,7 @@ def run(self) -> None: # if it touched self.touched = False - logger.info('Start: 邮件') + logger.info('Start: 领取邮件') super().run() def transition(self) -> bool: diff --git a/arknights_mower/tests/base_scheduler_tests.py b/arknights_mower/tests/base_scheduler_tests.py new file mode 100644 index 000000000..d34347071 --- /dev/null +++ b/arknights_mower/tests/base_scheduler_tests.py @@ -0,0 +1,104 @@ +import unittest +from datetime import datetime +from unittest.mock import patch, MagicMock +from arknights_mower.solvers.base_schedule import BaseSchedulerSolver +from arknights_mower.utils.logic_expression import LogicExpression +from arknights_mower.utils.plan import Room, PlanConfig, Plan + +with patch.dict('sys.modules', {'RecruitSolver': MagicMock()}): + from ..solvers import RecruitSolver + + +class TestBaseScheduler(unittest.TestCase): + + @patch.object(BaseSchedulerSolver, '__init__', lambda x: None) + def test_backup_plan_solver_Caper(self): + plan_config = {"meeting": [Room("伊内丝", "", ["见行者", "陈"]), + Room("跃跃", "", ["见行者", "陈"])]} + plan_config1 = {"meeting": [Room("伊内丝", "", ["陈", "红"]), + Room("见行者", "", ["陈", "红"])]} + agent_base_config = PlanConfig("稀音", "稀音", "伺夜") + plan = { + # 阶段 1 + "default_plan": Plan(plan_config, agent_base_config), + "backup_plans": [Plan(plan_config1, agent_base_config, + trigger=LogicExpression("op_data.party_time is None", + "and", " True "), + task={'meeting': ['Current', '见行者']})] + } + + solver = BaseSchedulerSolver() + solver.global_plan = plan + solver.initialize_operators() + solver.tasks = [] + with patch.object(BaseSchedulerSolver, 'agent_get_mood') as mock_agent_get_mood: + mock_agent_get_mood.return_value = None + solver.backup_plan_solver() + self.assertEqual(len(solver.tasks), 1) + print(solver.op_data.plan_name) + solver.party_time = datetime.now() + solver.backup_plan_solver() + self.assertEqual(solver.op_data.plan_name, "default_plan") + + @patch.object(BaseSchedulerSolver, '__init__', lambda x: None) + def test_backup_plan_solver_GreyytheLightningbearer(self): + plan_config = {"room_2_3": [Room("雷蛇", "澄闪", ["炎狱炎熔", "格雷伊"])], + "room_1_3": [Room("承曦格雷伊", "自动化", ["炎狱炎熔"])], + "room_2_1": [Room("温蒂", "自动化", ["泡泡"]), + Room("森蚺", "自动化", ["火神"]), + Room("清流", "自动化", ["贝娜"])], + "room_2_2": [Room("澄闪", "澄闪", ["炎狱炎熔", "格雷伊"])], + "central": [Room("阿米娅", "", ["诗怀雅"]), + Room("琴柳", "乌有", ["清道夫"]), + Room("重岳", "乌有", ["杜宾"]), + Room("夕", "乌有", ["玛恩纳"]), + Room("令", "乌有", ["凯尔希"])], + "contact": [Room("桑葚", "乌有", ["絮雨"])], + } + backup_plan1_config = {"central": [Room("阿米娅", "", ["诗怀雅"]), + Room("清道夫", "", ["诗怀雅"]), + Room("杜宾", "", ["泡泡"]), + Room("玛恩纳", "", ["火神"]), + Room("森蚺", "", ["诗怀雅"])], + "room_2_1": [Room("温蒂", "", ["泡泡"]), + Room("掠风", "", ["贝娜"]), + Room("清流", "", ["火神"])], + "room_1_3": [Room("Lancet-2", "", ["承曦格雷伊"])], + "room_2_2": [Room("澄闪", "", ["承曦格雷伊", "格雷伊"])], + "room_2_3": [Room("雷蛇", "", ["承曦格雷伊", "格雷伊"])], + "contact": [Room("絮雨", "", ["桑葚"])], + } + agent_base_config0 = PlanConfig("稀音,黑键,焰尾,伊内丝", "稀音,柏喙,伊内丝", "伺夜,帕拉斯,雷蛇,澄闪,红云,乌有,年,远牙,阿米娅,桑葚,截云,掠风", ling_xi=2, + resting_threshold=0.1, run_order_buffer_time=20) + agent_base_config = PlanConfig("稀音,黑键,焰尾,伊内丝", "稀音,柏喙,伊内丝", "伺夜,帕拉斯,雷蛇,澄闪,红云,乌有,年,远牙,阿米娅,桑葚,截云", ling_xi=2, + free_blacklist="艾丽妮,但书,龙舌兰", run_order_buffer_time=20) + plan = { + # 阶段 1 + "default_plan": Plan(plan_config, agent_base_config), + "backup_plans": [Plan(backup_plan1_config, agent_base_config0, + trigger=LogicExpression("op_data.operators['令'].current_room.startswith('dorm')", + "and", + LogicExpression( + "op_data.operators['温蒂'].current_mood() - op_data.operators['承曦格雷伊'].current_mood()", + ">", "4")), + task={'dormitory_2': ['Current', 'Current', 'Current', 'Current', '承曦格雷伊']})] + } + + solver = BaseSchedulerSolver() + solver.global_plan = plan + solver.initialize_operators() + solver.tasks = [] + with patch.object(BaseSchedulerSolver, 'agent_get_mood') as mock_agent_get_mood: + mock_agent_get_mood.return_value = None + solver.op_data.operators['令'].current_room = 'dorm' + solver.op_data.operators['温蒂'].mood = 12 + solver.op_data.operators['承曦格雷伊'].mood = 7 + solver.backup_plan_solver() + self.assertEqual(len(solver.tasks), 1) + solver.op_data.operators['承曦格雷伊'].mood = 12 + solver.backup_plan_solver() + self.assertEqual(solver.op_data.plan_name, "default_plan") + + +if __name__ == '__main__': + unittest.main() diff --git a/arknights_mower/utils/operators.py b/arknights_mower/utils/operators.py index fa4cd7270..100f646de 100644 --- a/arknights_mower/utils/operators.py +++ b/arknights_mower/utils/operators.py @@ -35,6 +35,7 @@ def __init__(self, plan): self.run_order_rooms = {} self.clues = [] self.current_room_changed_callback = None + self.party_time = None def __repr__(self): return f'Operators(operators={self.operators})' diff --git a/arknights_mower/utils/paddleocr.py b/arknights_mower/utils/paddleocr.py deleted file mode 100644 index c129e21a5..000000000 --- a/arknights_mower/utils/paddleocr.py +++ /dev/null @@ -1,21 +0,0 @@ -from .path import get_path - -ocr = None - -def initialize_ocr(): - global ocr - if not ocr: - from paddleocr import PaddleOCR - - det_model_dir = get_path("@app/tmp/paddle/det/ch", space='') - rec_model_dir = get_path("@app/tmp/paddle/rec/ch", space='') - cls_model_dir = get_path("@app/tmp/paddle/cls", space='') - ocr = PaddleOCR( - enable_mkldnn=False, - use_angle_cls=False, - cls=False, - show_log=False, - det_model_dir=str(det_model_dir), - rec_model_dir=str(rec_model_dir), - cls_model_dir=str(cls_model_dir), - ) diff --git a/arknights_mower/utils/recognize.py b/arknights_mower/utils/recognize.py index fe10999ab..78a552ff8 100644 --- a/arknights_mower/utils/recognize.py +++ b/arknights_mower/utils/recognize.py @@ -64,7 +64,7 @@ def get_scene(self) -> int: """ get the current scene in the game """ if self.scene != Scene.UNDEFINED: return self.scene - if self.find('connecting', scope=((self.w//2, self.h//10*8), (self.w//4*3, self.h)), score=0.15) is not None: + if self.find('connecting', scope=((self.w//2, self.h//10*8), (self.w//4*3, self.h)), score=0.2) is not None: self.scene = Scene.CONNECTING elif self.find('index_nav', thres=250, scope=((0, 0), (100+self.w//4, self.h//10))) is not None: self.scene = Scene.INDEX @@ -244,7 +244,7 @@ def get_scene(self) -> int: def get_infra_scene(self)-> int: if self.scene != Scene.UNDEFINED: return self.scene - if self.find('connecting', scope=((self.w//2, self.h//10*8), (self.w//4*3, self.h)), score=0.15) is not None: + if self.find('connecting', scope=((self.w//2, self.h//10*8), (self.w//4*3, self.h)), score=0.2) is not None: self.scene = Scene.CONNECTING elif self.find('double_confirm') is not None: if self.find('network_check') is not None: diff --git a/diy.py b/diy.py index 659be14ac..9277f566e 100644 --- a/diy.py +++ b/diy.py @@ -3,18 +3,15 @@ import atexit import json import os - from arknights_mower.solvers.base_schedule import BaseSchedulerSolver from arknights_mower.strategy import Solver from arknights_mower.utils.device import Device from arknights_mower.utils.email import task_template from arknights_mower.utils.log import logger, init_fhlr -from arknights_mower.utils import config -from arknights_mower.utils.logic_expression import LogicExpression +from arknights_mower.utils import config, rapidocr from arknights_mower.utils.simulator import restart_simulator from arknights_mower.utils.plan import Plan, PlanConfig, Room -import arknights_mower.utils.paddleocr - +from arknights_mower.utils.logic_expression import LogicExpression # 下面不能删除 from arknights_mower.utils.operators import Operators, Operator, Dormitory from arknights_mower.utils.scheduler_task import SchedulerTask,TaskTypes @@ -380,7 +377,7 @@ def simulate(): continue base_scheduler.run() reconnect_tries = 0 - except ConnectionError or ConnectionAbortedError or AttributeError as e: + except (ConnectionError, ConnectionAbortedError, AttributeError) as e: reconnect_tries += 1 if reconnect_tries < reconnect_max_tries: logger.warning(f'连接端口断开....正在重连....') @@ -389,7 +386,7 @@ def simulate(): try: base_scheduler = inialize([], base_scheduler) break - except RuntimeError or ConnectionError or ConnectionAbortedError as ce: + except (ConnectionError, ConnectionAbortedError, AttributeError) as ce: logger.error(ce) restart_simulator(simulator) continue @@ -411,5 +408,5 @@ def simulate(): # debuglog() atexit.register(save_state) savelog() -arknights_mower.utils.paddleocr.initialize_ocr() +rapidocr.initialize_ocr() simulate() diff --git a/ui/src/pages/Plan.vue b/ui/src/pages/Plan.vue index f479ae1a7..a72fab47b 100644 --- a/ui/src/pages/Plan.vue +++ b/ui/src/pages/Plan.vue @@ -108,10 +108,10 @@ async function save() {