diff --git a/conversation.py b/conversation.py index 6f843a0e5..bc1f352a2 100644 --- a/conversation.py +++ b/conversation.py @@ -5,6 +5,7 @@ from engine_wrapper import EngineWrapper from lichess import Lichess from collections.abc import Sequence +from timer import seconds MULTIPROCESSING_LIST_TYPE = Sequence[model.Challenge] logger = logging.getLogger(__name__) @@ -53,7 +54,7 @@ def command(self, line: ChatLine, cmd: str) -> None: if cmd == "commands" or cmd == "help": self.send_reply(line, "Supported commands: !wait (wait a minute for my first move), !name, !howto, !eval, !queue") elif cmd == "wait" and self.game.is_abortable(): - self.game.ping(60, 120, 120) + self.game.ping(seconds(60), seconds(120), seconds(120)) self.send_reply(line, "Waiting 60 seconds...") elif cmd == "name": name = self.game.me.name diff --git a/engine_wrapper.py b/engine_wrapper.py index e0ac5f888..d495d990c 100644 --- a/engine_wrapper.py +++ b/engine_wrapper.py @@ -8,6 +8,7 @@ import chess import subprocess import logging +import datetime import time import random from collections import Counter @@ -17,6 +18,7 @@ import model import lichess from config import Configuration +from timer import Timer, msec, seconds, msec_str, sec_str, to_seconds from typing import Any, Optional, Union OPTIONS_TYPE = dict[str, Any] MOVE_INFO_TYPE = dict[str, Any] @@ -102,13 +104,13 @@ def play_move(self, board: chess.Board, game: model.Game, li: lichess.Lichess, - start_time: int, - move_overhead: int, + setup_timer: Timer, + move_overhead: datetime.timedelta, can_ponder: bool, is_correspondence: bool, - correspondence_move_time: int, + correspondence_move_time: datetime.timedelta, engine_cfg: config.Configuration, - min_time: float) -> None: + min_time: datetime.timedelta) -> None: """ Play a move. @@ -152,16 +154,16 @@ def play_move(self, time_limit = first_move_time(game) can_ponder = False # No pondering after the first move since a new clock starts afterwards. elif is_correspondence: - time_limit = single_move_time(board, game, correspondence_move_time, start_time, move_overhead) + time_limit = single_move_time(board, game, correspondence_move_time, setup_timer, move_overhead) else: - time_limit = game_clock_time(board, game, start_time, move_overhead) + time_limit = game_clock_time(board, game, setup_timer, move_overhead) best_move = self.search(board, time_limit, can_ponder, draw_offered, best_move) # Heed min_time - elapsed = (time.perf_counter_ns() - start_time) / 1e9 + elapsed = setup_timer.time_since_reset() if elapsed < min_time: - time.sleep(min_time - elapsed) + time.sleep(to_seconds(min_time - elapsed)) self.add_comment(best_move, board) self.print_stats() @@ -172,11 +174,11 @@ def play_move(self, def add_go_commands(self, time_limit: chess.engine.Limit) -> chess.engine.Limit: """Add extra commands to send to the engine. For example, to search for 1000 nodes or up to depth 10.""" - movetime = self.go_commands.movetime - if movetime is not None: - movetime_sec = float(movetime) / 1000 - if time_limit.time is None or time_limit.time > movetime_sec: - time_limit.time = movetime_sec + movetime_cfg = self.go_commands.movetime + if movetime_cfg is not None: + movetime = msec(movetime_cfg) + if time_limit.time is None or seconds(time_limit.time) > movetime: + time_limit.time = to_seconds(movetime) time_limit.depth = self.go_commands.depth time_limit.nodes = self.go_commands.nodes return time_limit @@ -576,8 +578,8 @@ def getHomemadeEngine(name: str) -> type[MinimalEngine]: return engine -def single_move_time(board: chess.Board, game: model.Game, search_time: int, - start_time: int, move_overhead: int) -> chess.engine.Limit: +def single_move_time(board: chess.Board, game: model.Game, search_time: datetime.timedelta, + setup_timer: Timer, move_overhead: datetime.timedelta) -> chess.engine.Limit: """ Calculate time to search in correspondence games. @@ -588,13 +590,13 @@ def single_move_time(board: chess.Board, game: model.Game, search_time: int, :param move_overhead: The time it takes to communicate between the engine and lichess-bot. :return: The time to choose a move. """ - pre_move_time = int((time.perf_counter_ns() - start_time) / 1e6) + pre_move_time = setup_timer.time_since_reset() overhead = pre_move_time + move_overhead wb = "w" if board.turn == chess.WHITE else "b" - clock_time = max(0, game.state[f"{wb}time"] - overhead) + clock_time = max(msec(0), msec(game.state[f"{wb}time"]) - overhead) search_time = min(search_time, clock_time) - logger.info(f"Searching for time {search_time} for game {game.id}") - return chess.engine.Limit(time=search_time / 1000, clock_id="correspondence") + logger.info(f"Searching for time {sec_str(search_time)} seconds for game {game.id}") + return chess.engine.Limit(time=to_seconds(search_time), clock_id="correspondence") def first_move_time(game: model.Game) -> chess.engine.Limit: @@ -604,13 +606,16 @@ def first_move_time(game: model.Game) -> chess.engine.Limit: :param game: The game that the bot is playing. :return: The time to choose the first move. """ - # Need to hardcode first movetime (10000 ms) since Lichess has 30 sec limit. - search_time = 10000 - logger.info(f"Searching for time {search_time} for game {game.id}") - return chess.engine.Limit(time=search_time / 1000, clock_id="first move") + # Need to hardcode first movetime since Lichess has 30 sec limit. + search_time = seconds(10) + logger.info(f"Searching for time {sec_str(search_time)} seconds for game {game.id}") + return chess.engine.Limit(time=to_seconds(search_time), clock_id="first move") -def game_clock_time(board: chess.Board, game: model.Game, start_time: int, move_overhead: int) -> chess.engine.Limit: +def game_clock_time(board: chess.Board, + game: model.Game, + setup_timer: Timer, + move_overhead: datetime.timedelta) -> chess.engine.Limit: """ Get the time to play by the engine in realtime games. @@ -620,15 +625,16 @@ def game_clock_time(board: chess.Board, game: model.Game, start_time: int, move_ :param move_overhead: The time it takes to communicate between the engine and lichess-bot. :return: The time to play a move. """ - pre_move_time = int((time.perf_counter_ns() - start_time) / 1e6) + pre_move_time = setup_timer.time_since_reset() overhead = pre_move_time + move_overhead + times = {side: msec(game.state[side]) for side in ["wtime", "btime"]} wb = "w" if board.turn == chess.WHITE else "b" - game.state[f"{wb}time"] = max(0, game.state[f"{wb}time"] - overhead) - logger.info("Searching for wtime {wtime} btime {btime}".format_map(game.state) + f" for game {game.id}") - return chess.engine.Limit(white_clock=game.state["wtime"] / 1000, - black_clock=game.state["btime"] / 1000, - white_inc=game.state["winc"] / 1000, - black_inc=game.state["binc"] / 1000, + times[f"{wb}time"] = max(msec(0), times[f"{wb}time"] - overhead) + logger.info(f"Searching for wtime {msec_str(times['wtime'])} btime {msec_str(times['btime'])} for game {game.id}") + return chess.engine.Limit(white_clock=to_seconds(times["wtime"]), + black_clock=to_seconds(times["btime"]), + white_inc=to_seconds(msec(game.state["winc"])), + black_inc=to_seconds(msec(game.state["binc"])), clock_id="real time") @@ -733,8 +739,8 @@ def get_chessdb_move(li: lichess.Lichess, board: chess.Board, game: model.Game, """Get a move from chessdb.cn's opening book.""" wb = "w" if board.turn == chess.WHITE else "b" use_chessdb = chessdb_cfg.enabled - time_left = game.state[f"{wb}time"] - min_time = chessdb_cfg.min_time * 1000 + time_left = msec(game.state[f"{wb}time"]) + min_time = seconds(chessdb_cfg.min_time) if not use_chessdb or time_left < min_time or board.uci_variant != "chess": return None, None @@ -774,8 +780,8 @@ def get_lichess_cloud_move(li: lichess.Lichess, board: chess.Board, game: model. lichess_cloud_cfg: config.Configuration) -> tuple[Optional[str], Optional[chess.engine.InfoDict]]: """Get a move from the lichess's cloud analysis.""" wb = "w" if board.turn == chess.WHITE else "b" - time_left = game.state[f"{wb}time"] - min_time = lichess_cloud_cfg.min_time * 1000 + time_left = msec(game.state[f"{wb}time"]) + min_time = seconds(lichess_cloud_cfg.min_time) use_lichess_cloud = lichess_cloud_cfg.enabled if not use_lichess_cloud or time_left < min_time: return None, None @@ -829,8 +835,8 @@ def get_opening_explorer_move(li: lichess.Lichess, board: chess.Board, game: mod ) -> tuple[Optional[str], Optional[chess.engine.InfoDict]]: """Get a move from lichess's opening explorer.""" wb = "w" if board.turn == chess.WHITE else "b" - time_left = game.state[f"{wb}time"] - min_time = opening_explorer_cfg.min_time * 1000 + time_left = msec(game.state[f"{wb}time"]) + min_time = seconds(opening_explorer_cfg.min_time) source = opening_explorer_cfg.source if not opening_explorer_cfg.enabled or time_left < min_time or source == "master" and board.uci_variant != "chess": return None, None @@ -885,9 +891,9 @@ def get_online_egtb_move(li: lichess.Lichess, board: chess.Board, game: model.Ga wb = "w" if board.turn == chess.WHITE else "b" pieces = chess.popcount(board.occupied) source = online_egtb_cfg.source - minimum_time = online_egtb_cfg.min_time * 1000 + minimum_time = seconds(online_egtb_cfg.min_time) if (not use_online_egtb - or game.state[f"{wb}time"] < minimum_time + or msec(game.state[f"{wb}time"]) < minimum_time or board.uci_variant not in ["chess", "antichess", "atomic"] and source == "lichess" or board.uci_variant != "chess" diff --git a/lichess-bot.py b/lichess-bot.py index 9247d4cfb..354d30bc0 100644 --- a/lichess-bot.py +++ b/lichess-bot.py @@ -13,6 +13,7 @@ import matchmaking import signal import time +import datetime import backoff import os import io @@ -23,7 +24,7 @@ import traceback from config import load_config, Configuration from conversation import Conversation, ChatLine -from timer import Timer +from timer import Timer, seconds, msec, hours, to_seconds from requests.exceptions import ChunkedEncodingError, ConnectionError, HTTPError, ReadTimeout from asyncio.exceptions import TimeoutError as MoveTimeout from rich.logging import RichHandler @@ -105,14 +106,14 @@ def watch_control_stream(control_queue: CONTROL_QUEUE_TYPE, li: lichess.Lichess) control_queue.put_nowait({"type": "terminated", "error": error}) -def do_correspondence_ping(control_queue: CONTROL_QUEUE_TYPE, period: int) -> None: +def do_correspondence_ping(control_queue: CONTROL_QUEUE_TYPE, period: datetime.timedelta) -> None: """ Tell the engine to check the correspondence games. :param period: How many seconds to wait before sending a correspondence ping. """ while not terminated: - time.sleep(period) + time.sleep(to_seconds(period)) control_queue.put_nowait({"type": "correspondence_ping"}) @@ -223,7 +224,7 @@ def start(li: lichess.Lichess, user_profile: USER_PROFILE_TYPE, config: Configur control_stream.start() correspondence_pinger = multiprocessing.Process(target=do_correspondence_ping, args=(control_queue, - config.correspondence.checkin_period)) + seconds(config.correspondence.checkin_period))) correspondence_pinger.start() correspondence_queue: CORRESPONDENCE_QUEUE_TYPE = manager.Queue() @@ -301,7 +302,7 @@ def lichess_bot_main(li: lichess.Lichess, if game["gameId"] not in startup_correspondence_games) low_time_games: list[EVENT_GETATTR_GAME_TYPE] = [] - last_check_online_time = Timer(60 * 60) # one hour interval + last_check_online_time = Timer(hours(1)) matchmaker = matchmaking.Matchmaking(li, config, user_profile) matchmaker.show_earliest_challenge_time() @@ -571,7 +572,7 @@ def play_game(li: lichess.Lichess, # Initial response of stream will be the full game info. Store it. initial_state = json.loads(next(lines).decode("utf-8")) logger.debug(f"Initial state: {initial_state}") - abort_time = config.abort_time + abort_time = seconds(config.abort_time) game = model.Game(initial_state, user_profile["username"], li.baseUrl, abort_time) with engine_wrapper.create_engine(config) as engine: @@ -583,14 +584,14 @@ def play_game(li: lichess.Lichess, is_correspondence = game.speed == "correspondence" correspondence_cfg = config.correspondence - correspondence_move_time = correspondence_cfg.move_time * 1000 - correspondence_disconnect_time = correspondence_cfg.disconnect_time + correspondence_move_time = seconds(correspondence_cfg.move_time) + correspondence_disconnect_time = seconds(correspondence_cfg.disconnect_time) engine_cfg = config.engine ponder_cfg = correspondence_cfg if is_correspondence else engine_cfg can_ponder = ponder_cfg.uci_ponder or ponder_cfg.ponder - move_overhead = config.move_overhead - delay_seconds = config.rate_limiting_delay / 1000 + move_overhead = msec(config.move_overhead) + delay = msec(config.rate_limiting_delay) keyword_map: defaultdict[str, str] = defaultdict(str, me=game.me.name, opponent=game.opponent.name) hello = get_greeting("hello", config.greeting, keyword_map) @@ -598,7 +599,7 @@ def play_game(li: lichess.Lichess, hello_spectators = get_greeting("hello_spectators", config.greeting, keyword_map) goodbye_spectators = get_greeting("goodbye_spectators", config.greeting, keyword_map) - disconnect_time = correspondence_disconnect_time if not game.state.get("moves") else 0 + disconnect_time = correspondence_disconnect_time if not game.state.get("moves") else seconds(0) prior_game = None board = chess.Board() upd: dict[str, Any] = game.state @@ -615,20 +616,20 @@ def play_game(li: lichess.Lichess, if not is_game_over(game) and is_engine_move(game, prior_game, board): disconnect_time = correspondence_disconnect_time say_hello(conversation, hello, hello_spectators, board) - start_time = time.perf_counter_ns() + setup_timer = Timer() print_move_number(board) move_attempted = True engine.play_move(board, game, li, - start_time, + setup_timer, move_overhead, can_ponder, is_correspondence, correspondence_move_time, engine_cfg, fake_think_time(config, board, game)) - time.sleep(delay_seconds) + time.sleep(to_seconds(delay)) elif is_game_over(game): tell_user_game_result(game, board) engine.send_game_result(game, board) @@ -636,7 +637,7 @@ def play_game(li: lichess.Lichess, conversation.send_message("spectator", goodbye_spectators) wb = "w" if board.turn == chess.WHITE else "b" - terminate_time = (upd[f"{wb}time"] + upd[f"{wb}inc"]) / 1000 + 60 + terminate_time = msec(upd[f"{wb}time"]) + msec(upd[f"{wb}inc"]) + seconds(60) game.ping(abort_time, terminate_time, disconnect_time) prior_game = copy.deepcopy(game) elif u_type == "ping" and should_exit_game(board, game, prior_game, li, is_correspondence): @@ -672,12 +673,12 @@ def say_hello(conversation: Conversation, hello: str, hello_spectators: str, boa conversation.send_message("spectator", hello_spectators) -def fake_think_time(config: Configuration, board: chess.Board, game: model.Game) -> float: +def fake_think_time(config: Configuration, board: chess.Board, game: model.Game) -> datetime.timedelta: """Calculate how much time we should wait for fake_think_time.""" - sleep = 0.0 + sleep = seconds(0.0) if config.fake_think_time and len(board.move_stack) > 9: - remaining = max(0, game.my_remaining_seconds() - config.move_overhead / 1000) + remaining = max(seconds(0), game.my_remaining_time() - msec(config.move_overhead)) delay = remaining * 0.025 accel = 0.99 ** (len(board.move_stack) - 10) sleep = delay * accel diff --git a/lichess.py b/lichess.py index ea8a94154..74dcde2ba 100644 --- a/lichess.py +++ b/lichess.py @@ -8,7 +8,8 @@ import logging import traceback from collections import defaultdict -from timer import Timer +import datetime +from timer import Timer, seconds, sec_str from typing import Optional, Union, Any import chess.engine JSON_REPLY_TYPE = dict[str, Any] @@ -131,7 +132,7 @@ def api_get(self, endpoint_name: str, *template_args: str, response = self.session.get(url, params=params, timeout=timeout, stream=stream) if is_new_rate_limit(response): - delay = 1 if endpoint_name == "move" else 60 + delay = seconds(1 if endpoint_name == "move" else 60) self.set_rate_limit_delay(path_template, delay) response.raise_for_status() @@ -213,7 +214,7 @@ def api_post(self, response = self.session.post(url, data=data, headers=headers, params=params, json=payload, timeout=2) if is_new_rate_limit(response): - self.set_rate_limit_delay(path_template, 60) + self.set_rate_limit_delay(path_template, seconds(60)) if raise_for_status: response.raise_for_status() @@ -231,10 +232,10 @@ def get_path_template(self, endpoint_name: str) -> str: path_template = ENDPOINTS[endpoint_name] if self.is_rate_limited(path_template): raise RateLimited(f"{path_template} is rate-limited. " - f"Will retry in {int(self.rate_limit_time_left(path_template))} seconds.") + f"Will retry in {sec_str(self.rate_limit_time_left(path_template))} seconds.") return path_template - def set_rate_limit_delay(self, path_template: str, delay_time: int) -> None: + def set_rate_limit_delay(self, path_template: str, delay_time: datetime.timedelta) -> None: """ Set a delay to a path template if it was rate limited. @@ -248,7 +249,7 @@ def is_rate_limited(self, path_template: str) -> bool: """Check if a path template is rate limited.""" return not self.rate_limit_timers[path_template].is_expired() - def rate_limit_time_left(self, path_template: str) -> float: + def rate_limit_time_left(self, path_template: str) -> datetime.timedelta: """How much time is left until we can use the path template normally.""" return self.rate_limit_timers[path_template].time_until_expiration() diff --git a/matchmaking.py b/matchmaking.py index e956bca5a..6f666dfa1 100644 --- a/matchmaking.py +++ b/matchmaking.py @@ -2,7 +2,7 @@ import random import logging import model -from timer import Timer +from timer import Timer, seconds, minutes, days from collections import defaultdict from collections.abc import Sequence import lichess @@ -18,7 +18,6 @@ daily_challenges_file_name = "daily_challenge_times.txt" timestamp_format = "%Y-%m-%d %H:%M:%S\n" -one_day_seconds = datetime.timedelta(days=1).total_seconds() def read_daily_challenges() -> DAILY_TIMERS_TYPE: @@ -27,7 +26,7 @@ def read_daily_challenges() -> DAILY_TIMERS_TYPE: try: with open(daily_challenges_file_name) as file: for line in file: - timers.append(Timer(one_day_seconds, datetime.datetime.strptime(line, timestamp_format))) + timers.append(Timer(days(1), datetime.datetime.strptime(line, timestamp_format))) except FileNotFoundError: pass @@ -38,7 +37,7 @@ def write_daily_challenges(daily_challenges: DAILY_TIMERS_TYPE) -> None: """Write the challenges we have created in the past 24 hours to a text file.""" with open(daily_challenges_file_name, "w") as file: for timer in daily_challenges: - file.write(timer.starting_timestamp().strftime(timestamp_format)) + file.write(timer.starting_timestamp(timestamp_format)) class Matchmaking: @@ -50,10 +49,10 @@ def __init__(self, li: lichess.Lichess, config: Configuration, user_profile: USE self.variants = list(filter(lambda variant: variant != "fromPosition", config.challenge.variants)) self.matchmaking_cfg = config.matchmaking self.user_profile = user_profile - self.last_challenge_created_delay = Timer(25) # The challenge expires 20 seconds after creating it. - self.last_game_ended_delay = Timer(self.matchmaking_cfg.challenge_timeout * 60) - self.last_user_profile_update_time = Timer(5 * 60) # 5 minutes. - self.min_wait_time = 60 # Wait 60 seconds before creating a new challenge to avoid hitting the api rate limits. + self.last_challenge_created_delay = Timer(seconds(25)) # Challenges expire after 20 seconds. + self.last_game_ended_delay = Timer(minutes(self.matchmaking_cfg.challenge_timeout)) + self.last_user_profile_update_time = Timer(minutes(5)) + self.min_wait_time = seconds(60) # Wait before new challenge to avoid api rate limits. self.challenge_id: str = "" self.daily_challenges: DAILY_TIMERS_TYPE = read_daily_challenges() @@ -124,8 +123,8 @@ def update_daily_challenge_record(self) -> None: etc. """ self.daily_challenges = [timer for timer in self.daily_challenges if not timer.is_expired()] - self.daily_challenges.append(Timer(one_day_seconds)) - self.min_wait_time = 60 * ((len(self.daily_challenges) // 50) + 1) + self.daily_challenges.append(Timer(days(1))) + self.min_wait_time = seconds(60) * ((len(self.daily_challenges) // 50) + 1) write_daily_challenges(self.daily_challenges) def perf(self) -> dict[str, dict[str, Any]]: @@ -244,7 +243,7 @@ def show_earliest_challenge_time(self) -> None: postgame_timeout = self.last_game_ended_delay.time_until_expiration() time_to_next_challenge = self.min_wait_time - self.last_challenge_created_delay.time_since_reset() time_left = max(postgame_timeout, time_to_next_challenge) - earliest_challenge_time = datetime.datetime.now() + datetime.timedelta(seconds=time_left) + earliest_challenge_time = datetime.datetime.now() + time_left challenges = "challenge" + ("" if len(self.daily_challenges) == 1 else "s") logger.info(f"Next challenge will be created after {earliest_challenge_time.strftime('%X')} " f"({len(self.daily_challenges)} {challenges} in last 24 hours)") diff --git a/model.py b/model.py index d425a567c..5bd92600d 100644 --- a/model.py +++ b/model.py @@ -4,7 +4,7 @@ import logging import datetime from enum import Enum -from timer import Timer +from timer import Timer, msec, seconds, sec_str, to_msec, to_seconds, years from config import Configuration from typing import Any from collections import defaultdict @@ -144,15 +144,15 @@ class Termination(str, Enum): class Game: """Store information about a game.""" - def __init__(self, game_info: dict[str, Any], username: str, base_url: str, abort_time: int) -> None: + def __init__(self, game_info: dict[str, Any], username: str, base_url: str, abort_time: datetime.timedelta) -> None: """:param abort_time: How long to wait before aborting the game.""" self.username = username self.id: str = game_info["id"] self.speed = game_info.get("speed") clock = game_info.get("clock") or {} - ten_years_in_ms = 1000 * 3600 * 24 * 365 * 10 - self.clock_initial = clock.get("initial", ten_years_in_ms) - self.clock_increment = clock.get("increment", 0) + ten_years_in_ms = to_msec(years(10)) + self.clock_initial = msec(clock.get("initial", ten_years_in_ms)) + self.clock_increment = msec(clock.get("increment", 0)) self.perf_name = (game_info.get("perf") or {}).get("name", "{perf?}") self.variant_name = game_info["variant"]["name"] self.mode = "rated" if game_info.get("rated") else "casual" @@ -166,10 +166,11 @@ def __init__(self, game_info: dict[str, Any], username: str, base_url: str, abor self.me = self.white if self.is_white else self.black self.opponent = self.black if self.is_white else self.white self.base_url = base_url - self.game_start = datetime.datetime.fromtimestamp(game_info["createdAt"] / 1000, tz=datetime.timezone.utc) + self.game_start = datetime.datetime.fromtimestamp(to_seconds(msec(game_info["createdAt"])), + tz=datetime.timezone.utc) self.abort_time = Timer(abort_time) - self.terminate_time = Timer((self.clock_initial + self.clock_increment) / 1000 + abort_time + 60) - self.disconnect_time = Timer(0) + self.terminate_time = Timer(self.clock_initial + self.clock_increment + abort_time + seconds(60)) + self.disconnect_time = Timer(seconds(0)) def url(self) -> str: """Get the url of the game.""" @@ -188,7 +189,7 @@ def pgn_event(self) -> str: def time_control(self) -> str: """Get the time control of the game.""" - return f"{int(self.clock_initial/1000)}+{int(self.clock_increment/1000)}" + return f"{sec_str(self.clock_initial)}+{sec_str(self.clock_increment)}" def is_abortable(self) -> bool: """Whether the game can be aborted.""" @@ -196,7 +197,7 @@ def is_abortable(self) -> bool: # than two moves (one from each player) have been played. return " " not in self.state["moves"] - def ping(self, abort_in: int, terminate_in: int, disconnect_in: int) -> None: + def ping(self, abort_in: datetime.timedelta, terminate_in: datetime.timedelta, disconnect_in: datetime.timedelta) -> None: """ Tell the bot when to abort, terminate, and disconnect from a game. @@ -221,11 +222,11 @@ def should_disconnect_now(self) -> bool: """Whether we should disconnect form the game.""" return self.disconnect_time.is_expired() - def my_remaining_seconds(self) -> float: + def my_remaining_time(self) -> datetime.timedelta: """How many seconds we have left.""" - wtime: int = self.state["wtime"] - btime: int = self.state["btime"] - return (wtime if self.is_white else btime) / 1000 + wtime = msec(self.state["wtime"]) + btime = msec(self.state["btime"]) + return wtime if self.is_white else btime def result(self) -> str: """Get the result of the game.""" diff --git a/test_bot/lichess.py b/test_bot/lichess.py index 22c1fc915..15e486ee7 100644 --- a/test_bot/lichess.py +++ b/test_bot/lichess.py @@ -5,6 +5,7 @@ import json import logging import traceback +from timer import seconds, to_msec from typing import Union, Any, Optional, Generator logger = logging.getLogger(__name__) @@ -68,7 +69,7 @@ def iter_lines(self) -> Generator[bytes, None, None]: board = chess.Board() for move in moves.split(): board.push_uci(move) - wtime, btime = state[1].split(",") + wtime, btime = [seconds(float(n)) for n in state[1].split(",")] if len(moves) <= len(self.moves_sent) and not event: time.sleep(0.001) continue @@ -76,12 +77,11 @@ def iter_lines(self) -> Generator[bytes, None, None]: break except (IndexError, ValueError): pass - wtime_int, wtime_int = float(wtime), float(btime) time.sleep(0.1) new_game_state = {"type": "gameState", "moves": moves, - "wtime": int(wtime_int * 1000), - "btime": int(wtime_int * 1000), + "wtime": int(to_msec(wtime)), + "btime": int(to_msec(btime)), "winc": 100, "binc": 100} if event == "end": diff --git a/test_bot/test_bot.py b/test_bot/test_bot.py index 5c633aa3f..a8189bf80 100644 --- a/test_bot/test_bot.py +++ b/test_bot/test_bot.py @@ -13,6 +13,7 @@ import shutil import importlib import config +from timer import Timer, to_seconds, seconds from typing import Any if __name__ == "__main__": sys.exit(f"The script {os.path.basename(__file__)} should only be run by pytest.") @@ -85,15 +86,15 @@ def thread_for_test() -> None: open("./logs/states.txt", "w").close() open("./logs/result.txt", "w").close() - start_time = 10. - increment = 0.1 + start_time = seconds(10) + increment = seconds(0.1) board = chess.Board() wtime = start_time btime = start_time with open("./logs/states.txt", "w") as file: - file.write(f"\n{wtime},{btime}") + file.write(f"\n{to_seconds(wtime)},{to_seconds(btime)}") engine = chess.engine.SimpleEngine.popen_uci(stockfish_path) engine.configure({"Skill Level": 0, "Move Overhead": 1000, "Use NNUE": False}) @@ -105,13 +106,12 @@ def thread_for_test() -> None: chess.engine.Limit(time=1), ponder=False) else: - start_time = time.perf_counter_ns() + move_timer = Timer() move = engine.play(board, - chess.engine.Limit(white_clock=wtime - 2, - white_inc=increment), + chess.engine.Limit(white_clock=to_seconds(wtime) - 2, + white_inc=to_seconds(increment)), ponder=False) - end_time = time.perf_counter_ns() - wtime -= (end_time - start_time) / 1e9 + wtime -= move_timer.time_since_reset() wtime += increment engine_move = move.move if engine_move is None: @@ -128,7 +128,7 @@ def thread_for_test() -> None: file.write(state_str) else: # lichess-bot move. - start_time = time.perf_counter_ns() + move_timer = Timer() state2 = state_str moves_are_correct = False while state2 == state_str or not moves_are_correct: @@ -145,9 +145,8 @@ def thread_for_test() -> None: moves_are_correct = False with open("./logs/states.txt") as states: state2 = states.read() - end_time = time.perf_counter_ns() if len(board.move_stack) > 1: - btime -= (end_time - start_time) / 1e9 + btime -= move_timer.time_since_reset() btime += increment move_str = state2.split("\n")[0].split(" ")[-1] board.push_uci(move_str) @@ -156,7 +155,7 @@ def thread_for_test() -> None: with open("./logs/states.txt") as states: state_str = states.read() state = state_str.split("\n") - state[1] = f"{wtime},{btime}" + state[1] = f"{to_seconds(wtime)},{to_seconds(btime)}" state_str = "\n".join(state) with open("./logs/states.txt", "w") as file: file.write(state_str) diff --git a/timer.py b/timer.py index c9f6bcf72..07c658912 100644 --- a/timer.py +++ b/timer.py @@ -4,21 +4,83 @@ from typing import Optional +def msec(time_in_msec: float) -> datetime.timedelta: + """Create a timedelta duration in milliseconds.""" + return datetime.timedelta(milliseconds=time_in_msec) + + +def to_msec(duration: datetime.timedelta) -> float: + """Return a bare number representing the length of the duration in milliseconds.""" + return duration / msec(1) + + +def msec_str(duration: datetime.timedelta) -> str: + """Return a string with the duration value in whole number milliseconds.""" + return f"{round(to_msec(duration))}" + + +def seconds(time_in_sec: float) -> datetime.timedelta: + """Create a timedelta duration in seconds.""" + return datetime.timedelta(seconds=time_in_sec) + + +def to_seconds(duration: datetime.timedelta) -> float: + """Return a bare number representing the length of the duration in seconds.""" + return duration.total_seconds() + + +def sec_str(duration: datetime.timedelta) -> str: + """Return a string with the duration value in whole number seconds.""" + return f"{round(to_seconds(duration))}" + + +def minutes(time_in_minutes: float) -> datetime.timedelta: + """Create a timedelta duration in minutes.""" + return datetime.timedelta(minutes=time_in_minutes) + + +def hours(time_in_hours: float) -> datetime.timedelta: + """Create a timedelta duration in hours.""" + return datetime.timedelta(minutes=time_in_hours) + + +def days(time_in_days: float) -> datetime.timedelta: + """Create a timedelta duration in minutes.""" + return datetime.timedelta(days=time_in_days) + + +def years(time_in_years: float) -> datetime.timedelta: + """Create a timedelta duration in median years--i.e., 365 days.""" + return days(365 * time_in_years) + + class Timer: - """A timer for use in lichess-bot.""" + """ + A timer for use in lichess-bot. An instance of timer can be used both as a countdown timer and a stopwatch. + + If the duration argument in the __init__() method is greater than zero, then + the method is_expired() indicates when the intial duration has passed. The + method time_until_expiration() gives the amount of time left until the timer + expires. + + Regardless of the initial duration (even if it's zero), a timer can be used + as a stopwatch by calling time_since_reset() to get the amount of time since + the timer was created or since it was last reset. + """ - def __init__(self, duration: float = 0, backdated_start: Optional[datetime.datetime] = None) -> None: + def __init__(self, duration: datetime.timedelta = seconds(0), + backdated_timestamp: Optional[datetime.datetime] = None) -> None: """ Start the timer. - :param duration: The duration of the timer. - :param backdated_start: When the timer started. Used to keep the timers between sessions. + :param duration: The duration of time before Timer.is_expired() returns True. + :param backdated_timestamp: When the timer should have started. Used to keep the timers between sessions. """ self.duration = duration self.reset() - if backdated_start: - time_already_used = datetime.datetime.now() - backdated_start - self.starting_time -= time_already_used.total_seconds() + if backdated_timestamp is not None: + time_already_used = datetime.datetime.now() - backdated_timestamp + self.starting_time -= to_seconds(time_already_used) def is_expired(self) -> bool: """Check if a timer is expired.""" @@ -26,16 +88,16 @@ def is_expired(self) -> bool: def reset(self) -> None: """Reset the timer.""" - self.starting_time = time.time() + self.starting_time = time.perf_counter() - def time_since_reset(self) -> float: + def time_since_reset(self) -> datetime.timedelta: """How much time has passed.""" - return time.time() - self.starting_time + return seconds(time.perf_counter() - self.starting_time) - def time_until_expiration(self) -> float: + def time_until_expiration(self) -> datetime.timedelta: """How much time is left until it expires.""" - return max(0., self.duration - self.time_since_reset()) + return max(seconds(0), self.duration - self.time_since_reset()) - def starting_timestamp(self) -> datetime.datetime: + def starting_timestamp(self, format: str) -> str: """When the timer started.""" - return datetime.datetime.now() - datetime.timedelta(seconds=self.time_since_reset()) + return (datetime.datetime.now() - self.time_since_reset()).strftime(format)