diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/clone_client_grant.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/clone_client_grant.go index 8650471e96..73251d8474 100644 --- a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/clone_client_grant.go +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/clone_client_grant.go @@ -65,10 +65,6 @@ func (g *CloneClineGrantAct) Run() (err error) { FunName: "克隆client权限", Func: g.Service.CloneTargetClientPriv, }, - { - FunName: "回收旧client权限", - Func: g.Service.DropOriginClientPriv, - }, } if err := steps.Run(); err != nil { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/rename_dbs/pkg/drop_db.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/rename_dbs/pkg/drop_db.go index 0d01468133..4276cbd6ac 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/rename_dbs/pkg/drop_db.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/rename_dbs/pkg/drop_db.go @@ -2,10 +2,11 @@ package pkg import ( "context" - "dbm-services/common/go-pubpkg/logger" "fmt" "time" + "dbm-services/common/go-pubpkg/logger" + "github.com/jmoiron/sqlx" ) @@ -18,7 +19,7 @@ func DropDB(conn *sqlx.Conn, dbName, to string, onlyStageTable bool) error { return fmt.Errorf(`db "%s" is not trans clean`, dbName) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() _, err = conn.ExecContext( diff --git a/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py b/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py index f38eac4a2a..2e0eedc7b1 100644 --- a/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py +++ b/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py @@ -28,8 +28,6 @@ def decommission(cluster: Cluster): cc_manage = CcManage(cluster.bk_biz_id, cluster.cluster_type) for proxy in cluster.proxyinstance_set.all(): - # 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题 - proxy.machine.proxyinstance_set.select_for_update().all() proxy.delete(keep_parents=True) if not proxy.machine.proxyinstance_set.exists(): @@ -41,8 +39,6 @@ def decommission(cluster: Cluster): cc_manage.delete_service_instance(bk_instance_ids=[proxy.bk_instance_id]) for storage in cluster.storageinstance_set.all(): - # 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题 - storage.machine.proxyinstance_set.select_for_update().all() # 删除存储在密码服务的密码元信息 DBPrivManagerApi.delete_password( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_add.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_add.py index c6d7df0041..a11bc04181 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_add.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_add.py @@ -21,11 +21,19 @@ from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import init_machine_sub_flow +from backend.flow.plugins.components.collections.mysql.clone_proxy_client_in_backend import ( + CloneProxyUsersInBackendComponent, +) +from backend.flow.plugins.components.collections.mysql.clone_proxy_user_in_cluster import ( + CloneProxyUsersInClusterComponent, +) from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.mysql_act_dataclass import ( + CloneProxyClientInBackendKwargs, + CloneProxyUsersKwargs, CreateDnsKwargs, DBMetaOPKwargs, DownloadMediaKwargs, @@ -33,6 +41,7 @@ ) from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta +from backend.flow.utils.mysql.proxy_act_payload import ProxyActPayload logger = logging.getLogger("flow") @@ -106,9 +115,7 @@ def add_mysql_cluster_proxy_flow(self): for i in self.data["infos"]: cluster_ids.extend(i["cluster_ids"]) - mysql_proxy_cluster_add_pipeline = Builder( - root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids)) - ) + mysql_proxy_cluster_add_pipeline = Builder(root_id=self.root_id, data=self.data) sub_pipelines = [] # 多集群操作时循环加入集群proxy下架子流程 @@ -182,55 +189,42 @@ def add_mysql_cluster_proxy_flow(self): # 针对集群维度声明子流程 add_proxy_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context)) - # 拼接添加proxy节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题 - add_proxy_sub_act_kwargs = ExecActuatorKwargs( - bk_cloud_id=cluster["bk_cloud_id"], - cluster=cluster, - ) - add_proxy_sub_pipeline.add_act( - act_name=_("下发db-actuator介质"), - act_component_code=TransFileComponent.code, + act_name=_("新的proxy配置后端实例[{}:{}]".format(info["proxy_ip"]["ip"], cluster["proxy_port"])), + act_component_code=ExecuteDBActuatorScriptComponent.code, kwargs=asdict( - DownloadMediaKwargs( + ExecActuatorKwargs( bk_cloud_id=cluster["bk_cloud_id"], - exec_ip=[cluster["template_proxy_ip"]] + cluster["mysql_ip_list"], - file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(), - ), + cluster=cluster, + exec_ip=info["proxy_ip"]["ip"], + get_mysql_payload_func=ProxyActPayload.get_set_proxy_backends.__name__, + ) ), ) - add_proxy_sub_act_kwargs.exec_ip = cluster["target_proxy_ip"] - add_proxy_sub_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_set_proxy_backends.__name__ add_proxy_sub_pipeline.add_act( - act_name=_("新的proxy配置后端实例"), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(add_proxy_sub_act_kwargs), + act_name=_("克隆proxy用户白名单"), + act_component_code=CloneProxyUsersInClusterComponent.code, + kwargs=asdict( + CloneProxyUsersKwargs( + cluster_id=cluster["id"], + target_proxy_host=info["proxy_ip"]["ip"], + ) + ), ) - add_proxy_sub_act_kwargs.exec_ip = cluster["template_proxy_ip"] - add_proxy_sub_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_clone_proxy_user_payload.__name__ add_proxy_sub_pipeline.add_act( - act_name=_("克隆proxy用户白名单"), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(add_proxy_sub_act_kwargs), + act_name=_("集群对新的proxy添加权限"), + act_component_code=CloneProxyUsersInBackendComponent.code, + kwargs=asdict( + CloneProxyClientInBackendKwargs( + cluster_id=cluster["id"], + target_proxy_host=info["proxy_ip"]["ip"], + origin_proxy_host=cluster["template_proxy_ip"], + ) + ), ) - acts_list = [] - for cluster_mysql_ip in cluster["mysql_ip_list"]: - add_proxy_sub_act_kwargs.exec_ip = cluster_mysql_ip - add_proxy_sub_act_kwargs.get_mysql_payload_func = ( - MysqlActPayload.get_clone_client_grant_payload.__name__ - ) - acts_list.append( - { - "act_name": _("集群对新的proxy添加权限"), - "act_component_code": ExecuteDBActuatorScriptComponent.code, - "kwargs": asdict(add_proxy_sub_act_kwargs), - } - ) - add_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list) - acts_list = [] for name in cluster["add_domain_list"]: # 这里的添加域名的方式根据目前集群对应proxy dns域名进行循环添加,这样保证某个域名添加异常时其他域名添加成功 @@ -243,7 +237,7 @@ def add_mysql_cluster_proxy_flow(self): bk_cloud_id=cluster["bk_cloud_id"], add_domain_name=name, dns_op_exec_port=cluster["proxy_port"], - exec_ip=cluster["target_proxy_ip"], + exec_ip=info["proxy_ip"]["ip"], ) ), } @@ -276,7 +270,9 @@ def add_mysql_cluster_proxy_flow(self): kwargs=asdict(exec_act_kwargs), ) - sub_pipelines.append(sub_pipeline.build_sub_process(sub_name=_("添加proxy子流程"))) + sub_pipelines.append( + sub_pipeline.build_sub_process(sub_name=_("添加proxy子流程[{}]".format(info["proxy_ip"]["ip"]))) + ) mysql_proxy_cluster_add_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines) - mysql_proxy_cluster_add_pipeline.run_pipeline(is_drop_random_user=True) + mysql_proxy_cluster_add_pipeline.run_pipeline() diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py index 021bae366a..15849286c3 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py @@ -16,7 +16,7 @@ from django.utils.translation import ugettext as _ from backend.configuration.constants import DBType -from backend.db_meta.enums import ClusterEntryType, ClusterType, InstanceInnerRole, InstanceStatus +from backend.db_meta.enums import ClusterEntryType, ClusterType, InstanceInnerRole from backend.db_meta.models import Cluster, ProxyInstance, StorageInstance from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList @@ -24,20 +24,33 @@ from backend.flow.plugins.components.collections.common.delete_cc_service_instance import DelCCServiceInstComponent from backend.flow.plugins.components.collections.common.pause import PauseComponent from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent +from backend.flow.plugins.components.collections.mysql.clone_proxy_client_in_backend import ( + CloneProxyUsersInBackendComponent, +) +from backend.flow.plugins.components.collections.mysql.clone_proxy_user_in_cluster import ( + CloneProxyUsersInClusterComponent, +) from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent +from backend.flow.plugins.components.collections.mysql.drop_proxy_client_in_backend import ( + DropProxyUsersInBackendComponent, +) from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.mysql_act_dataclass import ( + CloneProxyClientInBackendKwargs, + CloneProxyUsersKwargs, CreateDnsKwargs, DBMetaOPKwargs, DelServiceInstKwargs, DownloadMediaKwargs, + DropProxyUsersInBackendKwargs, ExecActuatorKwargs, RecycleDnsRecordKwargs, ) from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta +from backend.flow.utils.mysql.proxy_act_payload import ProxyActPayload logger = logging.getLogger("flow") @@ -67,33 +80,25 @@ def __get_switch_cluster_info(cluster_id: int, origin_proxy_ip: str, target_prox """ cluster = Cluster.objects.get(id=cluster_id) - # 选择集群标记running状态的proxy实例,作为流程中克隆权限的依据, 排除待替换的ip - template_proxy = ( - ProxyInstance.objects.filter(cluster=cluster, status=InstanceStatus.RUNNING.value) - .exclude(machine__ip=origin_proxy_ip) - .all()[0] - ) - mysql_ip_list = StorageInstance.objects.filter(cluster=cluster).all() + origin_proxy = ProxyInstance.objects.get(cluster=cluster, machine__ip=origin_proxy_ip) master = StorageInstance.objects.get(cluster=cluster, instance_inner_role=InstanceInnerRole.MASTER) - dns_list = template_proxy.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value).all() + dns_list = origin_proxy.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value).all() return { "id": cluster_id, "bk_cloud_id": cluster.bk_cloud_id, "name": cluster.name, "cluster_type": cluster.cluster_type, - "template_proxy_ip": template_proxy.machine.ip, # 集群所有的backend实例的端口是一致的,获取第一个对象的端口信息即可 - "mysql_ip_list": [m.machine.ip for m in mysql_ip_list], "mysql_port": master.port, # 每套集群的proxy端口必须是相同的,取第一个proxy的端口信息即可 - "proxy_port": template_proxy.port, + "proxy_port": origin_proxy.port, + "proxy_admin_port": origin_proxy.admin_port, "origin_proxy_ip": origin_proxy_ip, "target_proxy_ip": target_proxy_ip, # 新的proxy配置后端ip "set_backend_ip": master.machine.ip, "add_domain_list": [i.entry for i in dns_list], - "is_drop": True, } @staticmethod @@ -113,15 +118,8 @@ def __get_proxy_install_ports(cluster_ids: list) -> list: def switch_mysql_cluster_proxy_flow(self): """ 定义mysql集群proxy替换实例流程 - 增加单据临时ADMIN账号的添加和删除逻辑 """ - cluster_ids = [] - for i in self.data["infos"]: - cluster_ids.extend(i["cluster_ids"]) - - mysql_proxy_cluster_add_pipeline = Builder( - root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids)) - ) + mysql_proxy_cluster_add_pipeline = Builder(root_id=self.root_id, data=self.data) sub_pipelines = [] # 多集群操作时循环加入集群proxy替换子流程 @@ -133,16 +131,6 @@ def switch_mysql_cluster_proxy_flow(self): sub_flow_context["proxy_ports"] = self.__get_proxy_install_ports(cluster_ids=info["cluster_ids"]) sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context)) - # 初始化同机替换的proxy集群信息 - clusters = [ - self.__get_switch_cluster_info( - cluster_id=cluster_id, - origin_proxy_ip=info["origin_proxy_ip"]["ip"], - target_proxy_ip=info["target_proxy_ip"]["ip"], - ) - for cluster_id in info["cluster_ids"] - ] - # 拼接执行原子任务活动节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题 exec_act_kwargs = ExecActuatorKwargs( cluster_type=ClusterType.TenDBHA, @@ -190,89 +178,61 @@ def switch_mysql_cluster_proxy_flow(self): act_component_code=ExecuteDBActuatorScriptComponent.code, kwargs=asdict(exec_act_kwargs), ) + # 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换 + sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={}) # 阶段2 根据需要替换的proxy的集群,依次添加 - add_proxy_sub_list = [] - for cluster in clusters: + switch_proxy_sub_list = [] + for cluster_id in info["cluster_ids"]: # 拼接子流程需要全局参数 sub_sub_flow_context = copy.deepcopy(self.data) sub_sub_flow_context.pop("infos") + # 获取集群的实例信息 + cluster = self.__get_switch_cluster_info( + cluster_id=cluster_id, + target_proxy_ip=info["target_proxy_ip"]["ip"], + origin_proxy_ip=info["origin_proxy_ip"]["ip"], + ) + # 针对集群维度声明替换子流程 switch_proxy_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context)) - # 拼接替换proxy节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题 - switch_proxy_sub_act_kwargs = ExecActuatorKwargs( - bk_cloud_id=cluster["bk_cloud_id"], - cluster=cluster, - ) - switch_proxy_sub_pipeline.add_act( - act_name=_("下发db-actuator介质"), - act_component_code=TransFileComponent.code, + act_name=_("新的proxy配置后端实例[{}:{}]".format(info["target_proxy_ip"]["ip"], cluster["proxy_port"])), + act_component_code=ExecuteDBActuatorScriptComponent.code, kwargs=asdict( - DownloadMediaKwargs( + ExecActuatorKwargs( bk_cloud_id=cluster["bk_cloud_id"], - exec_ip=[cluster["template_proxy_ip"]] + cluster["mysql_ip_list"], - file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(), - ), + cluster=cluster, + exec_ip=info["target_proxy_ip"]["ip"], + get_mysql_payload_func=ProxyActPayload.get_set_proxy_backends.__name__, + ) ), ) - switch_proxy_sub_act_kwargs.exec_ip = cluster["target_proxy_ip"] - switch_proxy_sub_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_set_proxy_backends.__name__ - switch_proxy_sub_pipeline.add_act( - act_name=_("新的proxy配置后端实例"), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(switch_proxy_sub_act_kwargs), - ) - - switch_proxy_sub_act_kwargs.exec_ip = cluster["template_proxy_ip"] - switch_proxy_sub_act_kwargs.get_mysql_payload_func = ( - MysqlActPayload.get_clone_proxy_user_payload.__name__ - ) switch_proxy_sub_pipeline.add_act( act_name=_("克隆proxy用户白名单"), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(switch_proxy_sub_act_kwargs), - ) - - acts_list = [] - for cluster_mysql_ip in cluster["mysql_ip_list"]: - switch_proxy_sub_act_kwargs.exec_ip = cluster_mysql_ip - switch_proxy_sub_act_kwargs.get_mysql_payload_func = ( - MysqlActPayload.get_clone_client_grant_payload.__name__ - ) - acts_list.append( - { - "act_name": _("集群对新的proxy添加权限"), - "act_component_code": ExecuteDBActuatorScriptComponent.code, - "kwargs": asdict(switch_proxy_sub_act_kwargs), - } - ) - switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list) - - add_proxy_sub_list.append( - switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群添加proxy实例").format(cluster["name"])) + act_component_code=CloneProxyUsersInClusterComponent.code, + kwargs=asdict( + CloneProxyUsersKwargs( + cluster_id=cluster["id"], + target_proxy_host=info["target_proxy_ip"]["ip"], + ) + ), ) - sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=add_proxy_sub_list) - - # 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换 - sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={}) - - # 阶段3 根据集群维度切换域名 - switch_dns_sub_list = [] - for cluster in clusters: - - # 拼接子流程需要全局参数 - sub_sub_flow_context = copy.deepcopy(self.data) - sub_sub_flow_context.pop("infos") - - # 针对集群维度声明替换子流程 - switch_cluster_dns_pipeline = SubBuilder( - root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context) + switch_proxy_sub_pipeline.add_act( + act_name=_("集群对新的proxy添加权限"), + act_component_code=CloneProxyUsersInBackendComponent.code, + kwargs=asdict( + CloneProxyClientInBackendKwargs( + cluster_id=cluster["id"], + target_proxy_host=info["target_proxy_ip"]["ip"], + origin_proxy_host=info["origin_proxy_ip"]["ip"], + ) + ), ) acts_list = [] @@ -287,30 +247,41 @@ def switch_mysql_cluster_proxy_flow(self): bk_cloud_id=cluster["bk_cloud_id"], add_domain_name=dns_name, dns_op_exec_port=cluster["proxy_port"], - exec_ip=cluster["target_proxy_ip"], + exec_ip=info["target_proxy_ip"]["ip"], ) ), } ) - switch_cluster_dns_pipeline.add_parallel_acts(acts_list=acts_list) + switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list) - switch_cluster_dns_pipeline.add_act( + switch_proxy_sub_pipeline.add_act( act_name=_("回收旧proxy集群映射"), act_component_code=MySQLDnsManageComponent.code, kwargs=asdict( RecycleDnsRecordKwargs( bk_cloud_id=cluster["bk_cloud_id"], dns_op_exec_port=cluster["proxy_port"], - exec_ip=cluster["origin_proxy_ip"], + exec_ip=info["origin_proxy_ip"]["ip"], + ), + ), + ) + + switch_proxy_sub_pipeline.add_act( + act_name=_("回收旧proxy在backend权限"), + act_component_code=DropProxyUsersInBackendComponent.code, + kwargs=asdict( + DropProxyUsersInBackendKwargs( + cluster_id=cluster["id"], + origin_proxy_host=info["origin_proxy_ip"]["ip"], ), ), ) - switch_dns_sub_list.append( - switch_cluster_dns_pipeline.build_sub_process(sub_name=_("{}集群切换proxy域名").format(cluster["name"])) + switch_proxy_sub_list.append( + switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群替换proxy实例").format(cluster["name"])) ) - sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_dns_sub_list) + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_proxy_sub_list) # 先把新的节点数据写入 sub_pipeline.add_act( @@ -370,10 +341,16 @@ def switch_mysql_cluster_proxy_flow(self): kwargs=asdict(exec_act_kwargs), ) - sub_pipelines.append(sub_pipeline.build_sub_process(sub_name=_("替换proxy子流程"))) + sub_pipelines.append( + sub_pipeline.build_sub_process( + sub_name=_( + "替换proxy子流程[{}]->[{}]".format(info["origin_proxy_ip"]["ip"], info["target_proxy_ip"]["ip"]) + ) + ) + ) mysql_proxy_cluster_add_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines) - mysql_proxy_cluster_add_pipeline.run_pipeline(is_drop_random_user=True) + mysql_proxy_cluster_add_pipeline.run_pipeline() def proxy_reduce_sub_flow(self, cluster_id: int, bk_cloud_id: int, origin_proxy_ip: str, origin_proxy_port: int): """ diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/clear_machine.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/clear_machine.py index 23101edb0e..a1d513c65c 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/mysql/clear_machine.py +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/clear_machine.py @@ -42,10 +42,21 @@ def _execute(self, data, parent_data) -> bool: # 检测机器列表是否还有实例注册 target_ip_list = copy.deepcopy(exec_ips) for ip in exec_ips: - if Machine.objects.filter(ip=ip, bk_cloud_id=kwargs["bk_cloud_id"]).exists(): - self.log_info(_("机器还在系统中注册,暂不用清理[{}]").format(ip)) - target_ip_list.remove(ip) - continue + machines = Machine.objects.filter(ip=ip, bk_cloud_id=kwargs["bk_cloud_id"]).prefetch_related( + "proxyinstance_set", "storageinstance_set" + ) + if machines.exists(): + # ip+bk_cloud_id是唯一值,如果存在只有一行数据 + if machines[0].proxyinstance_set.exists() or machines[0].storageinstance_set.exists(): + # 如果注册到Instance上,才算有空余的注册内容,怎么跳过 + self.log_info(_("机器还在系统中注册,暂不用清理[{}]").format(ip)) + target_ip_list.remove(ip) + continue + else: + # 如果只有machine表注册,可以认为脏数据,可能并发下架引起的,这里可以做一次清理 + self.log_info(_("机器还在machine表存在残留数据,执行machine数据清理[{}]").format(ip)) + machines[0].delete(keep_parents=True) + if not target_ip_list: # 表示没有机器可以清理回收 self.log_info(_("本次操作没有机器可以清理,提前结束活动节点")) diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/clone_proxy_client_in_backend.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/clone_proxy_client_in_backend.py new file mode 100644 index 0000000000..ce3777d848 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/clone_proxy_client_in_backend.py @@ -0,0 +1,108 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from django.utils.translation import ugettext as _ +from pipeline.component_framework.component import Component + +from backend.components import DRSApi +from backend.db_meta.exceptions import ClusterNotExistException +from backend.db_meta.models import Cluster, StorageInstance +from backend.flow.plugins.components.collections.common.base_service import BaseService +from backend.flow.plugins.components.collections.mysql.drop_proxy_client_in_backend import ( + DropProxyUsersInBackendService, +) +from backend.flow.utils.mysql.mysql_commom_query import show_privilege_for_user + +logger = logging.getLogger("flow") + + +class CloneProxyUsersInBackendService(BaseService): + """ + 在集群内,根据旧proxy权限,克隆一份对新proxy的权限。proxy替换和添加单据调用 + 操作步骤: + 1: 先处理新proxy在集群所有backend节点的残留权限,避免冲突。因为理论上新proxy的授权出现在集群上 + 2:根据旧proxy的授权模式,给新proxy授权一份 + """ + + def clone_proxy_client( + self, origin_proxy_host: str, target_proxy_host: str, backend: StorageInstance, cluster: Cluster + ): + """ + 克隆proxy权限 + """ + result, grant_sqls = show_privilege_for_user( + host=origin_proxy_host, instance=backend, db_version=cluster.major_version + ) + if not result: + return f"[{backend.ip_port}] show proxy client[{origin_proxy_host}] failed" + + if not grant_sqls: + self.log_info(f"[{backend.ip_port}] show proxy client[{origin_proxy_host}] is null, skip") + return "" + + # 执行授权 + res = DRSApi.rpc( + { + "addresses": [backend.ip_port], + "cmds": [i.replace(origin_proxy_host, target_proxy_host, -1) for i in grant_sqls], + "force": False, + "bk_cloud_id": backend.machine.bk_cloud_id, + } + ) + if res[0]["error_msg"]: + return f"[{backend.ip_port}] clone proxy client[{target_proxy_host}] failed: [{res['error_msg']}]" + + return "" + + def _execute(self, data, parent_data, callback=None) -> bool: + kwargs = data.get_one_of_inputs("kwargs") + global_data = data.get_one_of_inputs("global_data") + try: + cluster = Cluster.objects.get(id=kwargs["cluster_id"]) + except Cluster.DoesNotExist: + raise ClusterNotExistException( + cluster_id=kwargs["cluster_id"], bk_biz_id=int(global_data["bk_biz_id"]), message=_("集群不存在") + ) + err_no = False + for s in cluster.storageinstance_set.all(): + # 1: 先处理新proxy在集群所有backend节点的残留权限 + status, err = DropProxyUsersInBackendService.drop_proxy_client(kwargs["target_proxy_host"], s) + if not status: + self.log_error(err) + err_no = True + continue + self.log_info(f"[{s.ip_port}] drop new proxy client[{kwargs['target_proxy_host']}] successfully") + + # 2: 根据旧proxy的授权模式,给新proxy授权一份 + log = self.clone_proxy_client( + origin_proxy_host=kwargs["origin_proxy_host"], + target_proxy_host=kwargs["target_proxy_host"], + backend=s, + cluster=cluster, + ) + if log: + self.log_error(log) + err_no = True + continue + + self.log_info(f"[{s.ip_port}]clone proxy client [{kwargs['target_proxy_host']}] successfully") + + if err_no: + return False + + return True + + +class CloneProxyUsersInBackendComponent(Component): + name = __name__ + code = "clone_proxy_client_in_backend" + bound_service = CloneProxyUsersInBackendService diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/clone_proxy_user_in_cluster.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/clone_proxy_user_in_cluster.py new file mode 100644 index 0000000000..165e49a519 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/clone_proxy_user_in_cluster.py @@ -0,0 +1,87 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from django.utils.translation import ugettext as _ +from pipeline.component_framework.component import Component + +from backend.components import DRSApi +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.enums import InstanceStatus, MachineType +from backend.db_meta.exceptions import ClusterNotExistException +from backend.db_meta.models import Cluster +from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserService + +logger = logging.getLogger("flow") + + +class CloneProxyUsersInClusterService(CloneUserService): + """ + 场景化处理:集群内克隆proxy的用户白名单,同时给后端mysql对白名单授权,提供proxy替换和添加使用 + 理论上某个状态点,集群的所有proxy的授权名单都是同等的。 + 所以这里会即时计算running状态的proxy实例作为权限克隆源,保证克隆时集群的权限的最新可用的。 + """ + + def _calc_running_status_in_cluster(self, cluster: Cluster): + """ + 计算集群可用的proxy实例,作为权限克隆源 + """ + proxys = cluster.proxyinstance_set.filter(status=InstanceStatus.RUNNING) + if not proxys: + # 如果在dbm系统找不到running状态的proxy实例,则报异常 + self.log_error(f"no running-status-proxys in cluster[{cluster.immute_domain}]") + return None, 0 + for proxy in proxys: + proxy_admin_instance = f"{proxy.machine.ip}{IP_PORT_DIVIDER}{proxy.admin_port}" + res = DRSApi.proxyrpc( + { + "addresses": [proxy_admin_instance], + "cmds": ["select version;"], + "force": False, + "bk_cloud_id": cluster.bk_cloud_id, + } + ) + if not res[0]["error_msg"]: + self.log_info(f"get running proxy [{proxy_admin_instance}] is source ") + return proxy.ip_port, proxy.port + + self.log_error(f"no running proxy in cluster [{cluster.immute_domain}] with drs-check") + return None, 0 + + def _execute(self, data, parent_data, callback=None) -> bool: + kwargs = data.get_one_of_inputs("kwargs") + global_data = data.get_one_of_inputs("global_data") + try: + cluster = Cluster.objects.get(id=kwargs["cluster_id"]) + except Cluster.DoesNotExist: + raise ClusterNotExistException( + cluster_id=kwargs["cluster_id"], bk_biz_id=int(global_data["bk_biz_id"]), message=_("集群不存在") + ) + temp_proxy, proxy_port = self._calc_running_status_in_cluster(cluster) + if not temp_proxy: + return False + + # 执行clone-user接口 + data.get_one_of_inputs("kwargs")["clone_data"] = [ + { + "source": temp_proxy, + "target": f"{kwargs['target_proxy_host']}{IP_PORT_DIVIDER}{proxy_port}", + "machine_type": MachineType.PROXY.value, + "bk_cloud_id": cluster.bk_cloud_id, + } + ] + return super()._execute(data, parent_data) + + +class CloneProxyUsersInClusterComponent(Component): + name = __name__ + code = "clone_proxy_users_in_cluster" + bound_service = CloneProxyUsersInClusterService diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/drop_proxy_client_in_backend.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/drop_proxy_client_in_backend.py new file mode 100644 index 0000000000..7e5d8a75e8 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/drop_proxy_client_in_backend.py @@ -0,0 +1,77 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from django.utils.translation import ugettext as _ +from pipeline.component_framework.component import Component + +from backend.components import DRSApi +from backend.db_meta.exceptions import ClusterNotExistException +from backend.db_meta.models import Cluster, StorageInstance +from backend.flow.plugins.components.collections.common.base_service import BaseService +from backend.flow.utils.mysql.mysql_commom_query import show_user_host_for_host + +logger = logging.getLogger("flow") + + +class DropProxyUsersInBackendService(BaseService): + """ + 在集群内清理旧proxy的后端权限 + """ + + @staticmethod + def drop_proxy_client(origin_proxy_host: str, backend: StorageInstance): + """ + 计算集群可用的proxy实例,作为权限克隆源 + """ + result, user_hosts = show_user_host_for_host(host=origin_proxy_host, instance=backend) + if not result: + return False, f"[{backend.ip_port}] get user_host[{origin_proxy_host}] failed" + + # 执行删除旧proxy client + if user_hosts: + res = DRSApi.rpc( + { + "addresses": [backend.ip_port], + "cmds": [f"drop user {i};" for i in user_hosts], + "force": False, + "bk_cloud_id": backend.machine.bk_cloud_id, + } + ) + if res[0]["error_msg"]: + return ( + False, + f"[{backend.ip_port}] drop old proxy client[{origin_proxy_host}] failed: [{res['error_msg']}]", + ) + return True, "" + + def _execute(self, data, parent_data, callback=None) -> bool: + kwargs = data.get_one_of_inputs("kwargs") + global_data = data.get_one_of_inputs("global_data") + try: + cluster = Cluster.objects.get(id=kwargs["cluster_id"]) + except Cluster.DoesNotExist: + raise ClusterNotExistException( + cluster_id=kwargs["cluster_id"], bk_biz_id=int(global_data["bk_biz_id"]), message=_("集群不存在") + ) + for s in cluster.storageinstance_set.all(): + status, err = self.drop_proxy_client(kwargs["origin_proxy_host"], s) + if not status: + self.log_error(err) + return False + self.log_info(f"[{s.ip_port}]drop old proxy client [{kwargs['origin_proxy_host']}] successfully") + return True + + +class DropProxyUsersInBackendComponent(Component): + name = __name__ + code = "drop_proxy_users_in_backend" + bound_service = DropProxyUsersInBackendService diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/set_backend_in_porxy.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/set_backend_in_porxy.py new file mode 100644 index 0000000000..2ef1c02775 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/set_backend_in_porxy.py @@ -0,0 +1,59 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from pipeline.component_framework.component import Component + +from backend.components import DRSApi +from backend.flow.plugins.components.collections.common.base_service import BaseService +from backend.flow.utils.mysql.mysql_commom_query import check_backend_in_proxy + +logger = logging.getLogger("flow") + + +class SetBackendInProxyService(BaseService): + """ + 在新proxy设置backend后端信息,设置之前需要保证proxy的backend是1.1.1.1:3306 + 如果不是则证明不是最新的,则作为异常退出 + """ + + def _execute(self, data, parent_data, callback=None) -> bool: + kwargs = data.get_one_of_inputs("kwargs") + if not check_backend_in_proxy(proxys=kwargs["proxys"], bk_cloud_id=int(kwargs["bk_cloud_id"])): + # 检测不通过,异常 + return False + + # 刷新backend + res = DRSApi.proxyrpc( + { + "addresses": kwargs["proxys"], + "cmds": [f"refresh_backends('{kwargs['backend_host']}:{kwargs['backend_port']}',1)"], + "force": False, + "bk_cloud_id": int(kwargs["bk_cloud_id"]), + } + ) + is_error = False + for i in res: + if i["error_msg"]: + self.log_error(f"the proxy [{kwargs['proxys']}] set backend failed:{i['error_msg']}") + is_error = True + + if is_error: + return False + + self.log_info(f"the proxy [{kwargs['proxys']}] set backend successfully") + return True + + +class SetBackendInProxyComponent(Component): + name = __name__ + code = "set_backend_in_proxy" + bound_service = SetBackendInProxyService diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py index 597ea18491..5f59d8c769 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py @@ -554,3 +554,46 @@ class AuthorizeKwargs: operator: str authorize_data: list user_db_rules_map: dict + + +@dataclass +class SetBackendInProxyKwargs: + """ + 定义set_backend_in_proxy活动节点的私有变量结构体 + """ + + proxys: List[str] + bk_cloud_id: int + backend_host: str + backend_port: int + + +@dataclass +class CloneProxyUsersKwargs: + """ + 定义clone_proxy_users_in_cluster活动节点的私有变量结构体 + """ + + cluster_id: int + target_proxy_host: str + + +@dataclass +class CloneProxyClientInBackendKwargs: + """ + 定义clone_proxy_client_in_backend活动节点的私有变量结构体 + """ + + cluster_id: int + target_proxy_host: str + origin_proxy_host: str + + +@dataclass +class DropProxyUsersInBackendKwargs: + """ + 定义drop_proxy_users_in_backend活动节点的私有变量结构体 + """ + + cluster_id: int + origin_proxy_host: str diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py index 355935d48a..0d8bdd37db 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py @@ -9,11 +9,14 @@ specific language governing permissions and limitations under the License. """ import logging.config +from typing import List from django.utils.translation import gettext as _ from backend.components.db_remote_service.client import DRSApi from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.models import StorageInstance +from backend.flow.utils.mysql.mysql_version_parse import mysql_version_parse logger = logging.getLogger("flow") @@ -44,3 +47,92 @@ def query_mysql_variables(host: str, port: int, bk_cloud_id: int): val = var_item["Value"] var_map[var_name] = val return var_map + + +def show_user_host_for_host(host: str, instance: StorageInstance): + """ + 根据host查询账号信息 + """ + res = DRSApi.rpc( + { + "addresses": [instance.ip_port], + "cmds": [f"select concat('`',user,'`@`',host,'`') as user_host from mysql.user where host = '{host}'"], + "force": False, + "bk_cloud_id": instance.machine.bk_cloud_id, + } + ) + if res[0]["error_msg"]: + logger.error(f"[{instance.ip_port}] get user info [{host}] failed: [{res['error_msg']}]") + return False, [] + + return True, [list(item.values())[0] for item in res[0]["cmd_results"][0]["table_data"]] + + +def show_privilege_for_user(db_version: str, host: str, instance: StorageInstance): + """ + 根据user_host 在实例查询授权情况,并拼接成对应的版本的授权语句 + """ + result, user_hosts = show_user_host_for_host(host=host, instance=instance) + if not result: + # 这里是异常退出 + return result, [] + if not user_hosts: + # 这里查询为空则正常退出 + return True, [] + + grants_sql = [] + if mysql_version_parse(db_version) >= mysql_version_parse("5.7"): + res = DRSApi.rpc( + { + "addresses": [instance.ip_port], + "cmds": [f"show create user {u} " for u in user_hosts], + "force": False, + "bk_cloud_id": instance.machine.bk_cloud_id, + } + ) + if res[0]["error_msg"]: + logger.error(f"[{instance.ip_port}] show create user failed: [{res[0]['error_msg']}]") + return False, [] + grants_sql.extend([list(i.values())[0] for item in res[0]["cmd_results"] for i in item["table_data"]]) + + res = DRSApi.rpc( + { + "addresses": [instance.ip_port], + "cmds": [f"show grants for {u} " for u in user_hosts], + "force": False, + "bk_cloud_id": instance.machine.bk_cloud_id, + } + ) + if res[0]["error_msg"]: + logger.error(f"[{instance.ip_port}] show grants failed: [{res[0]['error_msg']}]") + return False, [] + + grants_sql.extend([list(i.values())[0] for item in res[0]["cmd_results"] for i in item["table_data"]]) + return True, grants_sql + + +def check_backend_in_proxy(proxys: List[str], bk_cloud_id: int): + """ + 检测传入的proxy是否1.1.1.1:3306 + """ + res = DRSApi.proxyrpc( + { + "addresses": proxys, + "cmds": ["SELECT * FROM backends;"], + "force": False, + "bk_cloud_id": bk_cloud_id, + } + ) + for i in res: + if i["error_msg"]: + logger.error(f"get proxy backends failed: [{i['error_msg']}]") + return False + + is_pass = True + for i in res[0]["cmd_results"]: + backend_address = str(i["table_data"][0]["address"]).strip() + if backend_address != "1.1.1.1:3306": + logger.error(f"[{res[0]['address']}] the backends is not empty [{backend_address}] ") + is_pass = False + + return is_pass