Skip to content

Commit

Permalink
fix(mongodb): 优化副本集获取primary创建DBA用户,缩容shard节点数提供获取下架机器信息函数 #8515
Browse files Browse the repository at this point in the history
  • Loading branch information
yyhenryyy committed Dec 11, 2024
1 parent 9c36b45 commit 151e07f
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,16 @@ func (u *AddUser) Init(runtime *jobruntime.JobGenericRuntime) error {
info, err = common.AuthGetPrimaryInfo(u.Mongo, u.ConfParams.AdminUsername,
u.ConfParams.AdminPassword, u.ConfParams.IP, u.ConfParams.Port)
if err != nil {
u.runtime.Logger.Error(fmt.Sprintf(
"get primary db info of addUser fail, error:%s", err))
u.runtime.Logger.Error("get primary db info of addUser fail, error:%s", err)
return fmt.Errorf("get primary db info of addUser fail, error:%s", err)
}
getInfo := strings.Split(info, ":")
u.PrimaryIP = getInfo[0]
u.PrimaryPort, _ = strconv.Atoi(getInfo[1])
} else if u.ConfParams.AdminUsername == "" && u.ConfParams.AdminPassword == "" {
info, err = common.CreateDBAUserGetPrimaryInfo(u.Mongo, u.ConfParams.Port)
if err != nil {
u.runtime.Logger.Error("get primary db info of addUser fail, error:%s", err)
return fmt.Errorf("get primary db info of addUser fail, error:%s", err)
}
getInfo := strings.Split(info, ":")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,35 @@ func AuthGetPrimaryInfo(mongoBin string, username string, password string, ip st
}
}

// CreateDBAUserGetPrimaryInfo 创建dba用户获取primary节点信息
func CreateDBAUserGetPrimaryInfo(mongoBin string, port int) (string,
error) {
// 超时时间
timeout := time.After(20 * time.Second)
for {
select {
case <-timeout:
return "", fmt.Errorf("get primary info timeout")
default:
cmd := fmt.Sprintf(
"%s --host %s --port %d --quiet --eval \"rs.isMaster().primary\"", mongoBin, "127.0.0.1", port)
result, err := util.RunBashCmd(
cmd,
"", nil,
60*time.Second)
if err != nil {
return "", err
}
if strings.Replace(result, "\n", "", -1) == "" {
time.Sleep(1 * time.Second)
continue
}
primaryInfo := strings.Replace(result, "\n", "", -1)
return primaryInfo, nil
}
}
}

// NoAuthGetPrimaryInfo 获取primary节点信息
func NoAuthGetPrimaryInfo(mongoBin string, ip string, port int) (string, error) {
// 超时时间
Expand Down
152 changes: 152 additions & 0 deletions dbm-ui/backend/flow/utils/mongodb/shard_reduce_node_get_host.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from backend.db_meta.enums import MachineType
from backend.db_meta.enums.cluster_type import ClusterType
from backend.db_meta.enums.instance_role import InstanceRole
from backend.db_meta.enums.type_maps import MachineTypeInstanceRoleMap
from backend.db_meta.models import Machine
from backend.flow.utils.mongodb.mongodb_repo import MongoRepository


def get_cluster_instance_info(cluster_id: int) -> dict:
"""获取集群实例信息"""

cluster_instance_info = {}
cluster_info = MongoRepository().fetch_one_cluster(withDomain=True, id=cluster_id)
cluster_instance_info["bk_cloud_id"] = cluster_info.bk_cloud_id
nodes = []
if cluster_info.cluster_type == ClusterType.MongoReplicaSet.value:
backup_node = {}
for member in cluster_info.get_shards()[0].members:
if member.role == InstanceRole.MONGO_BACKUP.value:
backup_node = {
"ip": member.ip,
"port": int(member.port),
"bk_cloud_id": member.bk_cloud_id,
"domain": member.domain,
"instance_role": member.role,
}
continue
nodes.append(
{
"ip": member.ip,
"port": int(member.port),
"bk_cloud_id": member.bk_cloud_id,
"domain": member.domain,
"instance_role": member.role,
}
)
nodes.append(backup_node)
cluster_instance_info["nodes"] = nodes
elif cluster_info.cluster_type == ClusterType.MongoShardedCluster.value:
mongos = cluster_info.get_mongos()
shards = cluster_info.get_shards()
config = cluster_info.get_config()
mongos_nodes = []
shards_nodes = []
config_nodes = []
for mongo in mongos:
mongos_nodes.append(
{"ip": mongo.ip, "port": int(mongo.port), "bk_cloud_id": mongo.bk_cloud_id, "domain": mongo.domain}
)
for shard in shards:
shard_info = {"shard": shard.set_name}
nodes = []
backup_node = {}
for member in shard.members:
if member.role == InstanceRole.MONGO_BACKUP.value:
backup_node = {
"ip": member.ip,
"port": int(member.port),
"bk_cloud_id": member.bk_cloud_id,
"instance_role": member.role,
}
continue
nodes.append(
{
"ip": member.ip,
"port": int(member.port),
"bk_cloud_id": member.bk_cloud_id,
"instance_role": member.role,
}
)
nodes.append(backup_node)
shard_info["nodes"] = nodes
shards_nodes.append(shard_info)
backup_node = {}
for member in config.members:
if member.role == InstanceRole.MONGO_BACKUP.value:
backup_node = {
"ip": member.ip,
"port": int(member.port),
"bk_cloud_id": member.bk_cloud_id,
"instance_role": member.role,
}
continue
config_nodes.append(
{
"ip": member.ip,
"port": int(member.port),
"bk_cloud_id": member.bk_cloud_id,
"instance_role": member.role,
}
)
config_nodes.append(backup_node)
cluster_instance_info["mongos_nodes"] = mongos_nodes
cluster_instance_info["shards_nodes"] = shards_nodes
cluster_instance_info["config_nodes"] = config_nodes
return cluster_instance_info


def get_hosts_reduce_node(ticket_data: dict) -> list:
"""缩容shard节点数获取下架机器"""

# 实例角色信息
instance_role = MachineTypeInstanceRoleMap[MachineType.MONGODB]
# 获取下架机器
replicaset_hosts = []
cluster_hosts = []
hosts = []
for replicaset_info in ticket_data["infos"][ClusterType.MongoReplicaSet.value]:
reduce_shard_nodes = replicaset_info["reduce_shard_nodes"]
replicaset_hosts_set = set()
bk_cloud_id = ""
for cluster_id in replicaset_info["cluster_ids"]:
cluster_instance_info = get_cluster_instance_info(cluster_id=cluster_id)
current_node_num = len(cluster_instance_info["nodes"])
for index in range(reduce_shard_nodes):
role = instance_role[current_node_num - 2 - index]
for node in cluster_instance_info["nodes"]:
if node["instance_role"] == role:
replicaset_hosts_set.add(node["ip"])
bk_cloud_id = node["bk_cloud_id"]
break
for ip in replicaset_hosts_set:
replicaset_hosts.append({"ip": ip, "bk_cloud_id": bk_cloud_id})
for cluster_info in ticket_data["infos"][ClusterType.MongoShardedCluster.value]:
cluster_hosts_set = set()
bk_cloud_id = ""
reduce_shard_nodes = cluster_info["reduce_shard_nodes"]
cluster_instance_info = get_cluster_instance_info(cluster_id=cluster_info["cluster_id"])
# 所有shard的实例对应关系
shards_instance_relationships = {}
for shard in cluster_instance_info["shards_nodes"]:
shards_instance_relationships[shard["shard"]] = []
for shard in cluster_instance_info["shards_nodes"]:
current_node_num = len(shard["nodes"])
for index in range(reduce_shard_nodes):
role = instance_role[current_node_num - 2 - index]
for node in shard["nodes"]:
if node["instance_role"] == role:
bk_cloud_id = node["bk_cloud_id"]
cluster_hosts_set.add(node["ip"])
for ip in cluster_hosts_set:
cluster_hosts.append({"ip": ip, "bk_cloud_id": bk_cloud_id})
for host in replicaset_hosts + cluster_hosts:
machine = Machine.objects.get(ip=host["ip"], bk_cloud_id=host["bk_cloud_id"])
hosts.append(
{
"ip": host["ip"],
"bk_host_id": machine.bk_host_id,
"bk_cloud_id": host["bk_cloud_id"],
}
)
return hosts

0 comments on commit 151e07f

Please sign in to comment.