Skip to content

Commit

Permalink
feat: 定点回档物理备份changeMasterTEST3 TencentBlueKing#8338
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo committed Dec 5, 2024
1 parent b648122 commit 2cac206
Showing 1 changed file with 33 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ def rollback_data_flow(self):
backupinfo = get_local_backup(inst_list, cluster_class, mycluster["rollback_time"])
if backupinfo is None:
logger.error("cluster {} backup info not exists".format(cluster_class.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))
raise TendbGetBackupInfoFailedException(
message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))
mycluster["backupinfo"] = copy.deepcopy(backupinfo)

sub_pipeline.add_sub_pipeline(
Expand Down Expand Up @@ -278,20 +279,48 @@ def rollback_to_cluster_flow(self):
db_module_id=self.data["db_module_id"],
cluster_type=self.data["cluster_type"],
)
# 为了回档的备份信息各个节点统一,获取备份的信息提前
backupinfo = copy.deepcopy(self.data["backupinfo"])
if self.data["rollback_type"] == RollbackType.LOCAL_AND_TIME:
# 可从主库获取,但需要排除掉被回档的机器
inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
stand_by_slaves = cluster_class.storageinstance_set.filter(
instance_inner_role=InstanceInnerRole.SLAVE.value,
is_stand_by=True,
status=InstanceStatus.RUNNING.value,
)
if len(stand_by_slaves) > 0:
inst_list.append(
"{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
)
backupinfo = get_local_backup(inst_list, cluster_class, self.data["rollback_time"])
if backupinfo is None:
logger.error("cluster {} backup info not exists".format(cluster_class.id))
raise TendbGetBackupInfoFailedException(
message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))

elif self.data["rollback_type"] == RollbackType.REMOTE_AND_TIME.value:
rollback_handler = FixPointRollbackHandler(cluster_class.id)
backupinfo = rollback_handler.query_latest_backup_log(str2datetime(self.data["rollback_time"]))
if backupinfo is None:
logger.error("cluster {} backup info not exists".format(["cluster_id"]))
raise TendbGetBackupInfoFailedException(
message=_("获取集群 {} 的备份信息失败".format(cluster_class.id))
)

sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
rollback_class = Cluster.objects.get(id=self.data["rollback_cluster_id"])
storages = rollback_class.storageinstance_set.all()
rollback_pipeline_list = []
for rollback_storage in storages:
if not check_storage_database(
rollback_class.bk_cloud_id, rollback_storage.machine.ip, rollback_storage.port
rollback_class.bk_cloud_id, rollback_storage.machine.ip, rollback_storage.port
):
logger.error("cluster {} check database fail".format(rollback_class.id))
raise NormalSpiderFlowException(
message=_("回档集群 {} 空闲检查不通过,请确认回档集群是否存在非系统数据库".format(rollback_class.id))
message=_(
"回档集群 {} 空闲检查不通过,请确认回档集群是否存在非系统数据库".format(rollback_class.id))
)
backupinfo = copy.deepcopy(self.data["backupinfo"])
mycluster = {
"name": cluster_class.name,
"cluster_id": cluster_class.id,
Expand Down Expand Up @@ -369,20 +398,6 @@ def rollback_to_cluster_flow(self):
)
# 本地备份+时间
if self.data["rollback_type"] == RollbackType.LOCAL_AND_TIME:
inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
stand_by_slaves = cluster_class.storageinstance_set.filter(
instance_inner_role=InstanceInnerRole.SLAVE.value,
is_stand_by=True,
status=InstanceStatus.RUNNING.value,
).exclude(machine__ip__in=[rollback_storage.machine.ip])
if len(stand_by_slaves) > 0:
inst_list.append(
"{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
)
backupinfo = get_local_backup(inst_list, cluster_class, mycluster["rollback_time"])
if backupinfo is None:
logger.error("cluster {} backup info not exists".format(cluster_class.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))
mycluster["backupinfo"] = copy.deepcopy(backupinfo)
rollback_pipeline.add_sub_pipeline(
sub_flow=rollback_local_and_time(
Expand All @@ -392,16 +407,8 @@ def rollback_to_cluster_flow(self):
cluster_model=cluster_class,
)
)

# 远程备份+时间
elif self.data["rollback_type"] == RollbackType.REMOTE_AND_TIME.value:
rollback_handler = FixPointRollbackHandler(cluster_class.id)
backupinfo = rollback_handler.query_latest_backup_log(str2datetime(mycluster["rollback_time"]))
if backupinfo is None:
logger.error("cluster {} backup info not exists".format(mycluster["cluster_id"]))
raise TendbGetBackupInfoFailedException(
message=_("获取集群 {} 的备份信息失败".format(mycluster["cluster_id"]))
)
mycluster["backupinfo"] = copy.deepcopy(backupinfo)
rollback_pipeline.add_sub_pipeline(
sub_flow=rollback_remote_and_time(
Expand Down

0 comments on commit 2cac206

Please sign in to comment.