Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mysql): tendbCluster支持指定分片查询备份用于恢复 #6747 #6790

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import copy
import json
import logging
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand All @@ -36,6 +37,8 @@
from backend.utils.string import base64_encode
from backend.utils.time import compare_time, datetime2str, find_nearby_time

logger = logging.getLogger("flow")


class FixPointRollbackHandler:
"""
Expand Down Expand Up @@ -192,10 +195,11 @@ def aggregate_tendb_dbbackup_logs(self, backup_logs: List[Dict[str, Any]]) -> Li

return valid_backup_logs

def aggregate_tendbcluster_dbbackup_logs(self, backup_logs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def aggregate_tendbcluster_dbbackup_logs(self, backup_logs: List[Dict], shard_list: List = None) -> List[Dict]:
"""
聚合tendbcluster的mysql_backup_result日志,按照backup_id聚合tendb备份记录
:param backup_logs: 备份记录列表
:param shard_list: 备份分片数
"""

def insert_time_field(_back_log, _log):
Expand Down Expand Up @@ -298,17 +302,25 @@ def insert_log_into_node(_backup_node, _log):
if backup_node:
insert_time_field(backup_id__backup_logs_map[backup_id], backup_node)

logger.info("backup info:", backup_id__backup_logs_map)
# 获取合法的备份记录
cluster_shard_num = self.cluster.tendbclusterstorageset_set.count()
if shard_list is not None and len(shard_list) > 0:
shard_list = sorted(shard_list)
else:
cluster_shard_num = self.cluster.tendbclusterstorageset_set.count()
shard_list = list(range(0, cluster_shard_num))

backup_id__valid_backup_logs = defaultdict(dict)
for backup_id, backup_log in backup_id__backup_logs_map.items():
# 获取合法分片ID,如果分片数不完整,则忽略
shard_value_list = [
shard_value
int(shard_value)
for shard_value in backup_log["remote_node"].keys()
if backup_log["remote_node"][shard_value]
]
if sorted(shard_value_list) != list(range(0, cluster_shard_num)):
logger.info("backup shard list:", shard_value_list)
logger.info("user shard list:", shard_list)
if not set(shard_value_list).issuperset(set(shard_list)):
continue

# 如果不存在spider master记录,则忽略
Expand All @@ -323,11 +335,12 @@ def insert_log_into_node(_backup_node, _log):

return list(backup_id__valid_backup_logs.values())

def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime) -> List[Dict]:
def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime, **kwargs) -> List[Dict]:
"""
通过日志平台查询集群的时间范围内的备份记录
:param start_time: 开始时间
:param end_time: 结束时间
:param shard_list: tendbcluster专属,备份分片数
"""

backup_logs = self._get_log_from_bklog(
Expand All @@ -338,7 +351,8 @@ def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime)
)

if self.cluster.cluster_type == ClusterType.TenDBCluster:
return self.aggregate_tendbcluster_dbbackup_logs(backup_logs)
shard_list = kwargs.get("shard_list", [])
return self.aggregate_tendbcluster_dbbackup_logs(backup_logs, shard_list)
else:
return self.aggregate_tendb_dbbackup_logs(backup_logs)

Expand Down Expand Up @@ -483,7 +497,7 @@ def query_backup_log_from_local(self) -> List[Dict[str, Any]]:
return local_backup_logs

def query_latest_backup_log(
self, rollback_time: datetime, backup_source: str = MySQLBackupSource.REMOTE.value
self, rollback_time: datetime, backup_source: str = MySQLBackupSource.REMOTE.value, **kwargs
) -> Dict[str, Any]:
"""
根据回档时间查询最新一次的备份记录
Expand All @@ -495,7 +509,7 @@ def query_latest_backup_log(
# 日志平台查询
end_time = rollback_time
start_time = end_time - timedelta(days=BACKUP_LOG_ROLLBACK_TIME_RANGE_DAYS)
backup_logs = self.query_backup_log_from_bklog(start_time, end_time)
backup_logs = self.query_backup_log_from_bklog(start_time, end_time, **kwargs)

if not backup_logs:
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def mysql_restore_data_sub_flow(
backup_info = get_local_backup(ins_list, cluster_model)
if backup_info is None:
logger.error("cluster {} backup info not exists".format(cluster_model.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_model.id)))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的本地备份信息失败".format(cluster_model.id)))
cluster["backupinfo"] = backup_info
exec_act_kwargs = ExecActuatorKwargs(
bk_cloud_id=cluster_model.bk_cloud_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
# 查询备份
rollback_time = datetime.now(timezone.utc)
rollback_handler = FixPointRollbackHandler(cluster_id=cluster["cluster_id"])
backup_info = rollback_handler.query_latest_backup_log(rollback_time)
shard_list = []
if cluster["cluster_type"] == ClusterType.TenDBCluster:
shard_list = [int(cluster_info["shard_id"])]
backup_info = rollback_handler.query_latest_backup_log(rollback_time, shard_list=shard_list)
if backup_info is None:
logger.error("cluster {} backup info not exists".format(cluster["cluster_id"]))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster["cluster_id"])))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,27 @@ def migrate_master_slave_flow(self):
db_module_id=cluster_class.db_module_id,
cluster_type=cluster_class.cluster_type,
)

tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
cluster_info = get_master_slave_recover_info(
cluster_class.id, self.data["old_master_ip"], self.data["old_slave_ip"]
)
cluster_info["charset"] = self.data["charset"]
cluster_info["db_version"] = self.data["db_version"]
cluster_info["ports"] = []
backup_info = {}
if self.ticket_data["backup_source"] == MySQLBackupSource.REMOTE.value:
# 先查询备份,如果备份不存在则退出
# restore_time = datetime.strptime("2023-07-31 17:40:00", "%Y-%m-%d %H:%M:%S")
backup_handler = FixPointRollbackHandler(cluster_class.id)
restore_time = datetime.now(timezone.utc)
backup_info = backup_handler.query_latest_backup_log(restore_time)
shard_list = [int(shard_id) for shard_id in cluster_info["my_shards"].keys()]
backup_info = backup_handler.query_latest_backup_log(restore_time, shard_list=shard_list)
logger.debug(backup_info)
if backup_info is None:
logger.error("cluster {} backup info not exists".format(cluster_class.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))
tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
cluster_info = get_master_slave_recover_info(
cluster_class.id, self.data["old_master_ip"], self.data["old_slave_ip"]
)

cluster_info["charset"] = self.data["charset"]
cluster_info["db_version"] = self.data["db_version"]
cluster_info["ports"] = []
for shard_id, shard in cluster_info["my_shards"].items():
master = {
"ip": self.data["new_master_ip"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ def _alter_remote_slave_routing(

# 获取待切换的分片信息,拼接alter node语句
server_name = res[0]["cmd_results"][1]["table_data"][0]["Server_name"]

# 执行替换节点路由信息
exec_sql = [
"set tc_admin=1",
f"TDBCTL ALTER NODE "
f"{server_name} OPTIONS(host '{new_ip}', port {new_port}, password '{tdbctl_pass}', user '{TDBCTL_USER}')",
"TDBCTL FLUSH ROUTING",
]
self.log_info(str(exec_sql))
rpc_params["cmds"] = exec_sql
res = DRSApi.rpc(rpc_params)
if res[0]["error_msg"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class TendbClusterRestoreSlaveParamBuilder(builders.FlowParamBuilder):

def format_ticket_data(self):
if self.ticket_data["ip_source"] == IpSource.RESOURCE_POOL:
for info in self.ticket_data["infos"]:
info["resource_spec"]["remote"] = info["resource_spec"]["new_slave"]
return

for info in self.ticket_data["infos"]:
Expand Down
Loading