diff --git a/.gitignore b/.gitignore index 923085353..4739c3311 100644 --- a/.gitignore +++ b/.gitignore @@ -9,7 +9,6 @@ screenshot/* screenshot_right/* test/* test_* -test.py release.sh # Windows @@ -17,6 +16,11 @@ venv64 release_test.bat publish/* +# Myself +test.py +cron.py +cron.json + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/arknights_mower/__init__.py b/arknights_mower/__init__.py index f5ce54eaa..80d84e8e8 100644 --- a/arknights_mower/__init__.py +++ b/arknights_mower/__init__.py @@ -15,4 +15,4 @@ __cli__ = not (__pyinstall__ and not sys.argv[1:]) __system__ = platform.system().lower() -__version__ = '2.0.0a7' +__version__ = '2.0.0a8' diff --git a/arknights_mower/command.py b/arknights_mower/command.py index 958a00a72..751654ca8 100644 --- a/arknights_mower/command.py +++ b/arknights_mower/command.py @@ -8,6 +8,7 @@ from .utils import config from .utils.log import logger from .utils.device import Device +from .utils.operation import parse_operation_params class ParamError(ValueError): @@ -110,35 +111,7 @@ def operation(args: list[str] = [], device: Device = None): config.update_ope_plan(remain_plan) return - level = None - times = -1 - potion = 0 - originite = 0 - eliminate = False - - try: - for p in args: - if p[0] == '-': - val = -1 - if len(p) > 2: - val = int(p[2:]) - if p[1] == 'r': - assert potion == 0 - potion = val - elif p[1] == 'R': - assert originite == 0 - originite = val - elif p[1] == 'e': - assert not eliminate - eliminate = True - elif p.find('-') == -1: - assert times == -1 - times = int(p) - else: - assert level is None - level = p - except Exception: - raise ParamError + level, times, potion, originite, eliminate = parse_operation_params(args) OpeSolver(device).run(level, times, potion, originite, eliminate) @@ -169,27 +142,17 @@ def help(args: list[str] = [], device: Device = None): print(f' --config filepath\n 指定配置文件,默认使用 {config.PATH}') -def schedule(args: list[str] = [], device: Device = None): - """ - schedule - 执行配置文件中的计划任务 - """ - if config.SCHEDULE_PLAN is not None: - sd.every().hour.do(task, tag='per_hour', device=device) - for tag in config.SCHEDULE_PLAN.keys(): - if tag[:4] == 'day_': - sd.every().day.at(tag.replace('_', ':')[4:]).do( - task, tag=tag, device=device) - task(device=device) - while True: - sd.run_pending() - time.sleep(60) - else: - logger.warning('empty plan') +""" +commands for schedule +operation will be replaced by operation_one in ScheduleSolver +""" +schedule_cmds = [base, credit, mail, mission, shop, recruit, operation] -def task(tag: str = 'start_up', device: Device = None): - """ run single task """ +def add_tasks(solver: ScheduleSolver = None, tag: str = ''): + """ + 为 schedule 模块添加任务 + """ plan = config.SCHEDULE_PLAN.get(tag) if plan is not None: for args in plan: @@ -199,13 +162,41 @@ def task(tag: str = 'start_up', device: Device = None): 'Found `schedule` in `schedule`. Are you kidding me?') raise NotImplementedError try: - target_cmd = match_cmd(args[0]) + target_cmd = match_cmd(args[0], schedule_cmds) if target_cmd is not None: - target_cmd(args[1:], device) + solver.add_task(tag, target_cmd, args[1:]) except Exception as e: logger.error(e) +def schedule(args: list[str] = [], device: Device = None): + """ + schedule + 执行配置文件中的计划任务 + 计划执行时会自动存档至本地磁盘,启动时若检测到有存档,则会使用存档内容继续完成计划 + -n 忽略之前中断的计划任务,按照配置文件重新开始新的计划 + """ + new_schedule = False + + try: + for p in args: + if p[0] == '-': + if p[1] == 'n': + new_schedule = True + except Exception: + raise ParamError + + solver = ScheduleSolver(device) + if new_schedule or solver.load_from_disk(schedule_cmds, match_cmd) is False: + if config.SCHEDULE_PLAN is not None: + for tag in config.SCHEDULE_PLAN.keys(): + add_tasks(solver, tag) + else: + logger.warning('empty plan') + solver.per_run() + solver.run() + + # all available commands global_cmds = [base, credit, mail, mission, shop, recruit, operation, version, help, schedule] diff --git a/arknights_mower/solvers/__init__.py b/arknights_mower/solvers/__init__.py index 2f79fcf7c..b503069e8 100644 --- a/arknights_mower/solvers/__init__.py +++ b/arknights_mower/solvers/__init__.py @@ -5,3 +5,4 @@ from .operation import OpeSolver from .recruit import RecruitSolver from .shop import ShopSolver +from .schedule import ScheduleSolver diff --git a/arknights_mower/solvers/base_construct.py b/arknights_mower/solvers/base_construct.py index 90d485819..a46ba7d4e 100644 --- a/arknights_mower/solvers/base_construct.py +++ b/arknights_mower/solvers/base_construct.py @@ -118,7 +118,7 @@ def clue(self) -> None: self.find('clue_summary') and self.back() # 识别右侧按钮 - (x0, y0), (x1, y1) = self.find('clue_func') + (x0, y0), (x1, y1) = self.find('clue_func', strict=True) logger.info('接收赠送线索') self.tap(((x0+x1)//2, (y0*3+y1)//4), interval=3, rebuild=False) @@ -203,7 +203,7 @@ def recog_bar(self) -> None: """ 识别阵营选择栏 """ global x1, x2, y0, y1 - (x1, y0), (x2, y1) = self.find('clue_nav') + (x1, y0), (x2, y1) = self.find('clue_nav', strict=True) while int(self.recog.img[y0, x1-1].max()) - int(self.recog.img[y0, x1].max()) <= 1: x1 -= 1 while int(self.recog.img[y0, x2].max()) - int(self.recog.img[y0, x2-1].max()) <= 1: @@ -328,7 +328,7 @@ def enter_room(self, room: str) -> tp.Rectangle: """ 获取房间的位置并进入 """ # 获取基建各个房间的位置 - base_room = segment.base(self.recog.img, self.find('control_central')) + base_room = segment.base(self.recog.img, self.find('control_central', strict=True)) # 将画面外的部分删去 room = base_room[room] diff --git a/arknights_mower/solvers/credit.py b/arknights_mower/solvers/credit.py index 0fd3d7f07..a189f69c0 100644 --- a/arknights_mower/solvers/credit.py +++ b/arknights_mower/solvers/credit.py @@ -23,7 +23,7 @@ def transition(self) -> bool: elif self.scene() == Scene.FRIEND_LIST_OFF: self.tap_element('friend_list') elif self.scene() == Scene.FRIEND_LIST_ON: - down = self.find('friend_list_on')[1][1] + down = self.find('friend_list_on', strict=True)[1][1] scope = [(0, 0), (100000, down)] if not self.tap_element('friend_visit', scope=scope, detected=True): self.sleep(1) diff --git a/arknights_mower/solvers/operation.py b/arknights_mower/solvers/operation.py index 0d947db00..4dc39fac0 100644 --- a/arknights_mower/solvers/operation.py +++ b/arknights_mower/solvers/operation.py @@ -406,7 +406,7 @@ def choose_zone_supple(self, zone: list, scope: tp.Scope) -> None: def choose_zone_resource(self, zone: list) -> None: """ 识别资源收集区域 """ ocr = ocrhandle.predict(self.recog.img) - unable = list(filter(lambda x: x[1] == '不可进入', ocr)) + unable = list(filter(lambda x: x[1] in ['不可进入', '关卡尚未开放'], ocr)) ocr = list(filter(lambda x: x[1] in weekly_zones, ocr)) weekly = sorted([x[1] for x in ocr]) while zone[0] not in weekly: @@ -414,7 +414,7 @@ def choose_zone_resource(self, zone: list) -> None: self.swipe((self.recog.w // 4, self.recog.h // 4), (self.recog.w // 16, 0)) ocr = ocrhandle.predict(self.recog.img) - unable = list(filter(lambda x: x[1] == '不可进入', ocr)) + unable = list(filter(lambda x: x[1] in ['不可进入', '关卡尚未开放'], ocr)) ocr = list(filter(lambda x: x[1] in weekly_zones, ocr)) weekly = sorted([x[1] for x in ocr]) if _weekly == weekly: @@ -424,7 +424,7 @@ def choose_zone_resource(self, zone: list) -> None: self.swipe((self.recog.w // 4, self.recog.h // 4), (-self.recog.w // 16, 0)) ocr = ocrhandle.predict(self.recog.img) - unable = list(filter(lambda x: x[1] == '不可进入', ocr)) + unable = list(filter(lambda x: x[1] in ['不可进入', '关卡尚未开放'], ocr)) ocr = list(filter(lambda x: x[1] in weekly_zones, ocr)) weekly = sorted([x[1] for x in ocr]) if _weekly == weekly: diff --git a/arknights_mower/solvers/schedule.py b/arknights_mower/solvers/schedule.py new file mode 100644 index 000000000..02e9d9f82 --- /dev/null +++ b/arknights_mower/solvers/schedule.py @@ -0,0 +1,254 @@ +import time +import datetime + +from pathlib import Path +from functools import cmp_to_key +from collections.abc import Callable +from ruamel.yaml import yaml_object + +from ..utils import config +from ..utils.device import Device +from ..utils.log import logger +from ..utils.solver import BaseSolver +from ..utils.priority_queue import PriorityQueue +from ..utils.datetime import the_same_day +from ..utils.operation import operation_times +from ..utils.recognize import Recognizer +from ..utils.operation import parse_operation_params +from ..utils.yaml import yaml +from .operation import OpeSolver + +task_priority = {'base': 0, 'recruit': 1, 'mail': 2, 'credit': 3, 'shop': 4, 'mission': 5, 'operation': 6} + + +class ScheduleLogError(ValueError): + """ Schedule log 文件解析错误 """ + + +def operation_one(args: list[str] = [], device: Device = None) -> bool: + """ + 只为 schedule 模块使用的单次作战操作 + 目前不支持使用源石和体力药 + + 返回值表示该次作战是否成功 + 完成剿灭不算成功 + """ + level, _, _, _, eliminate = parse_operation_params(args) + remain_plan = OpeSolver(device).run(level, 1, 0, 0, eliminate) + for plan in remain_plan: + if plan[1] != 0: + return False + return True + + +@yaml_object(yaml) +class Task(object): + """ + 单个任务 + """ + + def __init__(self, tag: str = '', cmd: Callable = None, args: list[str] = [], device: Device = None): + self.cmd = cmd + self.cmd_args = args + self.tag = tag + self.last_run = None + self.idx = None + self.pending = False + self.total = 1 + self.finish = 0 + self.device = device + + # per_hour 任务的第一次执行将在启动脚本后的一个小时之后 + if tag == 'per_hour': + self.last_run = datetime.datetime.now() + if cmd.__name__ == 'operation': + self.total = operation_times(args) + assert self.total != 0 + + @classmethod + def to_yaml(cls, representer, data): + last_run = '' + if data.last_run is not None: + last_run = data.last_run.strftime('%Y-%m-%d %H:%M:%S') + return representer.represent_mapping('task', + {'tag': data.tag, + 'cmd': data.cmd.__name__, + 'cmd_args': data.cmd_args, + 'last_run': last_run, + 'idx': data.idx, + 'pending': data.pending, + 'total': data.total, + 'finish': data.finish}) + + def __lt__(self, other): + if task_priority[self.cmd.__name__] != task_priority[other.cmd.__name__]: + return task_priority[self.cmd.__name__] < task_priority[other.cmd.__name__] + return self.idx < other.idx + + def load(self, last_run: str = '', idx: int = 0, pending: bool = False, total: int = 1, finish: int = 0): + if last_run == '': + self.last_run = None + else: + self.last_run = datetime.datetime.strptime(last_run, '%Y-%m-%d %H:%M:%S') + self.idx = idx + self.pending = pending + self.total = total + self.finish = finish + + def reset(self): + if tag != 'per_hour': + self.last_run = None + self.pending = False + self.finish = 0 + + def set_idx(self, idx: int = None): + self.idx = idx + + def start_up(self) -> bool: + return self.tag == 'start_up' + + def need_run(self, now: datetime.datetime = datetime.datetime.now()) -> bool: + if self.pending: + return False + if self.start_up(): + if self.last_run is not None: + return False + self.pending = True + self.last_run = now + return True + if self.tag[:4] == 'day_': + # 同一天 and 跑过了 + if self.last_run is not None and the_same_day(now, self.last_run): + return False + # 还没到时间 + if now.strftime('%H:%M') < self.tag.replace('_', ':')[4:]: + return False + self.pending = True + self.last_run = now + return True + if self.tag == 'per_hour': + if self.last_run + datetime.timedelta(hours=1) <= now: + self.pending = True + self.last_run = now + return True + return False + return False + + def run(self) -> bool: + logger.info(f'task: {self.cmd.__name__} {self.cmd_args}') + if self.cmd.__name__ == 'operation': + if operation_one(self.cmd_args, self.device): + self.finish += 1 + if self.finish == self.total: + self.finish = 0 + self.pending = False + return True + return False + self.cmd(self.cmd_args, self.device) + self.pending = False + return True + + +def cmp_for_init(task_a: Task = None, task_b: Task = None) -> int: + if task_a.start_up() and task_b.start_up(): + return 0 + + if task_a.start_up(): + return -1 + + if task_b.start_up(): + return 1 + return 0 + + +@yaml_object(yaml) +class ScheduleSolver(BaseSolver): + """ + 按照计划定时、自动完成任务 + """ + + def __init__(self, device: Device = None, recog: Recognizer = None) -> None: + super().__init__(device, recog) + self.tasks = [] + self.pending_list = PriorityQueue() + self.device = device + self.last_run = None + self.schedule_log_path = Path(config.LOGFILE_PATH).joinpath('schedule.log') + + @classmethod + def to_yaml(cls, representer, data): + return representer.represent_mapping('Schedule', {'last_run': data.last_run.strftime('%Y-%m-%d %H:%M:%S'), + 'tasks': data.tasks}) + + def dump_to_disk(self): + with self.schedule_log_path.open('w', encoding='utf8') as f: + yaml.dump(self, f) + logger.info('计划已存档') + + def load_from_disk(self, cmd_list: list[Callable] = [], matcher: Callable = None) -> bool: + try: + with self.schedule_log_path.open('r', encoding='utf8') as f: + data = yaml.load(f) + self.last_run = datetime.datetime.strptime(data['last_run'], '%Y-%m-%d %H:%M:%S') + for task in data['tasks']: + cmd = matcher(task['cmd'], cmd_list) + if cmd is None: + raise ScheduleLogError + new_task = Task( + task['tag'], cmd, task['cmd_args'], self.device + ) + new_task.load( + task['last_run'], task['idx'], task['pending'], task['total'], task['finish'] + ) + self.tasks.append(new_task) + if new_task.pending: + self.pending_list.push(new_task) + except Exception: + return False + logger.info('发现中断的计划,将继续执行') + return True + + def add_task(self, tag: str = '', cmd: Callable = None, args: list[str] = []): + task = Task(tag, cmd, args, self.device) + self.tasks.append(task) + + def per_run(self): + """ + 这里是为了处理优先级相同的情况,对于优先级相同时,我们依次考虑: + 1. start_up 优先执行 + 2. 按照配置文件的顺序决定先后顺序 + + sort 是稳定排序,详见: + https://docs.python.org/3/library/functions.html#sorted + """ + self.tasks.sort(key=cmp_to_key(cmp_for_init)) + for idx, task in enumerate(self.tasks): + task.set_idx(idx) + + def run(self): + logger.info('Start: 计划') + + super().run() + + def new_day(self): + for task in self.tasks: + task.reset() + self.pending_list = PriorityQueue() + + def transition(self) -> None: + while True: + now = datetime.datetime.now() + if self.last_run is not None and the_same_day(self.last_run, now) is False: + self.new_day() + self.last_run = now + for task in self.tasks: + if task.need_run(now): + self.pending_list.push(task) + + task = self.pending_list.pop() + if task is not None: + if task.run() is False: + self.pending_list.push(task) + + self.dump_to_disk() + time.sleep(60) diff --git a/arknights_mower/templates/config.yaml b/arknights_mower/templates/config.yaml index 0f63ef37f..cd265d313 100644 --- a/arknights_mower/templates/config.yaml +++ b/arknights_mower/templates/config.yaml @@ -70,9 +70,10 @@ schedule: - mail # 自动收取邮件 - base -c # 自动收取基建并使用线索 - credit # 自动收取信用点 + # 目前 schedule 模块中的作战暂不支持使用理智药和源石 - operation -e # 自动执行上一个作战直到体力耗尽,优先处理未完成的每周剿灭 - # - operation -r5 AP-5 # 自动执行 AP-5 作战,当体力不足时会使用理智药,上限为 5 瓶 - # - operation 1-7 99 -r5 -R5 # 重复刷 1-7 关卡 99 次,使用理智药以及源石自动回复理智,最多消耗 5 瓶理智药和 5 颗源石 + # - operation AP-5 # 自动执行 AP-5 作战 + # - operation 1-7 99 # 重复刷 1-7 关卡 99 次 - shop # 自动购买信用点商店的内容,购物的优先级以下文的 priority 部分作为标准 - recruit # 自动进行公招,招募的优先级以下文的 priority 部分作为标准 - mission # 自动完成每日/每周任务 @@ -87,7 +88,7 @@ schedule: day_15_00: *full_task day_23_00: *full_task - # 自动换班的相关任务,每天定时执行(当前换班功能不稳定,如有尝试的需求请自行去掉注释符号) + # 自动换班的相关任务,每天定时执行 # day_08_00: # - base plan_1 # day_14_00: diff --git a/arknights_mower/utils/config.py b/arknights_mower/utils/config.py index bed9faef1..0f59762c1 100644 --- a/arknights_mower/utils/config.py +++ b/arknights_mower/utils/config.py @@ -2,7 +2,6 @@ import sys import shutil -import ruamel.yaml from typing import Any from pathlib import Path from collections import Mapping @@ -10,8 +9,8 @@ from . import typealias as tp from .. import __rootdir__, __system__, __pyinstall__ +from .yaml import yaml -yaml = ruamel.yaml.YAML() __ydoc = None BASE_CONSTRUCT_PLAN: dict[str, tp.BasePlan] diff --git a/arknights_mower/utils/datetime.py b/arknights_mower/utils/datetime.py new file mode 100644 index 000000000..d19001044 --- /dev/null +++ b/arknights_mower/utils/datetime.py @@ -0,0 +1,7 @@ +import datetime + + +def the_same_day(a: datetime.datetime = None, b: datetime.datetime = None) -> bool: + if a is None or b is None: + return False + return a.year == b.year and a.month == b.month and a.day == b.day diff --git a/arknights_mower/utils/device/device.py b/arknights_mower/utils/device/device.py index 43520fa31..f4de9eebc 100644 --- a/arknights_mower/utils/device/device.py +++ b/arknights_mower/utils/device/device.py @@ -1,5 +1,6 @@ from __future__ import annotations +import time from typing import Optional from .client import Client @@ -79,3 +80,10 @@ def swipe(self, points: list[tuple[int, int]], duration: int = 100, part: int = points_num = len(points) duration //= points_num - 1 self.minitouch.smooth_swipe(points, self.display_frames(), duration=duration, part=part, fall=fall, lift=lift) + + def check_current_focus(self): + """ check if the application is in the foreground """ + if self.current_focus() != config.APPNAME: + self.launch(config.APPNAME) + # wait for app to finish launching + time.sleep(10) diff --git a/arknights_mower/utils/operation.py b/arknights_mower/utils/operation.py new file mode 100644 index 000000000..adf1a55d6 --- /dev/null +++ b/arknights_mower/utils/operation.py @@ -0,0 +1,37 @@ +def parse_operation_params(args: list[str] = []) -> [str, int, int, int, bool]: + level = None + times = -1 + potion = 0 + originite = 0 + eliminate = False + + try: + for p in args: + if p[0] == '-': + val = -1 + if len(p) > 2: + val = int(p[2:]) + if p[1] == 'r': + assert potion == 0 + potion = val + elif p[1] == 'R': + assert originite == 0 + originite = val + elif p[1] == 'e': + assert not eliminate + eliminate = True + elif p.find('-') == -1: + assert times == -1 + times = int(p) + else: + assert level is None + level = p + except Exception: + raise ParamError + return [level, times, potion, originite, eliminate] + + +def operation_times(args: list[str] = []) -> int: + _, times, _, _, _ = parse_operation_params(args) + return times + diff --git a/arknights_mower/utils/priority_queue.py b/arknights_mower/utils/priority_queue.py new file mode 100644 index 000000000..55f48697b --- /dev/null +++ b/arknights_mower/utils/priority_queue.py @@ -0,0 +1,18 @@ +import heapq + + +class PriorityQueue(object): + """ + 基于 heapq 实现的优先队列 + """ + + def __init__(self): + self.queue = [] + + def push(self, data): + heapq.heappush(self.queue, data) + + def pop(self): + if len(self.queue) == 0: + return None + return heapq.heappop(self.queue) diff --git a/arknights_mower/utils/recognize.py b/arknights_mower/utils/recognize.py index 06774539a..610a5df29 100644 --- a/arknights_mower/utils/recognize.py +++ b/arknights_mower/utils/recognize.py @@ -171,6 +171,7 @@ def get_scene(self) -> int: self.scene = Scene.LOADING else: self.scene = Scene.UNKNOWN + self.device.check_current_focus() # save screencap to analyse if config.SCREENSHOT_PATH is not None: save_screenshot( @@ -186,7 +187,7 @@ def nav_button(self): """ find navigation button """ return self.find('nav_button', thres=128, scope=((0, 0), (100+self.w//4, self.h//10))) - def find(self, res: str, draw: bool = False, scope: tp.Scope = None, thres: int = None, judge: bool = True) -> tp.Scope: + def find(self, res: str, draw: bool = False, scope: tp.Scope = None, thres: int = None, judge: bool = True, strict: bool = False) -> tp.Scope: """ 查找元素是否出现在画面中 @@ -194,7 +195,8 @@ def find(self, res: str, draw: bool = False, scope: tp.Scope = None, thres: int :param draw: 是否将识别结果输出到屏幕 :param scope: ((x0, y0), (x1, y1)),提前限定元素可能出现的范围 :param thres: 是否在匹配前对图像进行二值化处理 - :param judge: 是否假如更加精准的判断 + :param judge: 是否加入更加精确的判断 + :param strict: 是否启用严格模式,未找到时报错 :return ret: 若匹配成功,则返回元素在游戏界面中出现的位置,否则返回 None """ @@ -211,6 +213,8 @@ def find(self, res: str, draw: bool = False, scope: tp.Scope = None, thres: int res_img = loadimg(res, True) matcher = self.matcher ret = matcher.match(res_img, draw=draw, scope=scope, judge=judge) + if strict and ret is None: + raise RecognizeError(f"Can't find '{res}'") return ret # def score(self, item, draw=False, scope=None, thres=None): diff --git a/arknights_mower/utils/solver.py b/arknights_mower/utils/solver.py index 59e73fa3b..5a4e6762a 100644 --- a/arknights_mower/utils/solver.py +++ b/arknights_mower/utils/solver.py @@ -26,10 +26,7 @@ def __init__(self, device: Device = None, recog: Recognizer = None) -> None: raise RuntimeError self.device = device if device is not None else Device() self.recog = recog if recog is not None else Recognizer(self.device) - if self.device.current_focus() != config.APPNAME: - self.device.launch(config.APPNAME) - # wait for app to finish launching - time.sleep(10) + self.device.check_current_focus() def run(self) -> None: retry_times = config.MAX_RETRYTIME @@ -90,8 +87,8 @@ def input(self, referent: str, input_area: tp.Scope) -> None: self.device.send_text(input(referent).strip()) self.device.tap((0, 0)) - def find(self, res: str, draw: bool = False, scope: tp.Scope = None, thres: int = None, judge: bool = True) -> tp.Scope: - return self.recog.find(res, draw, scope, thres, judge) + def find(self, res: str, draw: bool = False, scope: tp.Scope = None, thres: int = None, judge: bool = True, strict: bool = False) -> tp.Scope: + return self.recog.find(res, draw, scope, thres, judge, strict) def tap(self, poly: tp.Location, x_rate: float = 0.5, y_rate: float = 0.5, interval: float = 1, rebuild: bool = True) -> None: """ tap """ diff --git a/arknights_mower/utils/yaml.py b/arknights_mower/utils/yaml.py new file mode 100644 index 000000000..254a20547 --- /dev/null +++ b/arknights_mower/utils/yaml.py @@ -0,0 +1,3 @@ +import ruamel.yaml + +yaml = ruamel.yaml.YAML()