Skip to content

Commit

Permalink
feat(mongodb): 增加/减少shard节点数 #3619
Browse files Browse the repository at this point in the history
  • Loading branch information
yyhenryyy authored and zhangzhw8 committed Apr 8, 2024
1 parent 339c32b commit 242a430
Show file tree
Hide file tree
Showing 16 changed files with 1,149 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,21 @@ func (r *MongoDReplace) Init(runtime *jobruntime.JobGenericRuntime) error {
r.StatusCh = make(chan int, 1)

// 获取源端的配置信息
_, _, _, hidden, priority, _, err := common.GetNodeInfo(r.Mongo, r.PrimaryIP, r.PrimaryPort,
r.ConfParams.AdminUsername, r.ConfParams.AdminPassword, r.ConfParams.SourceIP, r.ConfParams.SourcePort)
if err != nil {
return err
if r.ConfParams.SourceIP != "" {
_, _, _, hidden, priority, _, err := common.GetNodeInfo(r.Mongo, r.PrimaryIP, r.PrimaryPort,
r.ConfParams.AdminUsername, r.ConfParams.AdminPassword, r.ConfParams.SourceIP, r.ConfParams.SourcePort)
if err != nil {
return err
}
r.TargetPriority = priority
r.TargetHidden = hidden
}
r.TargetHidden = hidden
if r.ConfParams.TargetHidden == "0" {
r.TargetHidden = false
} else if r.ConfParams.TargetHidden == "1" {
r.TargetHidden = true
}

r.TargetPriority = priority
if r.ConfParams.TargetPriority != "" {
r.TargetPriority, _ = strconv.Atoi(r.ConfParams.TargetPriority)
}
Expand Down Expand Up @@ -359,7 +361,7 @@ func (r *MongoDReplace) checkTargetStatusAndRemoveSource() error {
case <-time.After(50 * time.Second):
return fmt.Errorf("check target status timeout")
case status := <-r.StatusCh:
if status == 2 && r.ConfParams.SourceDown == false {
if status == 2 && r.ConfParams.SourceDown == false && r.ConfParams.SourceIP != "" {
if err := r.shutdownSourceProcess(); err != nil {
return err
}
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging.config
from typing import Dict, Optional

from backend.db_meta.enums.cluster_type import ClusterType
from backend.flow.engine.bamboo.scene.common.builder import Builder
from backend.flow.engine.bamboo.scene.mongodb.sub_task import (
cluster_increase_node,
cluster_reduce_node,
replicaset_reduce_node,
replicaset_set_increase_node,
)
from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs

logger = logging.getLogger("flow")


class MongoScaleNodeFlow(object):
"""MongoDB变更shard节点数flow"""

def __init__(self, root_id: str, data: Optional[Dict]):
"""
传入参数
@param root_id : 任务流程定义的root_id
@param data : 单据传递过来的参数列表,是dict格式
"""

self.root_id = root_id
self.data = data
self.get_kwargs = ActKwargs()
self.get_kwargs.payload = data
self.get_kwargs.get_file_path()

def multi_cluster_scale_node_flow(self, increase: bool):
"""
multi cluster scale node流程
"""

# 创建流程实例
pipeline = Builder(root_id=self.root_id, data=self.data)

sub_pipelines = []
# 复制集增减节点——子流程并行
if self.data["infos"][ClusterType.MongoReplicaSet.value]:
for replicaset_set in self.data["infos"][ClusterType.MongoReplicaSet.value]:
if increase:
sub_pipline = replicaset_set_increase_node(
root_id=self.root_id,
ticket_data=self.data,
sub_kwargs=self.get_kwargs,
info=replicaset_set,
)
sub_pipelines.append(sub_pipline)
else:
sub_pipline = replicaset_reduce_node(
root_id=self.root_id,
ticket_data=self.data,
sub_kwargs=self.get_kwargs,
info=replicaset_set,
cluster=False,
)
sub_pipelines.append(sub_pipline)
# cluster的shard增减节点——子流程并行
if self.data["infos"][ClusterType.MongoShardedCluster.value]:
for cluster in self.data["infos"][ClusterType.MongoShardedCluster.value]:
if increase:
sub_pipline = cluster_increase_node(
root_id=self.root_id, ticket_data=self.data, sub_kwargs=self.get_kwargs, info=cluster
)
else:
sub_pipline = cluster_reduce_node(
root_id=self.root_id, ticket_data=self.data, sub_kwargs=self.get_kwargs, info=cluster
)
sub_pipelines.append(sub_pipline)

pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

# 运行流程
pipeline.run_pipeline()
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,24 @@
"""


from .cluster_increase_node import cluster_increase_node
from .cluster_reduce_node import cluster_reduce_node
from .cluster_replace import cluster_replace
from .cluster_scale import cluster_scale
from .deinstall import deinstall
from .exec_script import exec_script
from .increase_mongod import increase_mongod
from .increase_mongos import increase_mongos
from .instance_restart import instance_restart
from .mongod_replace import mongod_replace
from .mongos_install import mongos_install
from .mongos_replace import mongos_replace
from .reduce_mongod import reduce_mongod
from .reduce_mongos import reduce_mongos
from .replicaset_install import replicaset_install
from .replicaset_reduce_node import replicaset_reduce_node
from .replicaset_replace import replicaset_replace
from .replicaset_scale import replicaset_scale
from .replicaset_set_increase_node import replicaset_set_increase_node
from .replicaset_set_increase_node_by_ip import replicaset_set_increase_node_by_ip
from .user import user
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from copy import deepcopy
from typing import Dict, Optional

from django.utils.translation import ugettext as _

from backend.db_meta.enums.cluster_type import ClusterType
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.plugins.components.collections.mongodb.exec_actuator_job import ExecuteDBActuatorJobComponent
from backend.flow.plugins.components.collections.mongodb.send_media import ExecSendMediaOperationComponent
from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs

from .replicaset_set_increase_node_by_ip import replicaset_set_increase_node_by_ip


def cluster_increase_node(root_id: str, ticket_data: Optional[Dict], sub_kwargs: ActKwargs, info: dict) -> SubBuilder:
"""
cluster增加节点流程
info 表示同机器多cluster信息
"""

# 获取变量
sub_get_kwargs = deepcopy(sub_kwargs)

# 创建子流程
sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)

# 设置变量
sub_get_kwargs.payload["app"] = sub_get_kwargs.payload["bk_app_abbr"]
sub_get_kwargs.payload["cluster_type"] = ClusterType.MongoShardedCluster.value
info["target"] = info["add_shard_nodes"][0]

# 计算cacheSize oplogSize
sub_get_kwargs.calc_param_replace(info=info, instance_num=info["node_replica_count"])

# 获取主机信息
sub_get_kwargs.get_host_increase_node(info=info)

# 介质下发
kwargs = sub_get_kwargs.get_send_media_kwargs(media_type="all")
sub_pipeline.add_act(
act_name=_("MongoDB-介质下发"), act_component_code=ExecSendMediaOperationComponent.code, kwargs=kwargs
)

# 创建原子任务执行目录
kwargs = sub_get_kwargs.get_create_dir_kwargs()
sub_pipeline.add_act(
act_name=_("MongoDB-创建原子任务执行目录"), act_component_code=ExecuteDBActuatorJobComponent.code, kwargs=kwargs
)

# 机器初始化
kwargs = sub_get_kwargs.get_os_init_kwargs()
sub_pipeline.add_act(
act_name=_("MongoDB-机器初始化"), act_component_code=ExecuteDBActuatorJobComponent.code, kwargs=kwargs
)

# 获取集群信息并以IP为维度计算对应关系
sub_get_kwargs.calc_increase_node(info=info)

# 获取mongos信息
mongos_host = sub_get_kwargs.payload["mongos_nodes"][0]
sub_get_kwargs.payload["nodes"] = [
{"ip": mongos_host["ip"], "port": mongos_host["port"], "bk_cloud_id": mongos_host["bk_cloud_id"]}
]

# 获取密码
get_password = {}
get_password["usernames"] = sub_get_kwargs.manager_users
sub_get_kwargs.payload["passwords"] = sub_get_kwargs.get_password_from_db(info=get_password)["passwords"]

# 获取key_file
sub_get_kwargs.cluster_type = sub_get_kwargs.payload["cluster_type"]
sub_get_kwargs.payload["key_file"] = sub_get_kwargs.get_key_file(
cluster_name=sub_get_kwargs.payload["cluster_name"]
)

# 以IP为维度增加node——子流程并行
sub_pipelines = []
for add_shard_node, shards_instance_relationship in sub_get_kwargs.payload[
"shards_instance_relationships_by_ip"
].items():
sub_get_kwargs.payload["add_shard_node"] = add_shard_node
sub_sub_pipeline = replicaset_set_increase_node_by_ip(
root_id=root_id,
ticket_data=ticket_data,
sub_kwargs=sub_get_kwargs,
info=shards_instance_relationship,
cluster=True,
)
sub_pipelines.append(sub_sub_pipeline)
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

return sub_pipeline.build_sub_process(sub_name=_("MongoDB--{}增加节点".format(sub_get_kwargs.payload["cluster_name"])))
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from copy import deepcopy
from typing import Dict, Optional

from django.utils.translation import ugettext as _

from backend.db_meta.enums.cluster_type import ClusterType
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.plugins.components.collections.mongodb.exec_actuator_job import ExecuteDBActuatorJobComponent
from backend.flow.plugins.components.collections.mongodb.send_media import ExecSendMediaOperationComponent
from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs

from .replicaset_reduce_node import replicaset_reduce_node


def cluster_reduce_node(root_id: str, ticket_data: Optional[Dict], sub_kwargs: ActKwargs, info: dict) -> SubBuilder:
"""
cluster减少节点流程
info 表示同机器多cluster信息
"""

# 获取变量
sub_get_kwargs = deepcopy(sub_kwargs)

# 创建子流程
sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)

# 设置变量
sub_get_kwargs.payload["app"] = sub_get_kwargs.payload["bk_app_abbr"]
sub_get_kwargs.payload["cluster_type"] = ClusterType.MongoShardedCluster.value

# 获取集群信息并计算对应关系
sub_get_kwargs.calc_reduce_node(info=info)

# 介质下发
kwargs = sub_get_kwargs.get_send_media_kwargs(media_type="actuator")
sub_pipeline.add_act(
act_name=_("MongoDB-介质下发"), act_component_code=ExecSendMediaOperationComponent.code, kwargs=kwargs
)

# 创建原子任务执行目录
kwargs = sub_get_kwargs.get_create_dir_kwargs()
sub_pipeline.add_act(
act_name=_("MongoDB-创建原子任务执行目录"), act_component_code=ExecuteDBActuatorJobComponent.code, kwargs=kwargs
)

# 获取mongos信息
mongos_host = sub_get_kwargs.payload["mongos_nodes"][0]
sub_get_kwargs.payload["nodes"] = [
{"ip": mongos_host["ip"], "port": mongos_host["port"], "bk_cloud_id": mongos_host["bk_cloud_id"]}
]

# 获取密码
get_password = {}
get_password["usernames"] = sub_get_kwargs.manager_users
sub_get_kwargs.payload["passwords"] = sub_get_kwargs.get_password_from_db(info=get_password)["passwords"]

# shard进行减少node——子流程并行
sub_pipelines = []
for db_instances in sub_get_kwargs.payload["shards_instance_relationships"].values():
sub_get_kwargs.payload["db_instances"] = db_instances
sub_sub_pipeline = replicaset_reduce_node(
root_id=root_id,
ticket_data=ticket_data,
sub_kwargs=sub_get_kwargs,
info={},
cluster=True,
)
sub_pipelines.append(sub_sub_pipeline)
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

return sub_pipeline.build_sub_process(sub_name=_("MongoDB--{}增加节点".format(sub_get_kwargs.payload["cluster_name"])))
Loading

0 comments on commit 242a430

Please sign in to comment.