fix: tendbCluster in-place rebuild now updates routing; node point-in-time rollback supports fetching binlog from the original master and slave nodes #6666
zfrendo committed Sep 6, 2024
1 parent d6a4c02 commit d63c5df
Showing 13 changed files with 341 additions and 207 deletions.
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging.config
from datetime import datetime

from django.utils.translation import ugettext as _

from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler

logger = logging.getLogger("root")


def get_backup_binlog(
    cluster_id: int, start_time: datetime, end_time: datetime, binlog_info: dict, minute_range=30
) -> dict:
    result = {}
    if start_time > end_time:
        result["query_binlog_error"] = _("Backup time {} is later than rollback time {}".format(start_time, end_time))
        return result
    # Query the master node recorded in the backup file first; fall back to the slave node if nothing is found.
    rollback_handler = FixPointRollbackHandler(cluster_id)
    backup_binlog = rollback_handler.query_binlog_from_bklog(
        start_time=start_time,
        end_time=end_time,
        host_ip=binlog_info["show_master_status"]["master_host"],
        port=binlog_info["show_master_status"]["master_port"],
        minute_range=minute_range,
    )
    result["binlog_start_file"] = binlog_info["show_master_status"]["binlog_file"]
    result["binlog_start_pos"] = binlog_info["show_master_status"]["binlog_pos"]
    if backup_binlog is None:
        if "show_slave_status" not in binlog_info.keys():
            result["query_binlog_error"] = _(
                "Failed to get binlog from the original master node {}".format(binlog_info["show_master_status"]["master_host"])
            )
            return result
        backup_binlog = rollback_handler.query_binlog_from_bklog(
            start_time=start_time,
            end_time=end_time,
            host_ip=binlog_info["show_slave_status"]["master_host"],
            port=binlog_info["show_slave_status"]["master_port"],
            minute_range=minute_range,
        )
        result["binlog_start_file"] = binlog_info["show_slave_status"]["binlog_file"]
        result["binlog_start_pos"] = binlog_info["show_slave_status"]["binlog_pos"]
    if backup_binlog is None:
        result["query_binlog_error"] = _("Failed to get binlog from the original master node {} and the original slave node {}").format(
            binlog_info["show_master_status"]["master_host"], binlog_info["show_slave_status"]["master_host"]
        )
        return result
    result["binlog_task_ids"] = [i["task_id"] for i in backup_binlog["file_list_details"]]
    binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]]
    result["binlog_files"] = ",".join(binlog_files)
    return result
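
The helper reports failure through a query_binlog_error key rather than raising, so every caller checks the dict before using it. A minimal usage sketch, assuming an illustrative cluster_id, host, port, and file values (binlog_info mirrors the show_master_status/show_slave_status structure consumed above):

    # Usage sketch -- all concrete values below are illustrative assumptions.
    example_binlog_info = {
        "show_master_status": {
            "master_host": "127.0.0.1",  # assumed host
            "master_port": 20000,        # assumed port
            "binlog_file": "binlog20000.000001",
            "binlog_pos": 4,
        }
    }
    result = get_backup_binlog(
        cluster_id=1,
        start_time=datetime(2024, 9, 1, 0, 0, 0),
        end_time=datetime(2024, 9, 1, 12, 0, 0),
        binlog_info=example_binlog_info,
    )
    if "query_binlog_error" in result:
        raise RuntimeError(result["query_binlog_error"])  # real callers raise their own flow exception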
@@ -16,13 +16,15 @@

from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME, MYSQL_USUAL_JOB_TIME, DBType
from backend.db_meta.models import Cluster
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
from backend.flow.consts import MysqlChangeMasterType
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import TenDBGetBackupInfoFailedException
from backend.flow.engine.bamboo.scene.mysql.common.get_binlog_backup import get_backup_binlog
from backend.flow.engine.bamboo.scene.mysql.common.get_local_backup import get_local_backup
from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException
from backend.flow.engine.bamboo.scene.spider.common.exceptions import (
    TendbGetBackupInfoFailedException,
    TendbGetBinlogFailedException,
)
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_download_backupfile import (
    MySQLDownloadBackupfileComponent,
@@ -219,26 +221,19 @@ def mysql_rollback_data_sub_flow(

    if is_rollback_binlog:
        backup_time = str2datetime(backup_info["backup_time"], "%Y-%m-%d %H:%M:%S")
        rollback_handler = FixPointRollbackHandler(cluster_model.id)
        # Comes from the parent node
        backup_binlog = rollback_handler.query_binlog_from_bklog(
        binlog_result = get_backup_binlog(
            cluster_id=cluster_model.id,
            start_time=backup_time,
            end_time=rollback_time,
            minute_range=30,
            host_ip=cluster["master_ip"],
            port=cluster["master_port"],
            binlog_info=backup_info["binlog_info"],
        )
        if "query_binlog_error" in binlog_result.keys():
            raise TendbGetBinlogFailedException(message=binlog_result["query_binlog_error"])
        cluster.update(binlog_result)

        if backup_binlog is None:
            raise TenDBGetBackupInfoFailedException(message=_("Failed to get backup info for instance {}".format(cluster["master_ip"])))
        # task_ids = [i["task_id"] for i in backup_info["file_list"]]
        task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]]
        binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]]
        cluster["backup_time"] = backup_info["backup_time"]
        cluster["binlog_files"] = ",".join(binlog_files)
        download_kwargs = DownloadBackupFileKwargs(
            bk_cloud_id=cluster_model.bk_cloud_id,
            task_ids=task_ids,
            task_ids=binlog_result["binlog_task_ids"],
            dest_ip=cluster["rollback_ip"],
            dest_dir=cluster["file_target_path"],
            reason="rollback node rollback binlog",
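The new TendbGetBinlogFailedException is imported from backend/flow/engine/bamboo/scene/spider/common/exceptions.py, which is not among the loaded hunks. A rough sketch of the shape it presumably takes, assuming it mirrors the existing TendbGetBackupInfoFailedException (base class and default message here are assumptions, not the repository's actual code):

    # Hypothetical sketch only -- the real definition is not shown in this diff;
    # base class and default message are assumptions.
    class TendbGetBinlogFailedException(Exception):
        def __init__(self, message="Failed to get binlog from the backup system"):
            super().__init__(message)
            self.message = message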
@@ -113,18 +113,6 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
        reason="slave recover",
    )

    sub_pipeline.add_act(
        act_name=_("Distribute db-actuator to node {}".format(cluster["master_ip"])),
        act_component_code=TransFileComponent.code,
        kwargs=asdict(
            DownloadMediaKwargs(
                bk_cloud_id=cluster["bk_cloud_id"],
                exec_ip=[cluster["master_ip"], cluster["new_slave_ip"]],
                file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
            )
        ),
    )

    sub_pipeline.add_act(
        act_name=_("Download full backup media to {}".format(cluster["new_slave_ip"])),
        act_component_code=MySQLDownloadBackupfileComponent.code,
@@ -18,13 +18,14 @@
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.cluster_entrys import get_tendb_ha_entry
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import check_sub_flow
from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserComponent
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.mysql.mysql_rds_execute import MySQLExecuteRdsComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.utils.mysql.mysql_act_dataclass import (
    CreateDnsKwargs,
    DownloadMediaKwargs,
    ExecuteRdsKwargs,
    InstanceUserCloneKwargs,
    RecycleDnsRecordKwargs,
)
@@ -62,19 +63,36 @@ def slave_migrate_switch_sub_flow(
    )

    # Pre-checks before the switch
    verify_checksum_tuples = [{"master": old_master, "slave": new_slave}]
    # for m in migrate_tuples:
    # old_master -> new_master; new_master -> new_slave: both need their checksum results verified
    sub_pipeline.add_sub_pipeline(
        sub_flow=check_sub_flow(
            uid=ticket_data["uid"],
            root_id=root_id,
            cluster=cluster,
            is_check_client_conn=True,
            is_verify_checksum=True,
            check_client_conn_inst=["{}:{}".format(new_slave_ip, master.port)],
            verify_checksum_tuples=verify_checksum_tuples,
        )
    # verify_checksum_tuples = [{"master": old_master, "slave": new_slave}]
    # # for m in migrate_tuples:
    # # old_master -> new_master; new_master -> new_slave: both need their checksum results verified
    # sub_pipeline.add_sub_pipeline(
    #     sub_flow=check_sub_flow(
    #         uid=ticket_data["uid"],
    #         root_id=root_id,
    #         cluster=cluster,
    #         is_check_client_conn=True,
    #         is_verify_checksum=True,
    #         check_client_conn_inst=["{}:{}".format(new_slave_ip, master.port)],
    #         verify_checksum_tuples=verify_checksum_tuples,
    #     )
    # )
    # Do not run the check; instead use RDS to insert a record into the new slave that always reads as correct.
    fake_checksum_sql = """replace into infodba_schema.checksum values
        ('{}',{},'_fake_db_','_fake_tbl_',0,0,'PRIMARY',0,0,0,0,0,0,now())""".format(
        master.machine.ip, master.port
    )
    sub_pipeline.add_act(
        act_name=_("Insert checksum record on new slave {}").format(new_slave),
        act_component_code=MySQLExecuteRdsComponent.code,
        kwargs=asdict(
            ExecuteRdsKwargs(
                bk_cloud_id=cluster.bk_cloud_id,
                instance_ip=new_slave_ip,
                instance_port=master.port,
                sqls=[fake_checksum_sql],
            )
        ),
    )
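
The act above seeds the checksum result table instead of running a real verification: a synthetic row whose result columns always read as consistent. A sketch of what fake_checksum_sql renders to, assuming a master at 127.0.0.1:20000 (illustrative values only):

    # Illustrative rendering of fake_checksum_sql for an assumed master 127.0.0.1:20000
    rendered = """replace into infodba_schema.checksum values
        ('127.0.0.1',20000,'_fake_db_','_fake_tbl_',0,0,'PRIMARY',0,0,0,0,0,0,now())"""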

    clone_data = [
@@ -14,6 +14,7 @@
from datetime import datetime
from typing import Dict, Optional

from django.db.models import Q
from django.utils.crypto import get_random_string
from django.utils.translation import ugettext as _

@@ -74,7 +75,11 @@ def rollback_data_flow(self):
        for info in self.ticket_data["infos"]:
            self.data = copy.deepcopy(info)
            cluster_class = Cluster.objects.get(id=self.data["cluster_id"])
            master = cluster_class.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
            filters = Q(cluster_type=ClusterType.TenDBSingle.value, instance_inner_role=InstanceInnerRole.ORPHAN.value)
            filters = filters | Q(
                cluster_type=ClusterType.TenDBHA.value, instance_inner_role=InstanceInnerRole.MASTER.value
            )
            master = cluster_class.storageinstance_set.get(filters)
            self.data["bk_biz_id"] = cluster_class.bk_biz_id
            self.data["bk_cloud_id"] = cluster_class.bk_cloud_id
            self.data["db_module_id"] = cluster_class.db_module_id
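
The widened lookup replaces the fixed MASTER filter because point-in-time rollback now has to resolve a source instance for both cluster types: a TenDBHA cluster stores it with the MASTER inner role, while a TenDBSingle cluster stores its only instance as ORPHAN. A standalone sketch of the same Q-object pattern (plain strings stand in for the enum values, which is an assumption about their concrete representation):

    from django.db.models import Q

    # Either branch may match depending on cluster type; .get(filters) still
    # requires exactly one storage instance, so the OR does not loosen uniqueness.
    filters = Q(cluster_type="tendbsingle", instance_inner_role="orphan") | Q(
        cluster_type="tendbha", instance_inner_role="master"
    )
    # master = cluster_class.storageinstance_set.get(filters)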
@@ -14,127 +14,26 @@

from django.utils.translation import ugettext as _

from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME, DBType
from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import TenDBGetBackupInfoFailedException
from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException
from backend.flow.engine.bamboo.scene.mysql.common.get_binlog_backup import get_backup_binlog
from backend.flow.engine.bamboo.scene.spider.common.exceptions import (
    TendbGetBackupInfoFailedException,
    TendbGetBinlogFailedException,
)
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_download_backupfile import (
    MySQLDownloadBackupfileComponent,
)
from backend.flow.plugins.components.collections.mysql.mysql_rollback_data_download_binlog import (
    MySQLRollbackDownloadBinlogComponent,
)
from backend.flow.plugins.components.collections.mysql.rollback_trans_flies import RollBackTransFileComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.utils.mysql.mysql_act_dataclass import (
    DownloadBackupFileKwargs,
    DownloadMediaKwargs,
    ExecActuatorKwargs,
    P2PFileKwargs,
    RollBackTransFileKwargs,
)
from backend.flow.utils.mysql.mysql_act_dataclass import DownloadBackupFileKwargs, ExecActuatorKwargs, P2PFileKwargs
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.utils.time import str2datetime

logger = logging.getLogger("flow")


def rollback_local_and_time(root_id: str, ticket_data: dict, cluster_info: dict):
    """
    MySQL point-in-time rollback: local backup + specified time
    @param root_id: root_id of the flow
    @param ticket_data: the associated ticket object
    @param cluster_info: the associated cluster object
    """
    cluster_info["recover_binlog"] = True
    sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(ticket_data))
    sub_pipeline.add_act(
        act_name=_("Distribute db_actuator media"),
        act_component_code=TransFileComponent.code,
        kwargs=asdict(
            DownloadMediaKwargs(
                bk_cloud_id=cluster_info["bk_cloud_id"],
                exec_ip=[cluster_info["master_ip"], cluster_info["old_slave_ip"]],
                file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
            )
        ),
    )
    exec_act_kwargs = ExecActuatorKwargs(
        bk_cloud_id=cluster_info["bk_cloud_id"],
        cluster_type=cluster_info["cluster_type"],
        cluster=cluster_info,
        job_timeout=MYSQL_DATA_RESTORE_TIME,
    )
    exec_act_kwargs.exec_ip = cluster_info["master_ip"]
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_find_local_backup_payload.__name__
    sub_pipeline.add_act(
        act_name=_("Point-in-time recovery: get backup media from MASTER node {}").format(exec_act_kwargs.exec_ip),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
        write_payload_var="master_backup_file",
    )

    exec_act_kwargs.exec_ip = cluster_info["old_slave_ip"]
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_find_local_backup_payload.__name__
    sub_pipeline.add_act(
        act_name=_("Point-in-time recovery: get backup media from SLAVE node {}").format(exec_act_kwargs.exec_ip),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
        write_payload_var="slave_backup_file",
    )

    sub_pipeline.add_act(
        act_name=_("Determine the backup file source and transfer the backup files to the new recovery node {}").format(cluster_info["rollback_ip"]),
        act_component_code=RollBackTransFileComponent.code,
        kwargs=asdict(
            RollBackTransFileKwargs(
                bk_cloud_id=cluster_info["bk_cloud_id"],
                file_list=[],
                file_target_path=cluster_info["file_target_path"],
                source_ip_list=[],
                exec_ip=cluster_info["rollback_ip"],
                cluster=cluster_info,
            )
        ),
    )

    exec_act_kwargs.exec_ip = cluster_info["rollback_ip"]
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_rollback_data_restore_payload.__name__
    sub_pipeline.add_act(
        act_name=_("Point-in-time recovery: restore data on {}").format(exec_act_kwargs.exec_ip),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
        write_payload_var="change_master_info",
    )

    # backup_time lives in the activity node; download the binlog in the flow
    download_kwargs = DownloadBackupFileKwargs(
        bk_cloud_id=cluster_info["bk_cloud_id"],
        task_ids=[],
        dest_ip=cluster_info["rollback_ip"],
        dest_dir=cluster_info["file_target_path"],
        reason="spider node rollback binlog",
        cluster=cluster_info,
    )
    sub_pipeline.add_act(
        act_name=_("Download the binlog for point-in-time recovery to {}").format(cluster_info["rollback_ip"]),
        act_component_code=MySQLRollbackDownloadBinlogComponent.code,
        kwargs=asdict(download_kwargs),
    )

    exec_act_kwargs.exec_ip = cluster_info["rollback_ip"]
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_recover_binlog_payload.__name__
    sub_pipeline.add_act(
        act_name=_("Point-in-time recovery: roll forward binlog on {}".format(exec_act_kwargs.exec_ip)),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
    )
    return sub_pipeline.build_sub_process(sub_name=_("{} point-in-time rollback data LOCAL_AND_TIME".format(cluster_info["rollback_ip"])))


def rollback_remote_and_time(root_id: str, ticket_data: dict, cluster_info: dict):
"""
    MySQL point-in-time rollback: remote backup file + specified time
@@ -188,23 +87,19 @@ def rollback_remote_and_time(root_id: str, ticket_data: dict, cluster_info: dict):
    )
    backup_time = str2datetime(backupinfo["backup_time"])
    rollback_time = str2datetime(cluster_info["rollback_time"])
    rollback_handler = FixPointRollbackHandler(cluster_info["cluster_id"])
    backup_binlog = rollback_handler.query_binlog_from_bklog(
    binlog_result = get_backup_binlog(
        cluster_id=cluster_info["cluster_id"],
        start_time=backup_time,
        end_time=rollback_time,
        minute_range=30,
        host_ip=cluster_info["master_ip"],
        port=cluster_info["master_port"],
        binlog_info=backupinfo["binlog_info"],
    )
    if backup_binlog is None:
        raise TenDBGetBackupInfoFailedException(message=_("Failed to get backup info for instance {}".format(cluster_info["master_ip"])))
    if "query_binlog_error" in binlog_result.keys():
        raise TendbGetBinlogFailedException(message=binlog_result["query_binlog_error"])
    cluster_info.update(binlog_result)

    task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]]
    binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]]
    cluster_info["binlog_files"] = ",".join(binlog_files)
    download_kwargs = DownloadBackupFileKwargs(
        bk_cloud_id=cluster_info["bk_cloud_id"],
        task_ids=task_ids,
        task_ids=binlog_result["binlog_task_ids"],
        dest_ip=cluster_info["rollback_ip"],
        dest_dir=cluster_info["file_target_path"],
        reason="spider node rollback binlog",
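Both call sites consume the helper's output the same way, so the data contract between get_backup_binlog and the download/roll-forward steps can be summarized as follows. The values are illustrative, and which actuator payload reads binlog_start_file/binlog_start_pos is an assumption inferred from the cluster_info.update call:

    # Illustrative shape of a successful get_backup_binlog result:
    binlog_result = {
        "binlog_start_file": "binlog20000.000001",  # merged into cluster_info for the roll-forward payload
        "binlog_start_pos": 4,                      # merged into cluster_info for the roll-forward payload
        "binlog_task_ids": [101, 102],              # fed to DownloadBackupFileKwargs.task_ids
        "binlog_files": "binlog20000.000001,binlog20000.000002",  # comma-joined list for the actuator
    }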
