Skip to content

Commit

Permalink
fix: 调整tendbha切换类流程,包括入参调整 #1539
Browse files Browse the repository at this point in the history
  • Loading branch information
yksitu authored and zhangzhw8 committed Oct 30, 2023
1 parent 40b5b8f commit 104d953
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
from django.utils.crypto import get_random_string
from django.utils.translation import ugettext as _

from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterEntryType, InstanceInnerRole
from backend.db_meta.models import Cluster, ClusterEntry
from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER, InstanceStatus
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import check_sub_flow
from backend.flow.plugins.components.collections.mysql.add_user_for_cluster_switch import AddSwitchUserComponent
from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserComponent
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
Expand Down Expand Up @@ -57,7 +59,6 @@ def master_and_slave_switch(root_id: str, ticket_data: dict, cluster: Cluster, c
# 拼接子流程需要全局参数
switch_sub_flow_context = copy.deepcopy(ticket_data)
# 把公共参数拼接到子流程的全局只读上下文
switch_sub_flow_context["is_safe"] = True
switch_sub_flow_context["is_dead_master"] = False
switch_sub_flow_context["grant_repl"] = True
switch_sub_flow_context["locked_switch"] = True
Expand All @@ -68,6 +69,27 @@ def master_and_slave_switch(root_id: str, ticket_data: dict, cluster: Cluster, c
# 针对集群维度声明子流程
cluster_switch_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(switch_sub_flow_context))

# 切换前做预检测, 克隆主从时客户端连接检测和checksum检验默认检测
sub_flow = check_sub_flow(
uid=ticket_data["uid"],
root_id=root_id,
cluster=cluster,
is_check_client_conn=True,
is_verify_checksum=True,
check_client_conn_inst=[
f"{cluster_info['old_master_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}",
f"{cluster_info['old_slave_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}",
],
verify_checksum_tuples=[
{
"master": f"{cluster_info['old_master_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}",
"slave": f"{cluster_info['new_master_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}",
}
],
)
if sub_flow:
cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow)

# todo ?授权切换账号
add_sw_user_kwargs = AddSwitchUserKwargs(
bk_cloud_id=cluster.bk_cloud_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType
from backend.db_meta.models import Cluster
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
build_surrounding_apps_sub_flow,
check_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.mysql_master_slave_switch import MySQLMasterSlaveSwitchFlow
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent
Expand Down Expand Up @@ -91,15 +96,17 @@ def master_fail_over_flow(self):
sub_sub_flow_context.pop("infos")

# 把公共参数拼接到子流程的全局只读上下文
sub_sub_flow_context["is_safe"] = info["is_safe"]
sub_sub_flow_context["is_dead_master"] = True
sub_sub_flow_context["grant_repl"] = True
sub_sub_flow_context["locked_switch"] = False
sub_sub_flow_context["change_master_force"] = True

# 获取对应的集群信息
cluster = MySQLMasterSlaveSwitchFlow.get_cluster_info(
cluster_id=cluster_id, new_master_ip=info["slave_ip"]["ip"], old_master_ip=info["master_ip"]["ip"]
cluster_id=cluster_id,
bk_biz_id=sub_sub_flow_context["bk_biz_id"],
new_master_ip=info["slave_ip"]["ip"],
old_master_ip=info["master_ip"]["ip"],
)

# 拼接执行原子任务的活动节点需要的通用的私有参数
Expand All @@ -110,6 +117,24 @@ def master_fail_over_flow(self):
root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context)
)

# 切换前做预检测, 强切场景理论上不需要对原来master做连接检测
sub_flow = check_sub_flow(
uid=self.data["uid"],
root_id=self.root_id,
cluster=Cluster.objects.get(id=cluster_id, bk_biz_id=sub_sub_flow_context["bk_biz_id"]),
is_check_client_conn=sub_sub_flow_context["is_check_process"],
is_verify_checksum=sub_sub_flow_context["is_verify_checksum"],
check_client_conn_inst=[f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}"],
verify_checksum_tuples=[
{
"master": f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
"slave": f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
}
],
)
if sub_flow:
cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow)

# 阶段2 执行故障切换的原子任务
cluster_sw_kwargs.exec_ip = info["slave_ip"]["ip"]
cluster_sw_kwargs.get_mysql_payload_func = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterEntryType, ClusterType, InstanceInnerRole
from backend.db_meta.exceptions import ClusterNotExistException
from backend.db_meta.models import Cluster, ProxyInstance, StorageInstance
from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
build_surrounding_apps_sub_flow,
check_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException
from backend.flow.plugins.components.collections.mysql.add_user_for_cluster_switch import AddSwitchUserComponent
from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserComponent
Expand Down Expand Up @@ -70,14 +75,19 @@ def __init__(self, root_id: str, data: Optional[Dict]):
self.data = data

@staticmethod
def get_cluster_info(cluster_id: int, new_master_ip: str, old_master_ip: str) -> dict:
def get_cluster_info(cluster_id: int, bk_biz_id: int, new_master_ip: str, old_master_ip: str) -> dict:
"""
定义获取切换集群的基本信息的方法
@param cluster_id :集群id
@param bk_biz_id: 业务id
@param new_master_ip: 待升主的slave ip
@param old_master_ip: 目前的集群的master ip
"""
cluster = Cluster.objects.get(id=cluster_id)
try:
cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)
except Cluster.DoesNotExist:
raise ClusterNotExistException(cluster_id=cluster_id, bk_biz_id=bk_biz_id, message=_("集群不存在"))

proxy_info = ProxyInstance.objects.filter(cluster=cluster).all()
new_master = StorageInstance.objects.get(machine__ip=new_master_ip, cluster=cluster)

Expand Down Expand Up @@ -198,7 +208,6 @@ def master_slave_switch_flow(self):
sub_sub_flow_context.pop("infos")

# 把公共参数拼接到子流程的全局只读上下文
sub_sub_flow_context["is_safe"] = info["is_safe"]
sub_sub_flow_context["is_dead_master"] = False
sub_sub_flow_context["grant_repl"] = True
sub_sub_flow_context["locked_switch"] = True
Expand All @@ -208,7 +217,10 @@ def master_slave_switch_flow(self):

# 获取对应的集群信息
cluster = self.get_cluster_info(
cluster_id=cluster_id, new_master_ip=info["slave_ip"]["ip"], old_master_ip=info["master_ip"]["ip"]
cluster_id=cluster_id,
bk_biz_id=sub_sub_flow_context["bk_biz_id"],
new_master_ip=info["slave_ip"]["ip"],
old_master_ip=info["master_ip"]["ip"],
)

# 拼接切换执行活动节点需要的通用的私有参数
Expand All @@ -219,6 +231,27 @@ def master_slave_switch_flow(self):
root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context)
)

# 切换前做预检测
sub_flow = check_sub_flow(
uid=self.data["uid"],
root_id=self.root_id,
cluster=Cluster.objects.get(id=cluster_id, bk_biz_id=sub_sub_flow_context["bk_biz_id"]),
is_check_client_conn=sub_sub_flow_context["is_check_process"],
is_verify_checksum=sub_sub_flow_context["is_verify_checksum"],
check_client_conn_inst=[
f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
],
verify_checksum_tuples=[
{
"master": f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
"slave": f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
}
],
)
if sub_flow:
cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow)

# 阶段1 添加切换的临时账号
cluster_switch_sub_pipeline.add_act(
act_name=_("旧master添加切换临时账号"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.models import Cluster
from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
build_surrounding_apps_sub_flow,
check_sub_flow,
)
from backend.flow.plugins.components.collections.common.pause import PauseComponent
from backend.flow.plugins.components.collections.mysql.add_user_for_cluster_switch import AddSwitchUserComponent
from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent
Expand Down Expand Up @@ -410,7 +415,6 @@ def build_cluster_switch_sub_flow(self, cluster: dict):
switch_sub_flow_context.pop("infos")

# 把公共参数拼接到子流程的全局只读上下文
switch_sub_flow_context["is_safe"] = True
switch_sub_flow_context["is_dead_master"] = False
switch_sub_flow_context["grant_repl"] = True
switch_sub_flow_context["locked_switch"] = True
Expand All @@ -424,6 +428,27 @@ def build_cluster_switch_sub_flow(self, cluster: dict):
# 针对集群维度声明子流程
cluster_switch_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(switch_sub_flow_context))

# 切换前做预检测, 克隆主从时客户端连接检测和checksum检验默认检测
sub_flow = check_sub_flow(
uid=self.data["uid"],
root_id=self.root_id,
cluster=Cluster.objects.get(id=cluster["cluster_id"], bk_biz_id=cluster["bk_biz_id"]),
is_check_client_conn=True,
is_verify_checksum=True,
check_client_conn_inst=[
f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
f"{cluster['old_slave_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
],
verify_checksum_tuples=[
{
"master": f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
"slave": f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}",
}
],
)
if sub_flow:
cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow)

add_sw_user_kwargs = AddSwitchUserKwargs(
bk_cloud_id=cluster["bk_cloud_id"],
user=switch_account,
Expand Down
Loading

0 comments on commit 104d953

Please sign in to comment.