Skip to content

Commit

Permalink
fix(mysql): 优化proxy替换流程 TencentBlueKing#8165
Browse files Browse the repository at this point in the history
  • Loading branch information
yksitu committed Nov 27, 2024
1 parent ec0f506 commit 2a884bf
Show file tree
Hide file tree
Showing 12 changed files with 603 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ func (g *CloneClineGrantAct) Run() (err error) {
FunName: "克隆client权限",
Func: g.Service.CloneTargetClientPriv,
},
{
FunName: "回收旧client权限",
Func: g.Service.DropOriginClientPriv,
},
}

if err := steps.Run(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package pkg

import (
"context"
"dbm-services/common/go-pubpkg/logger"
"fmt"
"time"

"dbm-services/common/go-pubpkg/logger"

"github.com/jmoiron/sqlx"
)

Expand All @@ -18,7 +19,7 @@ func DropDB(conn *sqlx.Conn, dbName, to string, onlyStageTable bool) error {
return fmt.Errorf(`db "%s" is not trans clean`, dbName)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

_, err = conn.ExecContext(
Expand Down
4 changes: 0 additions & 4 deletions dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
def decommission(cluster: Cluster):
cc_manage = CcManage(cluster.bk_biz_id, cluster.cluster_type)
for proxy in cluster.proxyinstance_set.all():
# 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题
proxy.machine.proxyinstance_set.select_for_update().all()

proxy.delete(keep_parents=True)
if not proxy.machine.proxyinstance_set.exists():
Expand All @@ -41,8 +39,6 @@ def decommission(cluster: Cluster):
cc_manage.delete_service_instance(bk_instance_ids=[proxy.bk_instance_id])

for storage in cluster.storageinstance_set.all():
# 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题
storage.machine.proxyinstance_set.select_for_update().all()

# 删除存储在密码服务的密码元信息
DBPrivManagerApi.delete_password(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,27 @@
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import init_machine_sub_flow
from backend.flow.plugins.components.collections.mysql.clone_proxy_client_in_backend import (
CloneProxyUsersInBackendComponent,
)
from backend.flow.plugins.components.collections.mysql.clone_proxy_user_in_cluster import (
CloneProxyUsersInClusterComponent,
)
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.utils.mysql.mysql_act_dataclass import (
CloneProxyClientInBackendKwargs,
CloneProxyUsersKwargs,
CreateDnsKwargs,
DBMetaOPKwargs,
DownloadMediaKwargs,
ExecActuatorKwargs,
)
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
from backend.flow.utils.mysql.proxy_act_payload import ProxyActPayload

logger = logging.getLogger("flow")

Expand Down Expand Up @@ -106,9 +115,7 @@ def add_mysql_cluster_proxy_flow(self):
for i in self.data["infos"]:
cluster_ids.extend(i["cluster_ids"])

mysql_proxy_cluster_add_pipeline = Builder(
root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids))
)
mysql_proxy_cluster_add_pipeline = Builder(root_id=self.root_id, data=self.data)
sub_pipelines = []

# 多集群操作时循环加入集群proxy下架子流程
Expand Down Expand Up @@ -182,55 +189,42 @@ def add_mysql_cluster_proxy_flow(self):
# 针对集群维度声明子流程
add_proxy_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context))

# 拼接添加proxy节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题
add_proxy_sub_act_kwargs = ExecActuatorKwargs(
bk_cloud_id=cluster["bk_cloud_id"],
cluster=cluster,
)

add_proxy_sub_pipeline.add_act(
act_name=_("下发db-actuator介质"),
act_component_code=TransFileComponent.code,
act_name=_("新的proxy配置后端实例[{}:{}]".format(info["proxy_ip"]["ip"], cluster["proxy_port"])),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(
DownloadMediaKwargs(
ExecActuatorKwargs(
bk_cloud_id=cluster["bk_cloud_id"],
exec_ip=[cluster["template_proxy_ip"]] + cluster["mysql_ip_list"],
file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
),
cluster=cluster,
exec_ip=info["proxy_ip"]["ip"],
get_mysql_payload_func=ProxyActPayload.get_set_proxy_backends.__name__,
)
),
)

add_proxy_sub_act_kwargs.exec_ip = cluster["target_proxy_ip"]
add_proxy_sub_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_set_proxy_backends.__name__
add_proxy_sub_pipeline.add_act(
act_name=_("新的proxy配置后端实例"),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(add_proxy_sub_act_kwargs),
act_name=_("克隆proxy用户白名单"),
act_component_code=CloneProxyUsersInClusterComponent.code,
kwargs=asdict(
CloneProxyUsersKwargs(
cluster_id=cluster["id"],
target_proxy_host=info["proxy_ip"]["ip"],
)
),
)

add_proxy_sub_act_kwargs.exec_ip = cluster["template_proxy_ip"]
add_proxy_sub_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_clone_proxy_user_payload.__name__
add_proxy_sub_pipeline.add_act(
act_name=_("克隆proxy用户白名单"),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(add_proxy_sub_act_kwargs),
act_name=_("集群对新的proxy添加权限"),
act_component_code=CloneProxyUsersInBackendComponent.code,
kwargs=asdict(
CloneProxyClientInBackendKwargs(
cluster_id=cluster["id"],
target_proxy_host=info["proxy_ip"]["ip"],
origin_proxy_host=cluster["template_proxy_ip"],
)
),
)

acts_list = []
for cluster_mysql_ip in cluster["mysql_ip_list"]:
add_proxy_sub_act_kwargs.exec_ip = cluster_mysql_ip
add_proxy_sub_act_kwargs.get_mysql_payload_func = (
MysqlActPayload.get_clone_client_grant_payload.__name__
)
acts_list.append(
{
"act_name": _("集群对新的proxy添加权限"),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(add_proxy_sub_act_kwargs),
}
)
add_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list)

acts_list = []
for name in cluster["add_domain_list"]:
# 这里的添加域名的方式根据目前集群对应proxy dns域名进行循环添加,这样保证某个域名添加异常时其他域名添加成功
Expand All @@ -243,7 +237,7 @@ def add_mysql_cluster_proxy_flow(self):
bk_cloud_id=cluster["bk_cloud_id"],
add_domain_name=name,
dns_op_exec_port=cluster["proxy_port"],
exec_ip=cluster["target_proxy_ip"],
exec_ip=info["proxy_ip"]["ip"],
)
),
}
Expand Down Expand Up @@ -276,7 +270,9 @@ def add_mysql_cluster_proxy_flow(self):
kwargs=asdict(exec_act_kwargs),
)

sub_pipelines.append(sub_pipeline.build_sub_process(sub_name=_("添加proxy子流程")))
sub_pipelines.append(
sub_pipeline.build_sub_process(sub_name=_("添加proxy子流程[{}]".format(info["proxy_ip"]["ip"])))
)

mysql_proxy_cluster_add_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
mysql_proxy_cluster_add_pipeline.run_pipeline(is_drop_random_user=True)
mysql_proxy_cluster_add_pipeline.run_pipeline()
Loading

0 comments on commit 2a884bf

Please sign in to comment.