From af3f5011d811e5a8824d9eb48b7c2ccbc1df1c57 Mon Sep 17 00:00:00 2001 From: yyhenryyy Date: Wed, 11 Dec 2024 14:28:16 +0800 Subject: [PATCH] =?UTF-8?q?fix(mongodb):=20=E4=BC=98=E5=8C=96=E5=89=AF?= =?UTF-8?q?=E6=9C=AC=E9=9B=86=E8=8E=B7=E5=8F=96primary=E5=88=9B=E5=BB=BADB?= =?UTF-8?q?A=E7=94=A8=E6=88=B7=EF=BC=8C=E7=BC=A9=E5=AE=B9shard=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=95=B0=E6=8F=90=E4=BE=9B=E8=8E=B7=E5=8F=96=E4=B8=8B?= =?UTF-8?q?=E6=9E=B6=E6=9C=BA=E5=99=A8=E4=BF=A1=E6=81=AF=E5=87=BD=E6=95=B0?= =?UTF-8?q?=20#8515?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/atomjobs/atommongodb/add_user.go | 12 +- .../dbactuator/pkg/common/mongo_common.go | 29 ++++ .../mongodb/shard_reduce_node_get_host.py | 152 ++++++++++++++++++ 3 files changed, 191 insertions(+), 2 deletions(-) create mode 100644 dbm-ui/backend/flow/utils/mongodb/shard_reduce_node_get_host.py diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/add_user.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/add_user.go index 9509df385d..5c5add9c8b 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/add_user.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/add_user.go @@ -105,8 +105,16 @@ func (u *AddUser) Init(runtime *jobruntime.JobGenericRuntime) error { info, err = common.AuthGetPrimaryInfo(u.Mongo, u.ConfParams.AdminUsername, u.ConfParams.AdminPassword, u.ConfParams.IP, u.ConfParams.Port) if err != nil { - u.runtime.Logger.Error(fmt.Sprintf( - "get primary db info of addUser fail, error:%s", err)) + u.runtime.Logger.Error("get primary db info of addUser fail, error:%s", err) + return fmt.Errorf("get primary db info of addUser fail, error:%s", err) + } + getInfo := strings.Split(info, ":") + u.PrimaryIP = getInfo[0] + u.PrimaryPort, _ = strconv.Atoi(getInfo[1]) + } else if u.ConfParams.AdminUsername == "" && u.ConfParams.AdminPassword == "" { + info, err = common.CreateDBAUserGetPrimaryInfo(u.Mongo, u.ConfParams.Port) + if err != nil { + u.runtime.Logger.Error("get primary db info of addUser fail, error:%s", err) return fmt.Errorf("get primary db info of addUser fail, error:%s", err) } getInfo := strings.Split(info, ":") diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/common/mongo_common.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/common/mongo_common.go index 43a89c8db8..bac5e0363b 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/common/mongo_common.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/common/mongo_common.go @@ -257,6 +257,35 @@ func AuthGetPrimaryInfo(mongoBin string, username string, password string, ip st } } +// CreateDBAUserGetPrimaryInfo 创建dba用户获取primary节点信息 +func CreateDBAUserGetPrimaryInfo(mongoBin string, port int) (string, + error) { + // 超时时间 + timeout := time.After(20 * time.Second) + for { + select { + case <-timeout: + return "", fmt.Errorf("get primary info timeout") + default: + cmd := fmt.Sprintf( + "%s --host %s --port %d --quiet --eval \"rs.isMaster().primary\"", mongoBin, "127.0.0.1", port) + result, err := util.RunBashCmd( + cmd, + "", nil, + 60*time.Second) + if err != nil { + return "", err + } + if strings.Replace(result, "\n", "", -1) == "" { + time.Sleep(1 * time.Second) + continue + } + primaryInfo := strings.Replace(result, "\n", "", -1) + return primaryInfo, nil + } + } +} + // NoAuthGetPrimaryInfo 获取primary节点信息 func NoAuthGetPrimaryInfo(mongoBin string, ip string, port int) (string, error) { // 超时时间 diff --git a/dbm-ui/backend/flow/utils/mongodb/shard_reduce_node_get_host.py b/dbm-ui/backend/flow/utils/mongodb/shard_reduce_node_get_host.py new file mode 100644 index 0000000000..5e281de1ab --- /dev/null +++ b/dbm-ui/backend/flow/utils/mongodb/shard_reduce_node_get_host.py @@ -0,0 +1,152 @@ +from backend.db_meta.enums import MachineType +from backend.db_meta.enums.cluster_type import ClusterType +from backend.db_meta.enums.instance_role import InstanceRole +from backend.db_meta.enums.type_maps import MachineTypeInstanceRoleMap +from backend.db_meta.models import Machine +from backend.flow.utils.mongodb.mongodb_repo import MongoRepository + + +def get_cluster_instance_info(cluster_id: int) -> dict: + """获取集群实例信息""" + + cluster_instance_info = {} + cluster_info = MongoRepository().fetch_one_cluster(withDomain=True, id=cluster_id) + cluster_instance_info["bk_cloud_id"] = cluster_info.bk_cloud_id + nodes = [] + if cluster_info.cluster_type == ClusterType.MongoReplicaSet.value: + backup_node = {} + for member in cluster_info.get_shards()[0].members: + if member.role == InstanceRole.MONGO_BACKUP.value: + backup_node = { + "ip": member.ip, + "port": int(member.port), + "bk_cloud_id": member.bk_cloud_id, + "domain": member.domain, + "instance_role": member.role, + } + continue + nodes.append( + { + "ip": member.ip, + "port": int(member.port), + "bk_cloud_id": member.bk_cloud_id, + "domain": member.domain, + "instance_role": member.role, + } + ) + nodes.append(backup_node) + cluster_instance_info["nodes"] = nodes + elif cluster_info.cluster_type == ClusterType.MongoShardedCluster.value: + mongos = cluster_info.get_mongos() + shards = cluster_info.get_shards() + config = cluster_info.get_config() + mongos_nodes = [] + shards_nodes = [] + config_nodes = [] + for mongo in mongos: + mongos_nodes.append( + {"ip": mongo.ip, "port": int(mongo.port), "bk_cloud_id": mongo.bk_cloud_id, "domain": mongo.domain} + ) + for shard in shards: + shard_info = {"shard": shard.set_name} + nodes = [] + backup_node = {} + for member in shard.members: + if member.role == InstanceRole.MONGO_BACKUP.value: + backup_node = { + "ip": member.ip, + "port": int(member.port), + "bk_cloud_id": member.bk_cloud_id, + "instance_role": member.role, + } + continue + nodes.append( + { + "ip": member.ip, + "port": int(member.port), + "bk_cloud_id": member.bk_cloud_id, + "instance_role": member.role, + } + ) + nodes.append(backup_node) + shard_info["nodes"] = nodes + shards_nodes.append(shard_info) + backup_node = {} + for member in config.members: + if member.role == InstanceRole.MONGO_BACKUP.value: + backup_node = { + "ip": member.ip, + "port": int(member.port), + "bk_cloud_id": member.bk_cloud_id, + "instance_role": member.role, + } + continue + config_nodes.append( + { + "ip": member.ip, + "port": int(member.port), + "bk_cloud_id": member.bk_cloud_id, + "instance_role": member.role, + } + ) + config_nodes.append(backup_node) + cluster_instance_info["mongos_nodes"] = mongos_nodes + cluster_instance_info["shards_nodes"] = shards_nodes + cluster_instance_info["config_nodes"] = config_nodes + return cluster_instance_info + + +def get_hosts_reduce_node(ticket_data: dict) -> list: + """缩容shard节点数获取下架机器""" + + # 实例角色信息 + instance_role = MachineTypeInstanceRoleMap[MachineType.MONGODB] + # 获取下架机器 + replicaset_hosts = [] + cluster_hosts = [] + hosts = [] + for replicaset_info in ticket_data["infos"][ClusterType.MongoReplicaSet.value]: + reduce_shard_nodes = replicaset_info["reduce_shard_nodes"] + replicaset_hosts_set = set() + bk_cloud_id = "" + for cluster_id in replicaset_info["cluster_ids"]: + cluster_instance_info = get_cluster_instance_info(cluster_id=cluster_id) + current_node_num = len(cluster_instance_info["nodes"]) + for index in range(reduce_shard_nodes): + role = instance_role[current_node_num - 2 - index] + for node in cluster_instance_info["nodes"]: + if node["instance_role"] == role: + replicaset_hosts_set.add(node["ip"]) + bk_cloud_id = node["bk_cloud_id"] + break + for ip in replicaset_hosts_set: + replicaset_hosts.append({"ip": ip, "bk_cloud_id": bk_cloud_id}) + for cluster_info in ticket_data["infos"][ClusterType.MongoShardedCluster.value]: + cluster_hosts_set = set() + bk_cloud_id = "" + reduce_shard_nodes = cluster_info["reduce_shard_nodes"] + cluster_instance_info = get_cluster_instance_info(cluster_id=cluster_info["cluster_id"]) + # 所有shard的实例对应关系 + shards_instance_relationships = {} + for shard in cluster_instance_info["shards_nodes"]: + shards_instance_relationships[shard["shard"]] = [] + for shard in cluster_instance_info["shards_nodes"]: + current_node_num = len(shard["nodes"]) + for index in range(reduce_shard_nodes): + role = instance_role[current_node_num - 2 - index] + for node in shard["nodes"]: + if node["instance_role"] == role: + bk_cloud_id = node["bk_cloud_id"] + cluster_hosts_set.add(node["ip"]) + for ip in cluster_hosts_set: + cluster_hosts.append({"ip": ip, "bk_cloud_id": bk_cloud_id}) + for host in replicaset_hosts + cluster_hosts: + machine = Machine.objects.get(ip=host["ip"], bk_cloud_id=host["bk_cloud_id"]) + hosts.append( + { + "ip": host["ip"], + "bk_host_id": machine.bk_host_id, + "bk_cloud_id": host["bk_cloud_id"], + } + ) + return hosts