diff --git a/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py b/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py index b96b6f1f29..1f73356b70 100644 --- a/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py +++ b/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py @@ -565,9 +565,15 @@ def query_binlog_from_bklog( "file_list_details": [], } collector_fields = ["file_mtime", "start_time", "stop_time", "size", "task_id", "filename"] + # 记录file task id,用于去重 + file_task_id_list = [] for log in binlogs: + if log["task_id"] in file_task_id_list: + continue + detail = {field: log[field] for field in collector_fields} detail["file_name"] = detail.pop("filename") + file_task_id_list.append(detail["task_id"]) binlog_record["file_list_details"].append(detail) return binlog_record diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py index 37fc5c1a94..1561eb1ef3 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py @@ -72,42 +72,45 @@ def spider_recover_sub_flow(root_id: str, ticket_data: dict, cluster: dict): kwargs=asdict(exec_act_kwargs), ) if cluster["rollback_type"] == RollbackType.REMOTE_AND_TIME.value: - if compare_time(backup_info["backup_time"], cluster["rollback_time"]): - raise TendbGetBinlogFailedException(message=_("{} 备份时间点大于回滚时间点".format(cluster["master_ip"]))) - rollback_handler = FixPointRollbackHandler(cluster["cluster_id"]) - backup_binlog = rollback_handler.query_binlog_from_bklog( - str2datetime(backup_info["backup_time"]), - str2datetime(cluster["rollback_time"]), - minute_range=30, - host_ip=cluster["rollback_ip"], - port=cluster["rollback_port"], - ) - if backup_binlog is None: - raise TendbGetBinlogFailedException(message=_("获取实例 {} binlog失败".format(cluster["rollback_ip"]))) + spider_has_binlog = cluster.get("spider_has_binlog", False) + if spider_has_binlog: + if compare_time(backup_info["backup_time"], cluster["rollback_time"]): + raise TendbGetBinlogFailedException(message=_("{} 备份时间点大于回滚时间点".format(cluster["master_ip"]))) + rollback_handler = FixPointRollbackHandler(cluster["cluster_id"]) + backup_binlog = rollback_handler.query_binlog_from_bklog( + str2datetime(backup_info["backup_time"]), + str2datetime(cluster["rollback_time"]), + minute_range=30, + host_ip=cluster["rollback_ip"], + port=cluster["rollback_port"], + ) + if backup_binlog is None: + raise TendbGetBinlogFailedException(message=_("获取实例 {} binlog失败".format(cluster["rollback_ip"]))) - task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]] - binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]] - cluster["binlog_files"] = ",".join(binlog_files) - download_kwargs = DownloadBackupFileKwargs( - bk_cloud_id=cluster["bk_cloud_id"], - task_ids=task_ids, - dest_ip=cluster["rollback_ip"], - desc_dir=cluster["file_target_path"], - reason="spider node rollback binlog", - ) - sub_pipeline.add_act( - act_name=_("下载定点恢复的binlog到{}:{}").format(cluster["rollback_ip"], cluster["rollback_port"]), - act_component_code=MySQLDownloadBackupfileComponent.code, - kwargs=asdict(download_kwargs), - ) + task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]] + binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]] + cluster["binlog_files"] = ",".join(binlog_files) + download_kwargs = DownloadBackupFileKwargs( + bk_cloud_id=cluster["bk_cloud_id"], + task_ids=task_ids, + dest_ip=cluster["rollback_ip"], + desc_dir=cluster["file_target_path"], + reason="spider node rollback binlog", + ) + sub_pipeline.add_act( + act_name=_("下载定点恢复的binlog到{}:{}").format(cluster["rollback_ip"], cluster["rollback_port"]), + act_component_code=MySQLDownloadBackupfileComponent.code, + kwargs=asdict(download_kwargs), + ) + + exec_act_kwargs.exec_ip = cluster["rollback_ip"] + exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_recover_binlog_payload.__name__ + sub_pipeline.add_act( + act_name=_("定点恢复之前滚binlog{}:{}").format(exec_act_kwargs.exec_ip, cluster["rollback_port"]), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) - exec_act_kwargs.exec_ip = cluster["rollback_ip"] - exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_recover_binlog_payload.__name__ - sub_pipeline.add_act( - act_name=_("定点恢复之前滚binlog{}:{}").format(exec_act_kwargs.exec_ip, cluster["rollback_port"]), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(exec_act_kwargs), - ) return sub_pipeline.build_sub_process(sub_name=_("spider恢复:{}".format(cluster["instance"]))) diff --git a/dbm-ui/backend/utils/time.py b/dbm-ui/backend/utils/time.py index 885ea03439..c9c1b4aa0a 100644 --- a/dbm-ui/backend/utils/time.py +++ b/dbm-ui/backend/utils/time.py @@ -52,7 +52,7 @@ def datetime2str(o_datetime: datetime.datetime, fmt: str = DATETIME_PATTERN, awa return o_datetime if aware_check and not timezone.is_aware(o_datetime): - raise ValidationError("Time zone check failed...") + raise ValidationError(f"[{timezone}] Time zone check failed...") o_datetime = timezone.localtime(o_datetime) # 可读性优化,暂时去掉毫秒单位。TODO:时间是否需要完全精准? @@ -71,7 +71,7 @@ def str2datetime(datetime_str: str, fmt: str = DATETIME_PATTERN, aware_check: bo o_datetime = time_parse(datetime_str) if aware_check and not timezone.is_aware(o_datetime): - raise ValidationError("Time zone check failed...") + raise ValidationError(f"[{timezone}] Time zone check failed...") return o_datetime