Skip to content

Commit

Permalink
fix(mysql): 优化proxy替换流程 TencentBlueKing#8165
Browse files Browse the repository at this point in the history
  • Loading branch information
yksitu committed Nov 26, 2024
1 parent 2700a73 commit b149e81
Show file tree
Hide file tree
Showing 8 changed files with 521 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ func (g *CloneClineGrantAct) Run() (err error) {
FunName: "克隆client权限",
Func: g.Service.CloneTargetClientPriv,
},
{
FunName: "回收旧client权限",
Func: g.Service.DropOriginClientPriv,
},
}

if err := steps.Run(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,40 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.db_meta.enums import ClusterEntryType, ClusterType, InstanceInnerRole, InstanceStatus
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterEntryType, ClusterType, InstanceInnerRole
from backend.db_meta.models import Cluster, ProxyInstance, StorageInstance
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import init_machine_sub_flow
from backend.flow.plugins.components.collections.common.delete_cc_service_instance import DelCCServiceInstComponent
from backend.flow.plugins.components.collections.common.pause import PauseComponent
from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent
from backend.flow.plugins.components.collections.mysql.clone_proxy_client_in_backend import (
CloneProxyUsersInBackendComponent,
)
from backend.flow.plugins.components.collections.mysql.clone_proxy_user_in_cluster import (
CloneProxyUsersInClusterComponent,
)
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.mysql.drop_proxy_client_in_backend import (
DropProxyUsersInBackendComponent,
)
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent
from backend.flow.plugins.components.collections.mysql.set_backend_in_porxy import SetBackendInProxyComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.utils.mysql.mysql_act_dataclass import (
CloneProxyClientInBackendKwargs,
CloneProxyUsersKwargs,
CreateDnsKwargs,
DBMetaOPKwargs,
DelServiceInstKwargs,
DownloadMediaKwargs,
DropProxyUsersInBackendKwargs,
ExecActuatorKwargs,
RecycleDnsRecordKwargs,
SetBackendInProxyKwargs,
)
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
Expand Down Expand Up @@ -67,33 +82,24 @@ def __get_switch_cluster_info(cluster_id: int, origin_proxy_ip: str, target_prox
"""
cluster = Cluster.objects.get(id=cluster_id)

# 选择集群标记running状态的proxy实例,作为流程中克隆权限的依据, 排除待替换的ip
template_proxy = (
ProxyInstance.objects.filter(cluster=cluster, status=InstanceStatus.RUNNING.value)
.exclude(machine__ip=origin_proxy_ip)
.all()[0]
)
mysql_ip_list = StorageInstance.objects.filter(cluster=cluster).all()
origin_proxy = ProxyInstance.objects.filter(cluster=cluster, machine__ip=origin_proxy_ip)
master = StorageInstance.objects.get(cluster=cluster, instance_inner_role=InstanceInnerRole.MASTER)
dns_list = template_proxy.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value).all()
dns_list = origin_proxy.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value).all()

return {
"id": cluster_id,
"bk_cloud_id": cluster.bk_cloud_id,
"name": cluster.name,
"cluster_type": cluster.cluster_type,
"template_proxy_ip": template_proxy.machine.ip,
# 集群所有的backend实例的端口是一致的,获取第一个对象的端口信息即可
"mysql_ip_list": [m.machine.ip for m in mysql_ip_list],
"mysql_port": master.port,
# 每套集群的proxy端口必须是相同的,取第一个proxy的端口信息即可
"proxy_port": template_proxy.port,
"proxy_port": origin_proxy.port,
"origin_proxy_ip": origin_proxy_ip,
"target_proxy_ip": target_proxy_ip,
# 新的proxy配置后端ip
"set_backend_ip": master.machine.ip,
"add_domain_list": [i.entry for i in dns_list],
"is_drop": True,
}

@staticmethod
Expand All @@ -113,15 +119,8 @@ def __get_proxy_install_ports(cluster_ids: list) -> list:
def switch_mysql_cluster_proxy_flow(self):
"""
定义mysql集群proxy替换实例流程
增加单据临时ADMIN账号的添加和删除逻辑
"""
cluster_ids = []
for i in self.data["infos"]:
cluster_ids.extend(i["cluster_ids"])

mysql_proxy_cluster_add_pipeline = Builder(
root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids))
)
mysql_proxy_cluster_add_pipeline = Builder(root_id=self.root_id, data=self.data)
sub_pipelines = []

# 多集群操作时循环加入集群proxy替换子流程
Expand All @@ -133,16 +132,6 @@ def switch_mysql_cluster_proxy_flow(self):
sub_flow_context["proxy_ports"] = self.__get_proxy_install_ports(cluster_ids=info["cluster_ids"])
sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context))

# 初始化同机替换的proxy集群信息
clusters = [
self.__get_switch_cluster_info(
cluster_id=cluster_id,
origin_proxy_ip=info["origin_proxy_ip"]["ip"],
target_proxy_ip=info["target_proxy_ip"]["ip"],
)
for cluster_id in info["cluster_ids"]
]

# 拼接执行原子任务活动节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题
exec_act_kwargs = ExecActuatorKwargs(
cluster_type=ClusterType.TenDBHA,
Expand Down Expand Up @@ -190,89 +179,62 @@ def switch_mysql_cluster_proxy_flow(self):
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
# 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换
sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})

# 阶段2 根据需要替换的proxy的集群,依次添加
add_proxy_sub_list = []
for cluster in clusters:
switch_proxy_sub_list = []
for cluster_id in info["cluster_ids"]:

# 拼接子流程需要全局参数
sub_sub_flow_context = copy.deepcopy(self.data)
sub_sub_flow_context.pop("infos")

# 获取集群的实例信息
cluster = self.__get_switch_cluster_info(
cluster_id=cluster_id,
target_proxy_ip=info["target_proxy_ip"]["ip"],
origin_proxy_ip=info["origin_proxy_ip"]["ip"],
)

# 针对集群维度声明替换子流程
switch_proxy_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context))

# 拼接替换proxy节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题
switch_proxy_sub_act_kwargs = ExecActuatorKwargs(
bk_cloud_id=cluster["bk_cloud_id"],
cluster=cluster,
)

switch_proxy_sub_pipeline.add_act(
act_name=_("下发db-actuator介质"),
act_component_code=TransFileComponent.code,
act_name=_("新的proxy配置后端实例[{}]".format(info["target_proxy_ip"]["ip"])),
act_component_code=SetBackendInProxyComponent.code,
kwargs=asdict(
DownloadMediaKwargs(
SetBackendInProxyKwargs(
proxys=[f"{info['target_proxy_ip']['ip']}{IP_PORT_DIVIDER}{cluster['proxy_port']}"],
bk_cloud_id=cluster["bk_cloud_id"],
exec_ip=[cluster["template_proxy_ip"]] + cluster["mysql_ip_list"],
file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
),
backend_host=cluster["set_backend_ip"],
backend_port=cluster["mysql_port"],
)
),
)

switch_proxy_sub_act_kwargs.exec_ip = cluster["target_proxy_ip"]
switch_proxy_sub_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_set_proxy_backends.__name__
switch_proxy_sub_pipeline.add_act(
act_name=_("新的proxy配置后端实例"),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(switch_proxy_sub_act_kwargs),
)

switch_proxy_sub_act_kwargs.exec_ip = cluster["template_proxy_ip"]
switch_proxy_sub_act_kwargs.get_mysql_payload_func = (
MysqlActPayload.get_clone_proxy_user_payload.__name__
)
switch_proxy_sub_pipeline.add_act(
act_name=_("克隆proxy用户白名单"),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(switch_proxy_sub_act_kwargs),
)

acts_list = []
for cluster_mysql_ip in cluster["mysql_ip_list"]:
switch_proxy_sub_act_kwargs.exec_ip = cluster_mysql_ip
switch_proxy_sub_act_kwargs.get_mysql_payload_func = (
MysqlActPayload.get_clone_client_grant_payload.__name__
)
acts_list.append(
{
"act_name": _("集群对新的proxy添加权限"),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(switch_proxy_sub_act_kwargs),
}
)
switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list)

add_proxy_sub_list.append(
switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群添加proxy实例").format(cluster["name"]))
act_component_code=CloneProxyUsersInClusterComponent.code,
kwargs=asdict(
CloneProxyUsersKwargs(
cluster_id=cluster["id"],
target_proxy_host=info["target_proxy_ip"]["ip"],
proxy_port=cluster["proxy_port"],
)
),
)

sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=add_proxy_sub_list)

# 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换
sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})

# 阶段3 根据集群维度切换域名
switch_dns_sub_list = []
for cluster in clusters:

# 拼接子流程需要全局参数
sub_sub_flow_context = copy.deepcopy(self.data)
sub_sub_flow_context.pop("infos")

# 针对集群维度声明替换子流程
switch_cluster_dns_pipeline = SubBuilder(
root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context)
switch_proxy_sub_pipeline.add_act(
act_name=_("集群对新的proxy添加权限"),
act_component_code=CloneProxyUsersInBackendComponent.code,
kwargs=asdict(
CloneProxyClientInBackendKwargs(
cluster_id=cluster["id"],
target_proxy_host=info["target_proxy_ip"]["ip"],
origin_proxy_host=info["origin_proxy_ip"]["ip"],
)
),
)

acts_list = []
Expand All @@ -292,9 +254,9 @@ def switch_mysql_cluster_proxy_flow(self):
),
}
)
switch_cluster_dns_pipeline.add_parallel_acts(acts_list=acts_list)
switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list)

switch_cluster_dns_pipeline.add_act(
switch_proxy_sub_pipeline.add_act(
act_name=_("回收旧proxy集群映射"),
act_component_code=MySQLDnsManageComponent.code,
kwargs=asdict(
Expand All @@ -306,11 +268,22 @@ def switch_mysql_cluster_proxy_flow(self):
),
)

switch_dns_sub_list.append(
switch_cluster_dns_pipeline.build_sub_process(sub_name=_("{}集群切换proxy域名").format(cluster["name"]))
switch_proxy_sub_pipeline.add_act(
act_name=_("回收旧proxy在backend权限"),
act_component_code=DropProxyUsersInBackendComponent.code,
kwargs=asdict(
DropProxyUsersInBackendKwargs(
cluster_id=cluster["id"],
origin_proxy_host=info["origin_proxy_ip"]["ip"],
),
),
)

switch_proxy_sub_list.append(
switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群替换proxy实例").format(cluster["name"]))
)

sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_dns_sub_list)
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_proxy_sub_list)

# 先把新的节点数据写入
sub_pipeline.add_act(
Expand Down Expand Up @@ -373,7 +346,7 @@ def switch_mysql_cluster_proxy_flow(self):
sub_pipelines.append(sub_pipeline.build_sub_process(sub_name=_("替换proxy子流程")))

mysql_proxy_cluster_add_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
mysql_proxy_cluster_add_pipeline.run_pipeline(is_drop_random_user=True)
mysql_proxy_cluster_add_pipeline.run_pipeline()

def proxy_reduce_sub_flow(self, cluster_id: int, bk_cloud_id: int, origin_proxy_ip: str, origin_proxy_port: int):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import logging

from django.utils.translation import ugettext as _
from pipeline.component_framework.component import Component

from backend.components import DRSApi
from backend.db_meta.exceptions import ClusterNotExistException
from backend.db_meta.models import Cluster, StorageInstance
from backend.flow.plugins.components.collections.common.base_service import BaseService
from backend.flow.plugins.components.collections.mysql.drop_proxy_client_in_backend import (
DropProxyUsersInBackendService,
)
from backend.flow.utils.mysql.mysql_commom_query import show_privilege_for_user

logger = logging.getLogger("flow")


class CloneProxyUsersInBackendService(BaseService):
    """
    Clone, inside one cluster, the backend grants held by an old proxy host for a new proxy host.
    Used by the proxy replace and proxy add tickets.

    Steps, applied to every storage instance of the cluster:
    1. Drop whatever grants are left over for the new proxy host, so the clone below cannot conflict with them.
    2. Read the grants of the old proxy host and replay them for the new proxy host.
    """

    @staticmethod
    def clone_proxy_client(origin_proxy_host: str, target_proxy_host: str, backend: StorageInstance, cluster: Cluster):
        """
        Replay the grants of ``origin_proxy_host`` for ``target_proxy_host`` on a single backend.

        :param origin_proxy_host: host whose grants are read from the backend
        :param target_proxy_host: host that replaces the origin host in every grant statement
        :param backend: storage instance the statements are read from and executed on
        :param cluster: cluster owning the backend; only its ``major_version`` is used here
        :return: empty string on success, otherwise a message describing the failure
        """
        ok, origin_grants = show_privilege_for_user(
            host=origin_proxy_host, instance=backend, db_version=cluster.major_version
        )
        if not ok:
            return f"[{backend.ip_port}] show proxy client[{origin_proxy_host}] failed"

        # Re-target every grant statement by plain text substitution of the host, then run them via DRS.
        addresses = [backend.ip_port]
        target_grants = [sql.replace(origin_proxy_host, target_proxy_host) for sql in origin_grants]
        rpc_result = DRSApi.rpc(
            {
                "addresses": addresses,
                "cmds": target_grants,
                "force": False,
                "bk_cloud_id": backend.machine.bk_cloud_id,
            }
        )

        if not rpc_result["error_msg"]:
            return ""
        return (
            f"[{backend.ip_port}] clone proxy client[{target_proxy_host}] failed: [{rpc_result['error_msg']}]"
        )

    def _execute(self, data, parent_data, callback=None) -> bool:
        """
        Run the drop-then-clone sequence on every storage instance of the cluster named in ``kwargs``.

        A failure on one backend is logged and does not stop the remaining backends from being processed.

        :return: True only when every backend was handled without error
        :raises ClusterNotExistException: when no cluster matches ``kwargs["cluster_id"]``
        """
        kwargs = data.get_one_of_inputs("kwargs")
        global_data = data.get_one_of_inputs("global_data")
        try:
            cluster = Cluster.objects.get(id=kwargs["cluster_id"])
        except Cluster.DoesNotExist:
            raise ClusterNotExistException(
                cluster_id=kwargs["cluster_id"], bk_biz_id=int(global_data["bk_biz_id"]), message=_("集群不存在")
            )

        has_failure = False
        for storage in cluster.storageinstance_set.all():
            # Step 1: clear leftover grants of the new proxy host on this backend.
            dropped, drop_err = DropProxyUsersInBackendService.drop_proxy_client(kwargs["target_proxy_host"], storage)
            if not dropped:
                self.log_error(drop_err)
                has_failure = True
                continue

            # Step 2: replay the old proxy's grants for the new proxy host.
            clone_err = self.clone_proxy_client(
                origin_proxy_host=kwargs["origin_proxy_host"],
                target_proxy_host=kwargs["target_proxy_host"],
                backend=storage,
                cluster=cluster,
            )
            if clone_err:
                self.log_error(clone_err)
                has_failure = True
            else:
                self.log_info(f"[{storage.ip_port}]clone proxy client [{kwargs['target_proxy_host']}] successfully")

        return not has_failure


class CloneProxyUsersInBackendComponent(Component):
    """
    Pipeline component that exposes CloneProxyUsersInBackendService to flows.
    """

    # Display name: the dotted name of this module.
    name = __name__
    # Identifier that flows pass as ``act_component_code`` to select this component.
    code = "clone_proxy_client_in_backend"
    # Service class bound to this component.
    bound_service = CloneProxyUsersInBackendService
Loading

0 comments on commit b149e81

Please sign in to comment.