Commit

123
ymakedaq committed Sep 25, 2024
1 parent f475414 commit ec4773c
Showing 1 changed file with 44 additions and 78 deletions.
@@ -179,18 +179,14 @@ def non_standby_slaves_upgrade_subflow(
         )
     sub_pipeline.add_sub_pipeline(sub_flow=install_sub_pipeline)
 
     # 恢复主从数据
+    local_backup = False
     if backup_source == MySQLBackupSource.LOCAL:
         # 使用本地备份来做迁移
-        sync_data_sub_pipeline_list = build_data_repl_from_local_backup(
-            root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset
-        )
-        sub_pipeline.add_parallel_sub_pipeline(sync_data_sub_pipeline_list)
-    elif backup_source == MySQLBackupSource.REMOTE:
-        # 使用远程备份来做迁移
-        sync_data_sub_pipeline_list = build_sync_data_sub_pipelines(
-            root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip
-        )
-        sub_pipeline.add_parallel_sub_pipeline(sync_data_sub_pipeline_list)
+        local_backup = True
+    sync_data_sub_pipeline_list = build_sync_data_sub_pipelines(
+        root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, local_backup
+    )
+    sub_pipeline.add_parallel_sub_pipeline(sync_data_sub_pipeline_list)
 
     # 切换到新从节点
     if not add_slave_only:
@@ -269,10 +265,12 @@ def build_install_sub_pipeline(
         kwargs=asdict(exec_act_kwargs),
     )
 
-    return install_sub_pipeline.build_sub_process()
+    return install_sub_pipeline.build_sub_process(sub_name=_("{}:安装MySQL实例".format(new_slave_ip)))
 
 
-def build_sync_data_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip):
+def build_sync_data_sub_pipelines(
+    root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, local_backup: bool
+):
     sync_data_sub_pipeline_list = []
     for cluster_id in relation_cluster_ids:
         cluster_model = Cluster.objects.get(id=cluster_id)
@@ -291,83 +289,49 @@ def build_sync_data_sub_pipelines(root_id, parent_global_data, relation_cluster_
             "change_master_force": True,
         }
         sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data))
-        sync_data_sub_pipeline.add_sub_pipeline(
-            sub_flow=slave_recover_sub_flow(
-                root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), cluster_info=cluster_info
-            )
-        )
-        sync_data_sub_pipeline.add_act(
-            act_name=_("同步完毕,写入主从关系,设置节点为running状态"),
-            act_component_code=MySQLDBMetaComponent.code,
-            kwargs=asdict(
-                DBMetaOPKwargs(
-                    db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__,
-                    cluster=cluster_info,
-                    is_update_trans_data=True,
-                )
-            ),
-        )
-        sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))
-    return sync_data_sub_pipeline_list
-
-
-def build_data_repl_from_local_backup(
-    root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset
-):
-    sync_data_sub_pipeline_list = []
-    for cluster_id in relation_cluster_ids:
-        cluster_model = Cluster.objects.get(id=cluster_id)
-        master = cluster_model.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
-        cluster = {
-            "mysql_port": master.port,
-            "cluster_id": cluster_model.id,
-            "cluster_type": cluster.cluster_type,
-            "master_ip": master.machine.ip,
-            "master_port": master.port,
-            "new_slave_ip": new_slave_ip,
-            "new_slave_port": master.port,
-            "bk_cloud_id": cluster_model.bk_cloud_id,
-            "file_target_path": f"/data/dbbak/{root_id}/{master.port}",
-            "charset": charset,
-            "change_master_force": True,
-            "change_master": True,
-        }
-        sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data))
-        # 获取本地备份并恢复
-        inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
-        non_stand_by_slaves = cluster_model.storageinstance_set.filter(
-            instance_inner_role=InstanceInnerRole.SLAVE.value,
-            is_stand_by=False,
-            status=InstanceStatus.RUNNING.value,
-        ).exclude(machine__ip__in=[new_slave_ip])
-        if len(non_stand_by_slaves) > 0:
-            inst_list.append(
-                "{}{}{}".format(non_stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, non_stand_by_slaves[0].port)
-            )
-        sync_data_sub_pipeline.add_sub_pipeline(
-            sub_flow=mysql_restore_data_sub_flow(
-                root_id=root_id,
-                ticket_data=copy.deepcopy(parent_global_data),
-                cluster=cluster,
-                cluster_model=cluster_model,
-                ins_list=inst_list,
-            )
-        )
+        if local_backup:
+            # 获取本地备份并恢复
+            inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
+            stand_by_slaves = cluster_model.storageinstance_set.filter(
+                instance_inner_role=InstanceInnerRole.SLAVE.value,
+                is_stand_by=True,
+                status=InstanceStatus.RUNNING.value,
+            ).exclude(machine__ip__in=[new_slave_ip])
+            if len(stand_by_slaves) > 0:
+                inst_list.append(
+                    "{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
+                )
+            sync_data_sub_pipeline.add_sub_pipeline(
+                sub_flow=mysql_restore_data_sub_flow(
+                    root_id=root_id,
+                    ticket_data=copy.deepcopy(parent_global_data),
+                    cluster=cluster,
+                    cluster_model=cluster_model,
+                    ins_list=inst_list,
+                )
+            )
+        else:
+            sync_data_sub_pipeline.add_sub_pipeline(
+                sub_flow=slave_recover_sub_flow(
+                    root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), cluster_info=cluster
+                )
+            )
         # 恢复完毕的时候 slave 状态改为running
         sync_data_sub_pipeline.add_act(
             act_name=_("同步完毕,写入主从关系,设置节点为running状态"),
             act_component_code=MySQLDBMetaComponent.code,
             kwargs=asdict(
                 DBMetaOPKwargs(
                     db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__,
-                    cluster=cluster,
+                    cluster=cluster_info,
                     is_update_trans_data=True,
                 )
             ),
         )
-        sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))
-    return sync_data_sub_pipeline_list
+        sync_data_sub_pipeline_list.append(
+            sync_data_sub_pipeline.build_sub_process(sub_name=_("{}:恢复实例数据".format(cluster_model.immute_domain)))
+        )
+    return sync_data_sub_pipeline_list
 
 
 def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, old_slave_ip, new_slave_ip):
@@ -402,7 +366,9 @@ def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids
                 )
             ),
         )
-        switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换到新从节点")))
+        switch_sub_pipeline_list.append(
+            switch_sub_pipeline.build_sub_process(sub_name=_("切换到新从节点:{}".format(new_slave_ip)))
+        )
     return switch_sub_pipeline_list

