diff --git a/.gitignore b/.gitignore index 918a538bab..f6520ed87c 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ pre-*-bkcodeai bkcodeai.json package-lock.json + +### PreCI ### +.codecc \ No newline at end of file diff --git a/dbm-ui/backend/db_meta/enums/cluster_type.py b/dbm-ui/backend/db_meta/enums/cluster_type.py index 8d2378d3d7..ee2e5c106f 100644 --- a/dbm-ui/backend/db_meta/enums/cluster_type.py +++ b/dbm-ui/backend/db_meta/enums/cluster_type.py @@ -54,7 +54,6 @@ class ClusterType(str, StructuredEnum): MongoShardedCluster = EnumField("MongoShardedCluster", _("Mongo分片集群")) Riak = EnumField("riak", _("Riak集群")) - SqlserverSingle = EnumField("sqlserver_single", _("sqlserver单节点版")) SqlserverHA = EnumField("sqlserver_ha", _("sqlserver主从版")) diff --git a/dbm-ui/backend/db_meta/enums/machine_type.py b/dbm-ui/backend/db_meta/enums/machine_type.py index a3932ed694..20c4be90ad 100644 --- a/dbm-ui/backend/db_meta/enums/machine_type.py +++ b/dbm-ui/backend/db_meta/enums/machine_type.py @@ -60,6 +60,12 @@ class MachineType(str, StructuredEnum): VM_INSERT = EnumField("vminsert", _("vminsert")) VM_SELECT = EnumField("vmselect", _("vmselect")) VM_AUTH = EnumField("vmauth", _("vmauth")) - # 仅用于TBinlogDumper实例的管控 TBinlogDumper = EnumField("tbinlogdumper", _("TBinlogDumper")) + + +class MongoSetType(str, StructuredEnum): + Mongos = EnumField("mongos", _("mongos")) + ShardSvr = EnumField("shardsvr", _("shardsvr")) + Configsvr = EnumField("configsvr", _("configsvr")) + Replicaset = EnumField("replicaset", _("replicaset")) diff --git a/dbm-ui/backend/db_services/mongodb/restore/handlers.py b/dbm-ui/backend/db_services/mongodb/restore/handlers.py index 0bc7e30696..6b8ed7f3c1 100644 --- a/dbm-ui/backend/db_services/mongodb/restore/handlers.py +++ b/dbm-ui/backend/db_services/mongodb/restore/handlers.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import logging from collections import defaultdict from datetime import datetime, timedelta from typing import Any, Dict, List @@ -23,7 +24,9 @@ from backend.exceptions import AppBaseException from backend.ticket.constants import TicketType from backend.ticket.models import ClusterOperateRecord, Ticket -from backend.utils.time import find_nearby_time, timezone2timestamp +from backend.utils.time import find_nearby_time + +logger = logging.getLogger("root") class MongoDBRestoreHandler(object): @@ -38,7 +41,8 @@ def _get_log_from_bklog(collector: str, start_time: datetime, end_time: datetime def _query_latest_log_and_index(self, rollback_time: datetime, query_string: str, time_key: str, flag: int): """查询距离rollback_time最近的备份记录""" - end_time = rollback_time + """ end_time 要获得rollback_time后的一个incr文件,这里多查一天,就比较稳了""" + end_time = rollback_time + timedelta(days=1) start_time = end_time - timedelta(days=BACKUP_LOG_RANGE_DAYS) backup_logs = self._get_log_from_bklog( @@ -48,35 +52,41 @@ def _query_latest_log_and_index(self, rollback_time: datetime, query_string: str query_string=query_string, ) if not backup_logs: - raise AppBaseException(_("距离回档时间点7天内没有备份日志").format(rollback_time)) + raise AppBaseException(_("距离回档时间点7天内没有备份日志 {} {}").format(query_string, rollback_time)) # 获取距离回档时间最近的全备日志 backup_logs.sort(key=lambda x: x[time_key]) time_keys = [log[time_key] for log in backup_logs] try: - latest_backup_log_index = find_nearby_time(time_keys, timezone2timestamp(rollback_time), flag) + latest_backup_log_index = find_nearby_time(time_keys, rollback_time, flag) except IndexError: - raise AppBaseException(_("无法找到时间点{}附近的全备日志记录").format(rollback_time)) + raise AppBaseException(_("无法找到时间点{}附近的全备日志记录 query_string:{} ").format(rollback_time, query_string)) return backup_logs, latest_backup_log_index - def query_latest_backup_log(self, rollback_time: datetime) -> Dict[str, Any]: + def query_latest_backup_log(self, rollback_time: datetime, set_name: str = None) -> Dict[str, Any]: """ 查询距离rollback_time最近的全备-增量备份文件 - @param rollback_time: 回档时间 + @param rollback_time: 回档时query_ticket_backup_log间 + @param set_name: 指定SetName. cluster_type为ReplicaSet时,只有一个set_name, 可以为空. """ # 获取距离回档时间最近的全备日志 - query_string = f"cluster: {self.cluster.id} AND pitr_file_type: {PitrFillType.FULL}" + query_string = f"cluster_id: {self.cluster.id} AND pitr_file_type: {PitrFillType.FULL}" + if set_name is not None: + query_string += f" AND set_name: {set_name}" full_backup_logs, full_latest_index = self._query_latest_log_and_index( rollback_time, query_string, time_key="pitr_last_pos", flag=1 ) latest_full_backup_log = full_backup_logs[full_latest_index] - + logger.info("latest_full_backup_log {}".format(latest_full_backup_log)) # 找到与全备日志pitr_fullname相同的增量备份日志 pitr_fullname = latest_full_backup_log["pitr_fullname"] query_string = ( - f"cluster: {self.cluster.id} AND pitr_file_type: {PitrFillType.INCR} AND pitr_fullname: {pitr_fullname}" + f"cluster_id: {self.cluster.id} AND pitr_file_type: {PitrFillType.INCR} AND pitr_fullname: {pitr_fullname}" ) + if set_name is not None: + query_string += f" AND set_name: {set_name}" + incr_backup_logs, incr_latest_index = self._query_latest_log_and_index( rollback_time, query_string, time_key="pitr_last_pos", flag=0 ) @@ -116,7 +126,7 @@ def _query_shard_ticket_backup_log(cls, cluster_id, start_time, end_time): collector="mongo_backup_result", start_time=start_time, end_time=end_time, - query_string=f"cluster_id: {cluster_id} AND releate_bill_id: /[0-9]*/", + query_string=f"cluster_id: {cluster_id} AND related_bill_id: /[0-9]*/", ) if not backup_logs: raise AppBaseException(_("{}-{}内没有通过单据备份的日志").format(start_time, end_time)) @@ -132,7 +142,7 @@ def _query_replicaset_ticket_backup_log(cls, cluster_ids, start_time, end_time): collector="mongo_backup_result", start_time=start_time, end_time=end_time, - query_string=f"cluster_type: {ClusterType.MongoReplicaSet} AND releate_bill_id: /[0-9]*/", + query_string=f"cluster_type: {ClusterType.MongoReplicaSet} AND related_bill_id: /[0-9]*/", ) if not backup_logs: raise AppBaseException(_("{}-{}内没有通过单据备份的日志").format(start_time, end_time)) diff --git a/dbm-ui/backend/env/__init__.py b/dbm-ui/backend/env/__init__.py index 6bec572265..5b2e1ea485 100644 --- a/dbm-ui/backend/env/__init__.py +++ b/dbm-ui/backend/env/__init__.py @@ -42,7 +42,6 @@ CMDB_NO_MONITOR_STATUS = get_type_env(key="CMDB_NO_MONITOR_STATUS", _type=str, default="运营中[无告警]") CMDB_NEED_MONITOR_STATUS = get_type_env(key="CMDB_NEED_MONITOR_STATUS", _type=str, default="运营中[需告警]") - # 蓝鲸全业务业务ID JOB_BLUEKING_BIZ_ID = get_type_env(key="JOB_BLUEKING_BIZ_ID", _type=int, default=9991001) @@ -87,10 +86,9 @@ # SaaS访问地址,用于用户访问/第三方应用跳转/Iframe/Grafana 等场景 BK_SAAS_HOST = get_type_env(key="BK_SAAS_HOST", _type=str, default="http://bk-dbm") # BK_SAAS_CALLBACK_URL 用于 接口回调/权限中心访问 等场景 -BK_SAAS_CALLBACK_URL = ( - # 通常因证书问题,这里需要使用 http - get_type_env(key="BK_SAAS_CALLBACK_URL", _type=str, default="") - or BK_SAAS_HOST.replace("https", "http") +# 通常因证书问题,这里需要使用 http +BK_SAAS_CALLBACK_URL = get_type_env(key="BK_SAAS_CALLBACK_URL", _type=str, default="") or BK_SAAS_HOST.replace( + "https", "http" ) # 其他系统访问地址 @@ -125,7 +123,6 @@ ENABLE_CLEAN_EXPIRED_FLOW_INSTANCE = get_type_env(key="ENABLE_CLEAN_EXPIRED_FLOW_INSTANCE", _type=bool, default=False) BAMBOO_TASK_VALIDITY_DAY = get_type_env(key="BAMBOO_TASK_VALIDITY_DAY", _type=int, default=360) - # 是否在部署 MySQL 的时候安装 PERL YUM_INSTALL_PERL = get_type_env(key="YUM_INSTALL_PERL", _type=bool, default=False) @@ -179,3 +176,5 @@ # window ssh服务远程端口 WINDOW_SSH_PORT = get_type_env(key="WINDOW_SSH_PORT", _type=int, default=22) +# 本地测试人员优先使用的版本 +REPO_VERSION_FOR_DEV = get_type_env(key="REPO_VERSION_FOR_DEV", _type=str, default="") diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py index 4de44383fb..1152d0ad49 100644 --- a/dbm-ui/backend/flow/consts.py +++ b/dbm-ui/backend/flow/consts.py @@ -489,12 +489,13 @@ class MongoDBActuatorActionEnum(str, StructuredEnum): Backup = EnumField("mongodb_backup", _("mongodb_backup")) RemoveNs = EnumField("mongodb_remove_ns", _("mongodb_remove_ns")) Restore = EnumField("mongodb_restore", _("mongodb_restore")) - PitRestore = EnumField("mongodb_pit_restore", _("mongodb_pit_restore")) + PitRestore = EnumField("mongodb_pitr_restore", _("mongodb_pitr_restore")) MongoRestart = EnumField("mongo_restart", _("mongo_restart")) MongoDReplace = EnumField("mongod_replace", _("mongod_replace")) MongoDeInstall = EnumField("mongo_deinstall", _("mongo_deinstall")) InstallDBMon = EnumField("install_dbmon", _("install_dbmon")) MongoStart = EnumField("mongo_start", _("mongo_start")) + MongoHello = EnumField("mongodb_hello", _("mongodb_hello")) class EsActuatorActionEnum(str, StructuredEnum): @@ -737,6 +738,7 @@ class ConfigDefaultEnum(list, StructuredEnum): class DirEnum(str, StructuredEnum): GSE_DIR = EnumField("/usr/local/gse_bkte", _("gcs 安装路径")) REDIS_KEY_LIFE_DIR = EnumField("/data/dbbak/keylifecycle", _("key生命周期路径")) + MONGO_RECOVER_DIR = EnumField("/data/dbbak/recover_mg", _("mongo恢复路径")) class TruncateDataTypeEnum(str, StructuredEnum): @@ -1166,6 +1168,8 @@ class MongoDBClusterRole(str, StructuredEnum): ConfigSvr = EnumField("configsvr", _("configsvr")) ShardSvr = EnumField("shardsvr", _("shardsvr")) + Mongos = EnumField("mongos", _("mongos")) + Replicaset = EnumField("replicaset", _("replicaset")) class MongoDBTotalCache(float, StructuredEnum): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py b/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py index 26b8624bae..78d91ea0d7 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py @@ -28,8 +28,13 @@ def __init__(self, db_type: str = DBType.MySQL): """ @param db_type: db类型,默认是MySQL,如果是Redis这actuator包不一样 """ + # repo_version 如果REPO_VERSION_FOR_DEV有值,则使用REPO_VERSION_FOR_DEV,否则使用最新版本 + # 正式环境: REPO_VERSION_FOR_DEV为空 个人测试环境中,REPO_VERSION_FOR_DEV 按需配置 + dev_env = str(env.REPO_VERSION_FOR_DEV) + repo_version = dev_env if dev_env != "" else MediumEnum.Latest + self.actuator_pkg = Package.get_latest_package( - version=MediumEnum.Latest, pkg_type=MediumEnum.DBActuator, db_type=db_type + version=repo_version, pkg_type=MediumEnum.DBActuator, db_type=db_type ) def get_db_actuator_package(self): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/base_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/base_flow.py index 32e4e0d146..ffba97af3d 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/base_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/base_flow.py @@ -46,3 +46,9 @@ def check_cluster_valid(cls, cluster: MongoDBCluster, payload): cluster.bk_biz_id, payload["bk_biz_id"], type(cluster.bk_biz_id), type(payload["bk_biz_id"]) ) ) + + @staticmethod + def check_cluster_id_list(cluster_id_list): + cluster_id_list_set = set(cluster_id_list) + if len(cluster_id_list_set) != len(cluster_id_list): + raise Exception("duplicate cluster_id") diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_backup.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_backup.py index 8827cdb653..484fe7896f 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_backup.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_backup.py @@ -20,8 +20,8 @@ from backend.flow.engine.bamboo.scene.mongodb.base_flow import MongoBaseFlow from backend.flow.engine.bamboo.scene.mongodb.sub_task.backup import BackupSubTask from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia -from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs from backend.flow.utils.mongodb.mongodb_repo import MongoDBNsFilter, MongoRepository +from backend.flow.utils.mongodb.mongodb_util import MongoUtil logger = logging.getLogger("flow") @@ -67,12 +67,9 @@ def start(self): """ logger.debug("MongoBackupFlow start, payload", self.payload) # actuator_workdir 提前创建好的,在部署的时候就创建好了. - actuator_workdir = ActKwargs().get_mongodb_os_conf()["file_path"] + actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"] file_list = GetFileList(db_type=DBType.MongoDB).get_db_actuator_package() - # 创建流程实例 - pipeline = Builder(root_id=self.root_id, data=self.payload) - # 解析输入 确定每个输入的域名实例都存在. # 1. 解析每个集群Id的节点列表 # 2. 备份一般在某个Secondary且非Backup节点上执行 @@ -85,8 +82,12 @@ def start(self): bk_host_list = [] cluster_id_list = [row["cluster_id"] for row in self.payload["infos"]] + self.check_cluster_id_list(cluster_id_list) clusters = MongoRepository.fetch_many_cluster_dict(id__in=cluster_id_list) + # 创建流程实例 + pipeline = Builder(root_id=self.root_id, data=self.payload) + for row in self.payload["infos"]: try: cluster_id = row["cluster_id"] @@ -111,7 +112,7 @@ def start(self): raise Exception("sub_bk_host_list is None") bk_host_list.extend(sub_bk_host_list) - sub_pipelines.append(sub_pl.build_sub_process(_("MongoDB-备份-{}").format(cluster.name))) + sub_pipelines.append(sub_pl.build_sub_process(_("Backup:{}").format(cluster.immute_domain))) # 介质下发 bk_host_list 在SendMedia.act会去重. pipeline.add_act( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install.py index cca22dba05..2a32a17696 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install.py @@ -141,8 +141,9 @@ def multi_replicaset_install_flow(self): kwargs=kwargs, ) - # # 安装dbmon - # self.install_dbmon(data=self.data, pipeline=pipeline) + # 安装dbmon + self.install_dbmon(data=self.data, pipeline=pipeline) + # 运行流程 pipeline.run_pipeline() @@ -224,7 +225,7 @@ def cluster_install_flow(self): ) # 安装dbmon - # self.install_dbmon(data=self.data, pipeline=pipeline) + self.install_dbmon(data=self.data, pipeline=pipeline) # 运行流程 pipeline.run_pipeline() diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py index 101a7a536f..43c3cc545e 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py @@ -27,15 +27,22 @@ ) from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs -from backend.flow.utils.mongodb.mongodb_repo import MongoNodeWithLabel +from backend.flow.utils.mongodb.mongodb_repo import MongoNodeWithLabel, MongoRepository +from backend.flow.utils.mongodb.mongodb_util import MongoUtil logger = logging.getLogger("flow") def get_pkg_info(): + # repo_version 如果REPO_VERSION_FOR_DEV有值,则使用REPO_VERSION_FOR_DEV,否则使用最新版本 + # 正式环境中,REPO_VERSION_FOR_DEV为空 + # 个人测试环境中,REPO_VERSION_FOR_DEV 按需配置 + repo_version = env.REPO_VERSION_FOR_DEV if env.REPO_VERSION_FOR_DEV else MediumEnum.Latest + actuator_pkg = Package.get_latest_package( - version=MediumEnum.Latest, pkg_type=MediumEnum.DBActuator, db_type=DBType.MongoDB + version=repo_version, pkg_type=MediumEnum.DBActuator, db_type=DBType.MongoDB ) + dbtools_pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type="dbtools", db_type=DBType.MongoDB) toolkit_pkg = Package.get_latest_package( version=MediumEnum.Latest, pkg_type="mongo-toolkit", db_type=DBType.MongoDB @@ -54,7 +61,7 @@ def add_install_dbmon(flow, flow_data, pipeline, iplist, bk_cloud_id, allow_empt allow_empty_instance 上架流程中,允许ip没有实例. allow_empty_instance = True """ - actuator_workdir = flow.get_kwargs.file_path + actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"] pkg_info = get_pkg_info() file_list = [ "{}/{}/{}".format(env.BKREPO_PROJECT, env.BKREPO_BUCKET, pkg_info.get("actuator_pkg").path), @@ -121,19 +128,15 @@ def add_install_dbmon(flow, flow_data, pipeline, iplist, bk_cloud_id, allow_empt pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines) -class MongoInstallDBMon(MongoBaseFlow): +class MongoInstallDBMonFlow(MongoBaseFlow): class Serializer(serializers.Serializer): - class DataRow(serializers.Serializer): - ip = serializers.CharField() - object_type = serializers.CharField() - uid = serializers.CharField() created_by = serializers.CharField() bk_biz_id = serializers.IntegerField() ticket_type = serializers.CharField() action = serializers.CharField() bk_cloud_id = serializers.IntegerField() - infos = DataRow(many=True) + infos = serializers.ListField(child=serializers.CharField()) # ip or cluster_id """MongoInstallDBMon flow 分析 payload,检查输入,生成Flow """ @@ -162,8 +165,38 @@ def start(self): logger.debug("MongoInstallDBMon start, payload", self.payload) # 创建流程实例 pipeline = Builder(root_id=self.root_id, data=self.payload) - add_install_dbmon( - self, self.payload, pipeline, [x["ip"] for x in self.payload["infos"]], self.payload["bk_cloud_id"] - ) + + # parse iplist + iplist = self.get_iplist(self.payload["infos"], bk_cloud_id=self.payload["bk_cloud_id"]) + + add_install_dbmon(self, self.payload, pipeline, iplist, self.payload["bk_cloud_id"]) # 运行流程 pipeline.run_pipeline() + + @staticmethod + def get_iplist(infos: list, bk_cloud_id: int) -> list[str]: + iplist = [] + cluster_id_list = [] + cluster_domain_list = [] + for v in infos: + if v.isdigit(): + cluster_id_list.append(int(v)) + elif v.endswith(".db"): + cluster_domain_list.append(v) + else: + iplist.append(v) + + if cluster_domain_list: + tmp_cluster_id_list = MongoRepository.get_cluster_id_by_domain(cluster_domain_list) + cluster_id_list.extend(tmp_cluster_id_list) + + if cluster_id_list: + cluster_id_list = list(set(cluster_id_list)) # unique + clusters = MongoRepository.fetch_many_cluster(withDomain=False, id__in=cluster_id_list) + for cluster in clusters: + if cluster.bk_cloud_id == bk_cloud_id: + iplist.extend(cluster.get_iplist()) + else: + raise Exception("bk_cloud_id not match {} vs {}".format(bk_cloud_id, cluster.bk_cloud_id)) + + return list(set(iplist)) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_pitr_restore.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_pitr_restore.py new file mode 100644 index 0000000000..002784f287 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_pitr_restore.py @@ -0,0 +1,389 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging.config +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ +from rest_framework import serializers + +from backend.configuration.constants import DBType +from backend.flow.consts import DirEnum +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.engine.bamboo.scene.mongodb.base_flow import MongoBaseFlow +from backend.flow.engine.bamboo.scene.mongodb.sub_task.download_subtask import DownloadSubTask +from backend.flow.engine.bamboo.scene.mongodb.sub_task.exec_shell_script import ExecShellScript +from backend.flow.engine.bamboo.scene.mongodb.sub_task.fetch_backup_record_subtask import FetchBackupRecordSubTask +from backend.flow.engine.bamboo.scene.mongodb.sub_task.hello_sub import HelloSubTask +from backend.flow.engine.bamboo.scene.mongodb.sub_task.pitr_restore_sub import PitrRestoreSubTask +from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia +from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 +from backend.flow.utils.mongodb.mongodb_repo import MongoRepository, MongoDBCluster, MongoNode +from backend.flow.utils.mongodb.mongodb_script_template import prepare_recover_dir_script +from backend.flow.utils.mongodb.mongodb_util import MongoUtil + +logger = logging.getLogger("flow") + + +class BsTask: + """备份系统Task,前端传来的数据""" + + class Serializer(serializers.Serializer): + task_id = serializers.CharField() + file_name = serializers.CharField() + + task_id: str = "" + file_name: str = "" + + +class MongoPitrRestoreFlow(MongoBaseFlow): + class Serializer(serializers.Serializer): + class DataRow(serializers.Serializer): + task_ids = BsTask.Serializer(many=True, required=False) + src_cluster_id = serializers.IntegerField() + dst_cluster_id = serializers.IntegerField() + dst_cluster_type = serializers.CharField() + dst_time = serializers.CharField() + apply_oplog = serializers.BooleanField() + + uid = serializers.CharField() + created_by = serializers.CharField() + bk_biz_id = serializers.IntegerField() + ticket_type = serializers.CharField() + infos = DataRow(many=True) + + """MongoPitrRestoreFlow + 分析 payload,检查输入,生成Flow """ + + def __init__(self, root_id: str, data: Optional[Dict]): + """ + 传入参数 + @param root_id : 任务流程定义的root_id + @param data : 单据传递过来的参数列表,是dict格式 + """ + + super().__init__(root_id, data) + self.check_payload() + + def check_payload(self): + print("payload", self.payload) + s = self.Serializer(data=self.payload) + if not s.is_valid(): + raise Exception("payload is invalid {}".format(s.errors)) + + def start(self): + """ + MongoPitrRestoreFlow 流程 + """ + logger.debug("MongoPitrRestoreFlow start, payload", self.payload) + # actuator_workdir 提前创建好的,在部署的时候就创建好了. + actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"] + file_list = GetFileList(db_type=DBType.MongoDB).get_db_actuator_package() + + # 解析输入 确定每个输入的域名实例都存在. + # 1. 部署临时集群(目前省略) + # 2. 获得每个目标集群的信息 + # 3-1. 预处理. 准备数据文件目录 mkdir -p $MONGO_RECOVER_DIR + # 3-2. 预处理. 获得每个目标集群的备份文件列表,下载备份文件 + # 4. 执行回档任务 + + # 所有涉及的cluster + cluster_id_list = [row["dst_cluster_id"] for row in self.payload["infos"]] + self.check_cluster_id_list(cluster_id_list) + clusters = MongoRepository.fetch_many_cluster_dict(id__in=cluster_id_list) + dest_dir = str(DirEnum.MONGO_RECOVER_DIR.value) + + # dest_dir 必须是 '/data/dbbak' 开头 + if not dest_dir.startswith("/data/dbbak"): + raise Exception("dest_dir must start with /data/dbbak") + + # 确定exec_node + exec_node_list = [] + all_iplist = [] + cloud_id = [] + for row in self.payload["infos"]: + try: + # 检查目标cluster是否存在 + dst_cluster_id = row["dst_cluster_id"] + cluster = clusters[dst_cluster_id] + self.check_cluster_valid(cluster, self.payload) + except Exception as e: + logger.exception("check_cluster_valid fail") + raise Exception("check_cluster_valid fail cluster_id:{} {}".format(row["cluster_id"], e)) + exec_node_list.extend(self.set_exec_node(row, cluster)) + all_iplist.extend(cluster.get_iplist()) + cloud_id.append(cluster.bk_cloud_id) + + cloud_id = list(set(cloud_id)) + if len(cloud_id) != 1: + raise Exception("There are different cloud id") + # 创建流程实例 + pipeline = Builder(root_id=self.root_id, data=self.payload) + cluster_pipes = [] + for row in self.payload["infos"]: + cluster = clusters[row["dst_cluster_id"]] + logger.debug("sub_pipline start row", row) + logger.debug("sub_pipline start cluster", cluster) + cluster_sb = self.process_cluster( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir + ) + cluster_pipes.append(cluster_sb.build_sub_process(_("cluster {}").format(cluster.name))) + + # 1. 统一预处理 + # 2. 统一下发文件 + # 3. 执行cluster_sub + bk_host_list = list(map(lambda x: {"ip": x, "bk_cloud_id": cloud_id[0]}, set(all_iplist))) + # 开始组装流程 + # Step1 执行做准备脚本 执行mkdir -p /data/dbbak/recover_mg + pipeline.add_act( + **ExecShellScript.act( + act_name=_("MongoDB-预处理 {}".format(len(bk_host_list))), + file_list=file_list, + bk_host_list=bk_host_list, + exec_account="root", + script_content=prepare_recover_dir_script(dest_dir), + ) + ) + + # Step2 介质下发 + pipeline.add_act( + **SendMedia.act( + act_name=_("MongoDB-介质下发 {}".format(len(bk_host_list))), + file_list=file_list, + bk_host_list=bk_host_list, + file_target_path=actuator_workdir, + ) + ) + + # 按Cluster执行流程 + pipeline.add_parallel_sub_pipeline(sub_flow_list=cluster_pipes) + + pipeline.run_pipeline() + + @staticmethod + def set_exec_node(row: Dict, cluster: MongoDBCluster) -> list[MongoNode]: + """ + 确定每个shard的exec_node + """ + exec_node_list = [] + row["__exec_node"] = {} + for shard in cluster.get_shards(): + exec_node = shard.get_not_backup_nodes()[0] + row["__exec_node"][shard.set_name] = exec_node + exec_node_list.append(exec_node) + if cluster.is_sharded_cluster(): + shard = cluster.get_config() + exec_node = shard.get_not_backup_nodes()[0] + row["__exec_node"][shard.set_name] = exec_node + exec_node_list.append(exec_node) + return exec_node_list + + def process_cluster(self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str) -> SubBuilder: + """ + cluster pitr_restore_flow + """ + cluster_sb = SubBuilder(root_id=self.root_id, data=self.payload) + shard_pipes = [] + + if cluster.is_sharded_cluster(): + self.check_empty_cluster( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + + self.stop_mongos( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + + # 准备主节点: + # 让第1个节点成为Primary,remove掉其它成员,start as standalone mode + # self.remove_none_exec_node + # restart exec_node as standalone node + + self.stop_not_exec_node( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + self.restart_as_standalone( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + + # 为每个Shard执行回档,包括configsvr + restore_sb = SubBuilder(root_id=self.root_id, data=self.payload) + for shard in cluster.get_shards(with_config=True): + shard_sb = SubBuilder(root_id=self.root_id, data=self.payload) + self.process_shard( + row=row, + cluster=cluster, + shard=shard, + actuator_workdir=actuator_workdir, + dest_dir=dest_dir, + shard_sub=shard_sb, + ) + shard_pipes.append(shard_sb.build_sub_process(_("{} {}").format(shard.set_type, shard.set_name))) + + restore_sb.add_parallel_sub_pipeline(sub_flow_list=shard_pipes) + cluster_sb.add_sub_pipeline(sub_flow=restore_sb.build_sub_process("restore_by_shard")) + # restore_sb end + + if cluster.is_sharded_cluster(): + # if sharded_cluster + # 处理各个分片和configsvr的关系. + # start as clusterRole: shardsvr + self.rebuild_cluster( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + + # todo restart as auth mode && and re add secondary + + return cluster_sb + + def check_empty_cluster( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): + + exec_node = cluster.get_mongos()[0] + HelloSubTask.process_node( + root_id=self.root_id, + ticket_data=self.payload, + sub_ticket_data=row, + sub_pipeline=cluster_sb, + exec_node=exec_node, + file_path=actuator_workdir, + act_name=_("空闲检查"), + ) + return + + def process_shard(self, row: Dict, cluster, shard, actuator_workdir: str, dest_dir: str, shard_sub: SubBuilder): + """ + pitr_restore_flow one shard + """ + # FetchBackupRecordSubTask 根据 sub_ticket_data中的src_cluster_id, dst_time 获得备份文件列表. + FetchBackupRecordSubTask.process_shard( + root_id=self.root_id, + ticket_data=self.payload, + sub_ticket_data=row, + cluster=cluster, + shard=shard, + ) + exec_node = row["__exec_node"][shard.set_name] + + logger.debug("sub_ticket_data {}".format(row)) + # process_cluster 会根据src_cluster_id, dst_time 获得备份文件列表. + DownloadSubTask.process_shard( + root_id=self.root_id, + ticket_data=self.payload, + sub_ticket_data=row, + shard=shard, + file_path=actuator_workdir, + dest_dir=dest_dir, + dest_node=exec_node, + sub_pipeline=shard_sub, + ) + + PitrRestoreSubTask.process_shard( + root_id=self.root_id, + ticket_data=self.payload, + sub_ticket_data=row, + shard=shard, + file_path=actuator_workdir, + dest_dir=dest_dir, + exec_node=exec_node, + sub_pipeline=shard_sub, + ) + + return + + def stop_mongos( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): + + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for mongos in cluster.get_mongos(): + acts_list.append( + { + "act_name": _("stop_mongos {}:{}".format(mongos.ip, mongos.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": HelloSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir), + } + ) + + # 可能会存在mongos列表为空的情况吗? + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("stop_mongos")) + + def stop_not_exec_node( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): + + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for mongos in cluster.get_mongos(): + acts_list.append( + { + "act_name": _("stop {}:{}".format(mongos.ip, mongos.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": HelloSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir), + } + ) + + # 可能会存在mongos列表为空的情况吗? + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("stop_not_exec_node")) + + def restart_as_standalone( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): + + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for mongos in cluster.get_mongos(): + acts_list.append( + { + "act_name": _("restart {}:{}".format(mongos.ip, mongos.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": HelloSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir), + } + ) + + # 可能会存在mongos列表为空的情况吗? + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("restart_as_standalone")) + + def rebuild_cluster( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): + + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for mongos in cluster.get_mongos(): + acts_list.append( + { + "act_name": _("rebuild {}:{}".format(mongos.ip, mongos.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": HelloSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir), + } + ) + + # 可能会存在mongos列表为空的情况吗? + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("rebuild_cluster")) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_remove_ns.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_remove_ns.py index 4b2df951c9..33cc890d79 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_remove_ns.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_remove_ns.py @@ -20,8 +20,8 @@ from backend.flow.engine.bamboo.scene.mongodb.base_flow import MongoBaseFlow from backend.flow.engine.bamboo.scene.mongodb.sub_task.remove_ns import RemoveNsSubTask from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia -from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs from backend.flow.utils.mongodb.mongodb_repo import MongoDBNsFilter, MongoRepository +from backend.flow.utils.mongodb.mongodb_util import MongoUtil logger = logging.getLogger("flow") @@ -66,7 +66,8 @@ def start(self): # 创建流程实例 pipeline = Builder(root_id=self.root_id, data=self.payload) # actuator_workdir 提前创建好的,在部署的时候就创建好了. - actuator_workdir = ActKwargs().get_mongodb_os_conf()["file_path"] + actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"] + file_list = GetFileList(db_type=DBType.MongoDB).get_db_actuator_package() sub_pipelines = [] bk_host_list = [] @@ -80,7 +81,7 @@ def start(self): self.check_cluster_valid(cluster, self.payload) except Exception as e: logger.exception("check_cluster_valid fail") - raise Exception("check_cluster_valid fail cluster_id:{} {}".format(row["cluster_id"], e)) + raise Exception("check_cluster_valid fail cluster_id:{} {}".format(cluster_id, e)) sub_pl, sub_bk_host_list = RemoveNsSubTask.process_cluster( root_id=self.root_id, diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_restore.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_restore.py index 763256f7f3..8524969553 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_restore.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_restore.py @@ -16,16 +16,17 @@ from rest_framework import serializers from backend.configuration.constants import DBType -from backend.flow.engine.bamboo.scene.common.builder import Builder +from backend.flow.consts import DirEnum +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.mongodb.base_flow import MongoBaseFlow from backend.flow.engine.bamboo.scene.mongodb.sub_task.download_subtask import DownloadSubTask from backend.flow.engine.bamboo.scene.mongodb.sub_task.exec_shell_script import ExecShellScript from backend.flow.engine.bamboo.scene.mongodb.sub_task.restore_sub import RestoreSubTask from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia -from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs from backend.flow.utils.mongodb.mongodb_repo import MongoDBNsFilter, MongoRepository from backend.flow.utils.mongodb.mongodb_script_template import prepare_recover_dir_script +from backend.flow.utils.mongodb.mongodb_util import MongoUtil logger = logging.getLogger("flow") @@ -83,30 +84,32 @@ def start(self): """ logger.debug("MongoDBRestoreFlow start, payload", self.payload) # actuator_workdir 提前创建好的,在部署的时候就创建好了. - actuator_workdir = ActKwargs().get_mongodb_os_conf()["file_path"] + actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"] file_list = GetFileList(db_type=DBType.MongoDB).get_db_actuator_package() # 创建流程实例 pipeline = Builder(root_id=self.root_id, data=self.payload) + root_sub = [] # 解析输入 确定每个输入的域名实例都存在. # 1. 部署临时集群(目前省略) # 2. 获得每个目标集群的信息 - # 3-1. 准备数据文件目录 mkdir -p /data/dbbak/recover_mg + # 3-1. 准备数据文件目录 mkdir -p $MONGO_RECOVER_DIR # 3-2. 获得每个目标集群的备份文件列表,下载备份文件 (todo: 如果存在的情况下跳过) # 4. 执行回档任务 # # ### 获取机器磁盘备份目录信息 ########################################################## - step3_sub = [] - step4_sub = [] - # bk_host {ip:"1.1.1.1", bk_cloud_id: "0"} - bk_host_list = [] - # 所有涉及的cluster cluster_id_list = [row["dst_cluster_id"] for row in self.payload["infos"]] + self.check_cluster_id_list(cluster_id_list) clusters = MongoRepository.fetch_many_cluster_dict(id__in=cluster_id_list) + dest_dir = DirEnum.MONGO_RECOVER_DIR.value + + # dest_dir 必须是 '/data/dbbak' 开头 + if not dest_dir.startswith("/data/dbbak"): + raise Exception("dest_dir must start with /data/dbbak") - # 生成子流程 + # by Cluster for row in self.payload["infos"]: try: dst_cluster_id = row["dst_cluster_id"] @@ -118,12 +121,30 @@ def start(self): logger.debug("sub_pipline start row", row) logger.debug("sub_pipline start cluster", cluster) - sub_pl, sub_bk_host_list = DownloadSubTask.process_cluster( + # bk_host {ip:"1.1.1.1", bk_cloud_id: "0"} + step3_sub, step4_sub, bk_host_list = [], [], [] + sb = SubBuilder(root_id=self.root_id, data=row) + + # 保存一些初始内容,在不同的步骤中共享 + row["_tmp_data"] = { + "cluster": cluster, + } + + # if row["import_to"] == "mongos": + + # 分离备份文件. 确定回档机器-文件列表-回档目标 + rs = cluster.get_shards()[0] + exec_node = rs.get_not_backup_nodes()[0] + + # 在备份系统中提单,将所需要的文件复制过去 + sub_pl = DownloadSubTask.process_shard( root_id=self.root_id, ticket_data=self.payload, sub_ticket_data=row, - cluster=cluster, + shard=rs, file_path=actuator_workdir, + dest_dir=dest_dir, + dest_node=exec_node, ) step3_sub.append(sub_pl.build_sub_process(_("下载备份文件-{}").format(cluster.name))) @@ -133,41 +154,40 @@ def start(self): sub_ticket_data=row, cluster=cluster, file_path=actuator_workdir, + dest_dir=dest_dir, + exec_node=exec_node, ) - step4_sub.append(sub_pl4.build_sub_process(_("执行回档命令-{}").format(cluster.name))) - - if sub_pl is None: - raise Exception("sub_pl is None") - if sub_bk_host_list is None or len(sub_bk_host_list) == 0: - raise Exception("sub_bk_host_list is None") - - bk_host_list.extend(sub_bk_host_list) - - # 开始组装流程 从Step1 开始 - # Step1 执行做准备脚本 执行mkdir -p /data/dbbak/recover_mg - pipeline.add_act( - **ExecShellScript.act( - act_name=_("MongoDB-预处理"), - file_list=file_list, - bk_host_list=bk_host_list, - exec_account="root", - script_content=prepare_recover_dir_script(), + step4_sub.append(sub_pl4.build_sub_process(_("执行导入命令-{}").format(cluster.name))) + + # 开始组装流程 从Step1 开始 + # Step1 执行做准备脚本 执行mkdir -p /data/dbbak/recover_mg + sb.add_act( + **ExecShellScript.act( + act_name=_("MongoDB-预处理"), + file_list=file_list, + bk_host_list=bk_host_list, + exec_account="root", + script_content=prepare_recover_dir_script(dest_dir), + ) ) - ) - - # Step2 介质下发 bk_host_list 在SendMedia.act会去重. - pipeline.add_act( - **SendMedia.act( - act_name=_("MongoDB-介质下发"), - file_list=file_list, - bk_host_list=bk_host_list, - file_target_path=actuator_workdir, + + # Step2 介质下发 bk_host_list 在SendMedia.act会去重. + sb.add_act( + **SendMedia.act( + act_name=_("MongoDB-介质下发"), + file_list=file_list, + bk_host_list=bk_host_list, + file_target_path=actuator_workdir, + ) ) - ) - # Step3 并行执行备份 - pipeline.add_parallel_sub_pipeline(sub_flow_list=step3_sub) - # Step3 并行执行备份 - pipeline.add_parallel_sub_pipeline(sub_flow_list=step4_sub) + # Step3 下载备份文件 + sb.add_parallel_sub_pipeline(sub_flow_list=step3_sub) + # Step4 执行回档 + sb.add_parallel_sub_pipeline(sub_flow_list=step4_sub) + + root_sub.append(sb.build_sub_process(_("{}").format(cluster.name))) + # + pipeline.add_parallel_sub_pipeline(sub_flow_list=root_sub) # 运行流程 pipeline.run_pipeline() diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/backup.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/backup.py index 5e82634862..a37f243018 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/backup.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/backup.py @@ -13,13 +13,14 @@ from django.utils.translation import ugettext as _ -from backend.flow.consts import MongoDBActuatorActionEnum +from backend.flow.consts import MongoDBActuatorActionEnum, MongoDBManagerUser from backend.flow.engine.bamboo.scene.common.builder import SubBuilder from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 from backend.flow.utils.mongodb import mongodb_password -from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs, CommonContext +from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext from backend.flow.utils.mongodb.mongodb_repo import MongoDBCluster, MongoDBNsFilter, MongoNodeWithLabel, ReplicaSet +from backend.flow.utils.mongodb.mongodb_util import MongoUtil logger = logging.getLogger("flow") @@ -48,7 +49,7 @@ def make_kwargs( cls.parse_ns_filter(sub_payload) node = nodes[0] - dba_user = "dba" + dba_user = MongoDBManagerUser.DbaUser.value dba_pwd = mongodb_password.MongoDBPassword().get_password_from_db( node.ip, int(node.port), node.bk_cloud_id, dba_user )["password"] @@ -59,7 +60,7 @@ def make_kwargs( if is_partial and oplog: raise Exception(_("oplog为True时, 不支持partial备份")) bk_dbm_instance = MongoNodeWithLabel.from_node(node, rs, cluster) - sudo_account = ActKwargs().get_mongodb_os_conf()["user"] + sudo_account = MongoUtil().get_mongodb_os_conf()["user"] return { "set_trans_data_dataclass": CommonContext.__name__, "get_trans_data_ip_var": None, @@ -107,7 +108,7 @@ def process_cluster( port = kwargs["db_act_template"]["payload"]["port"] acts_list.append( { - "act_name": _("MongoDB-备份-[{}:{}]".format(exec_ip, port)), + "act_name": _("{}:[{}:{}]".format(rs.set_name, exec_ip, port)), "act_component_code": ExecJobComponent2.code, "kwargs": kwargs, } diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/download_subtask.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/download_subtask.py index 7328885156..d9f4491cb0 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/download_subtask.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/download_subtask.py @@ -19,7 +19,7 @@ ) from backend.flow.utils.base.payload_handler import PayloadHandler from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext -from backend.flow.utils.mongodb.mongodb_repo import MongoDBCluster, ReplicaSet +from backend.flow.utils.mongodb.mongodb_repo import MongoDBCluster, ReplicaSet, MongoNode # Prepare datafile 准备数据文件 @@ -32,17 +32,20 @@ class DownloadSubTask(BaseSubTask): """ @classmethod - def make_kwargs(cls, payload: Dict, sub_payload: Dict, rs: ReplicaSet, file_path: str) -> dict: + def make_kwargs( + cls, payload: Dict, sub_payload: Dict, rs: ReplicaSet, file_path, dest_dir: str, node: MongoNode + ) -> dict: print("get_backup_node", sub_payload) - node = rs.get_not_backup_nodes()[0] os_account = PayloadHandler.redis_get_os_account() task_id_list = [m.get("task_id") for m in sub_payload["task_ids"]] + # sub_payload["_tmp_data"]["dest_node"] = node + return { "set_trans_data_dataclass": CommonContext.__name__, "bk_cloud_id": node.bk_cloud_id, "task_ids": task_id_list, "dest_ip": node.ip, - "dest_dir": "/data/dbbak/recover_mg", + "dest_dir": dest_dir, "reason": "mongodb recover setName:{} to {}".format(rs.set_name, node.ip), "login_user": os_account["os_user"], "login_passwd": os_account["os_password"], @@ -55,7 +58,8 @@ def process_cluster( ticket_data: Optional[Dict], sub_ticket_data: Optional[Dict], cluster: MongoDBCluster, - file_path: str, + file_path, + dest_dir: str, ) -> Tuple[SubBuilder, List]: """ cluster can be a ReplicaSet or a ShardedCluster @@ -64,8 +68,10 @@ def process_cluster( # 创建子流程 sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data) acts_list = [] + for rs in cluster.get_shards(): - kwargs = cls.make_kwargs(ticket_data, sub_ticket_data, rs, file_path) + node = rs.get_not_backup_nodes()[0] + kwargs = cls.make_kwargs(ticket_data, sub_ticket_data, rs, file_path, dest_dir, node) acts_list.append( { "act_name": _("下载备份文件 {} {}".format(rs.set_name, kwargs["dest_ip"])), @@ -80,3 +86,31 @@ def process_cluster( sub_bk_host_list.append({"ip": v["kwargs"]["dest_ip"], "bk_cloud_id": v["kwargs"]["bk_cloud_id"]}) return sub_pipeline, sub_bk_host_list + + @classmethod + def process_shard( + cls, + root_id: str, + ticket_data: Optional[Dict], + sub_ticket_data: Optional[Dict], + shard: ReplicaSet, + file_path, + dest_dir: str, + dest_node: MongoNode, + sub_pipeline: SubBuilder, + ) -> Tuple[SubBuilder]: + """ + for one process_shard + """ + + # 创建子流程 + if sub_pipeline is None: + sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data) + kwargs = cls.make_kwargs(ticket_data, sub_ticket_data, shard, file_path, dest_dir, dest_node) + act = { + "act_name": _("下载备份文件 {}".format(kwargs["dest_ip"])), + "act_component_code": MongoDownloadBackupFileComponent.code, + "kwargs": kwargs, + } + sub_pipeline.add_act(**act) + return sub_pipeline diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/fetch_backup_record_subtask.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/fetch_backup_record_subtask.py new file mode 100644 index 0000000000..4382baee03 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/fetch_backup_record_subtask.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging +from typing import Dict, List, Optional, Tuple + +from backend.db_services.mongodb.restore.handlers import MongoDBRestoreHandler +from backend.flow.engine.bamboo.scene.common.builder import SubBuilder +from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask +from backend.flow.utils.base.payload_handler import PayloadHandler +from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext +from backend.flow.utils.mongodb.mongodb_repo import MongoDBCluster, ReplicaSet +from backend.utils import time + +# FetchBackupFile 获得备份记录 +logger = logging.getLogger("flow") + + +class FetchBackupRecordSubTask(BaseSubTask): + """ + payload: 整体的ticket_data + sub_payload: 这个子任务的ticket_data + rs: + backup_dir: + """ + + @classmethod + def make_kwargs(cls, payload: Dict, sub_payload: Dict, rs: ReplicaSet, file_path, dest_dir: str) -> dict: + print("get_backup_node", sub_payload) + node = rs.get_not_backup_nodes()[0] + os_account = PayloadHandler.redis_get_os_account() + task_id_list = [m.get("task_id") for m in sub_payload["task_ids"]] + return { + "set_trans_data_dataclass": CommonContext.__name__, + "bk_cloud_id": node.bk_cloud_id, + "task_ids": task_id_list, + "dest_ip": node.ip, + "dest_dir": dest_dir, + "reason": "mongodb recover setName:{} to {}".format(rs.set_name, node.ip), + "login_user": os_account["os_user"], + "login_passwd": os_account["os_password"], + } + + @classmethod + def process_shard( + cls, + root_id: str, + ticket_data: Optional[Dict], + sub_ticket_data: Optional[Dict], + cluster: MongoDBCluster, + shard: ReplicaSet, + ) -> Tuple[SubBuilder, List]: + """ + cluster can be a ReplicaSet or a ShardedCluster + """ + + cluster_id = cluster.cluster_id + shard_name = shard.set_name + ret = cls.fetch_backup_record(cluster_id, shard_name, sub_ticket_data["dst_time"]) + full = ret["full_backup_log"] + backup_record = [ + { + "task_id": full["bs_taskid"], + "file_name": full["file_name"], + "instance": "{}:{}".format(full["ip"], full["port"]), + } + ] + for incr_log in ret["incr_backup_logs"]: + backup_record.append( + { + "task_id": incr_log["bs_taskid"], + "file_name": incr_log["file_name"], + "instance": "{}:{}".format(full["ip"], full["port"]), + } + ) + + sub_ticket_data["task_ids"] = backup_record + + return + + @classmethod + def fetch_backup_record(cls, cluster_id, shard_name, dst_time_str: str): + # fetch_backup_record 目前只能处理replicaset的。 todo : 兼容sharded cluster + dst_time = time.str2datetime(dst_time_str) + rec = MongoDBRestoreHandler(cluster_id).query_latest_backup_log(dst_time, shard_name) + # return {"full_backup_log": latest_full_backup_log, "incr_backup_logs": incr_backup_logs} + return rec diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/hello_sub.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/hello_sub.py new file mode 100644 index 0000000000..89511485fc --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/hello_sub.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from typing import Dict, List, Optional, Tuple + +from django.utils.translation import ugettext as _ + +from backend.flow.consts import MongoDBActuatorActionEnum +from backend.flow.engine.bamboo.scene.common.builder import SubBuilder +from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask +from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 +from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext +from backend.flow.utils.mongodb.mongodb_repo import ReplicaSet, MongoNode +from backend.flow.utils.mongodb.mongodb_util import MongoUtil + + +# HelloSubTask 测试用,总是返回成功. +class HelloSubTask(BaseSubTask): + """ + payload: 整体的ticket_data + sub_payload: 这个子任务的ticket_data + rs: + """ + + @classmethod + def make_kwargs(cls, file_path, exec_node: MongoNode) -> dict: + dba_user, dba_pwd = MongoUtil.get_dba_user_password(exec_node.ip, exec_node.port, exec_node.bk_cloud_id) + return { + "set_trans_data_dataclass": CommonContext.__name__, + "get_trans_data_ip_var": None, + "bk_cloud_id": exec_node.bk_cloud_id, + "exec_ip": exec_node.ip, + "db_act_template": { + "action": MongoDBActuatorActionEnum.MongoHello, + "file_path": file_path, + "exec_account": "root", + "sudo_account": "mysql", + "payload": { + "ip": exec_node.ip, + "port": int(exec_node.port), + "adminUsername": dba_user, + "adminPassword": dba_pwd, + }, + }, + } + + @classmethod + def process_node( + cls, + root_id: str, + ticket_data: Optional[Dict], + sub_ticket_data: Optional[Dict], + file_path, + exec_node: MongoNode, + sub_pipeline: SubBuilder, + act_name: str, + ) -> SubBuilder: + """ + cluster can be a ReplicaSet or a ShardedCluster + """ + + # 创建子流程 + if sub_pipeline is None: + sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data) + + kwargs = cls.make_kwargs(file_path, exec_node) + act = { + "act_name": _("{} {}:{}".format(act_name, exec_node.ip, exec_node.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": kwargs, + } + sub_pipeline.add_act(**act) + return sub_pipeline + + @classmethod + def process_shard( + cls, + root_id: str, + ticket_data: Optional[Dict], + sub_ticket_data: Optional[Dict], + shard: ReplicaSet, + file_path, + dest_dir: str, + exec_node: MongoNode, + sub_pipeline: SubBuilder, + ) -> Tuple[SubBuilder, List]: + """ + cluster can be a ReplicaSet or a ShardedCluster + """ + + # 创建子流程 + if sub_pipeline is None: + sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data) + + kwargs = cls.make_kwargs(sub_ticket_data, shard, file_path, dest_dir, exec_node) + act = { + "act_name": _("Hello {}:{}".format(exec_node.ip, exec_node.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": kwargs, + } + sub_pipeline.add_act(**act) + return sub_pipeline diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py index c394416eef..e4cda2d8e8 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py @@ -100,14 +100,13 @@ def process_server( # 创建子流程 sub_pipeline = SubBuilder(root_id=root_id, data=flow_data) kwargs = cls.make_kwargs(ip, bk_cloud_id, nodes, file_path, pkg_info, bk_monitor_beat_config) - acts_list = [ - { - "act_name": _("node-{}".format(kwargs["exec_ip"])), - "act_component_code": ExecJobComponent2.code, - "kwargs": kwargs, - } - ] - sub_pipeline.add_parallel_acts(acts_list=acts_list) + act = { + "act_name": _("node-{}".format(kwargs["exec_ip"])), + "act_component_code": ExecJobComponent2.code, + "kwargs": kwargs, + } + + sub_pipeline.add_act(**act) sub_bk_host_list = [{"ip": ip, "bk_cloud_id": bk_cloud_id}] return sub_pipeline, sub_bk_host_list diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/pitr_restore_sub.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/pitr_restore_sub.py new file mode 100644 index 0000000000..fca394b881 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/pitr_restore_sub.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from typing import Dict, List, Optional, Tuple + +from django.utils.translation import ugettext as _ + +from backend.flow.consts import MongoDBActuatorActionEnum +from backend.flow.engine.bamboo.scene.common.builder import SubBuilder +from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask +from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 +from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext +from backend.flow.utils.mongodb.mongodb_repo import ReplicaSet, MongoNode +from backend.flow.utils.mongodb.mongodb_util import MongoUtil + + +# PitrRestoreSubTask 处理某个Cluster的PitrRestore +class PitrRestoreSubTask(BaseSubTask): + """ + payload: 整体的ticket_data + sub_payload: 这个子任务的ticket_data + rs: + """ + + @classmethod + def make_kwargs(cls, sub_payload: Dict, shard: ReplicaSet, file_path, dest_dir: str, exec_node: MongoNode) -> dict: + # todo find primary node + dba_user, dba_pwd = MongoUtil.get_dba_user_password(exec_node.ip, exec_node.port, exec_node.bk_cloud_id) + return { + "set_trans_data_dataclass": CommonContext.__name__, + "get_trans_data_ip_var": None, + "bk_cloud_id": exec_node.bk_cloud_id, + "exec_ip": exec_node.ip, + "db_act_template": { + "action": MongoDBActuatorActionEnum.PitRestore, + "file_path": file_path, + "exec_account": "root", + "sudo_account": "mysql", + "payload": { + "ip": exec_node.ip, + "port": int(exec_node.port), + "adminUsername": dba_user, + "adminPassword": dba_pwd, + "srcAddr": sub_payload["task_ids"][0]["instance"], + "recoverTimeStr": sub_payload["dst_time"], + "dryRun": False, + "dir": dest_dir, + }, + }, + } + + @classmethod + def process_shard( + cls, + root_id: str, + ticket_data: Optional[Dict], + sub_ticket_data: Optional[Dict], + shard: ReplicaSet, + file_path, + dest_dir: str, + exec_node: MongoNode, + sub_pipeline: SubBuilder, + ) -> Tuple[SubBuilder, List]: + """ + cluster can be a ReplicaSet or a ShardedCluster + """ + + # 创建子流程 + if sub_pipeline is None: + sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data) + + kwargs = cls.make_kwargs(sub_ticket_data, shard, file_path, dest_dir, exec_node) + act = { + "act_name": _("执行回档命令 {}:{}".format(exec_node.ip, exec_node.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": kwargs, + } + sub_pipeline.add_act(**act) + return sub_pipeline diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/remove_ns.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/remove_ns.py index fa9a66124e..308059d696 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/remove_ns.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/remove_ns.py @@ -12,7 +12,7 @@ from django.utils.translation import ugettext as _ -from backend.flow.consts import MongoDBActuatorActionEnum +from backend.flow.consts import MongoDBActuatorActionEnum, MongoDBManagerUser from backend.flow.engine.bamboo.scene.common.builder import SubBuilder from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 @@ -40,7 +40,7 @@ def make_kwargs(cls, payload: Dict, sub_payload: Dict, rs: ReplicaSet, file_path ns_filter = sub_payload.get("ns_filter") is_partial = MongoDBNsFilter.is_partial(ns_filter) node = nodes[0] - dba_user = "dba" + dba_user = MongoDBManagerUser.DbaUser.value dba_pwd = mongodb_password.MongoDBPassword().get_password_from_db( node.ip, int(node.port), node.bk_cloud_id, dba_user )["password"] diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/restore_sub.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/restore_sub.py index 9147aa53fe..0436145b96 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/restore_sub.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/restore_sub.py @@ -16,9 +16,9 @@ from backend.flow.engine.bamboo.scene.common.builder import SubBuilder from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 -from backend.flow.utils.mongodb import mongodb_password from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext -from backend.flow.utils.mongodb.mongodb_repo import MongoDBCluster, MongoDBNsFilter, ReplicaSet +from backend.flow.utils.mongodb.mongodb_repo import MongoDBCluster, MongoDBNsFilter, MongoNode +from backend.flow.utils.mongodb.mongodb_util import MongoUtil # RestoreSubTask 处理某个Cluster的Restore @@ -30,33 +30,26 @@ class RestoreSubTask(BaseSubTask): """ @classmethod - def make_kwargs(cls, payload: Dict, sub_payload: Dict, rs: ReplicaSet, file_path: str) -> dict: - print("get_backup_node", sub_payload) - nodes = rs.get_not_backup_nodes() - if len(nodes) == 0: - raise Exception("no backup node. rs:{}".format(rs.set_name)) - + def make_kwargs( + cls, sub_payload: Dict, file_path, dest_dir: str, exec_node: MongoNode, dest_node: MongoNode, dest_type: str + ) -> dict: ns_filter = sub_payload.get("ns_filter") is_partial = MongoDBNsFilter.is_partial(ns_filter) - node = nodes[0] - dba_user = "dba" - dba_pwd = mongodb_password.MongoDBPassword().get_password_from_db( - node.ip, int(node.port), node.bk_cloud_id, dba_user - )["password"] - + dba_user, dba_pwd = MongoUtil.get_dba_user_password(dest_node.ip, dest_node.port, dest_node.bk_cloud_id) return { "set_trans_data_dataclass": CommonContext.__name__, "get_trans_data_ip_var": None, - "bk_cloud_id": node.bk_cloud_id, - "exec_ip": node.ip, + "bk_cloud_id": exec_node.bk_cloud_id, + "exec_ip": exec_node.ip, "db_act_template": { "action": MongoDBActuatorActionEnum.Restore, "file_path": file_path, "exec_account": "root", "sudo_account": "mysql", "payload": { - "ip": node.ip, - "port": int(node.port), + "ip": dest_node.ip, + "port": int(dest_node.port), + "dest_type": dest_type, "adminUsername": dba_user, "adminPassword": dba_pwd, "args": { @@ -64,6 +57,7 @@ def make_kwargs(cls, payload: Dict, sub_payload: Dict, rs: ReplicaSet, file_path "isPartial": is_partial, "oplog": False, "nsFilter": sub_payload["ns_filter"], + "recoverDir": dest_dir, }, }, }, @@ -76,7 +70,9 @@ def process_cluster( ticket_data: Optional[Dict], sub_ticket_data: Optional[Dict], cluster: MongoDBCluster, - file_path: str, + file_path, + dest_dir: str, + exec_node: MongoNode, ) -> Tuple[SubBuilder, List]: """ cluster can be a ReplicaSet or a ShardedCluster @@ -85,15 +81,24 @@ def process_cluster( # 创建子流程 sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data) acts_list = [] - for rs in cluster.get_shards(): - kwargs = cls.make_kwargs(ticket_data, sub_ticket_data, rs, file_path) - acts_list.append( - { - "act_name": _("{} {}".format(rs.set_name, kwargs["exec_ip"])), - "act_component_code": ExecJobComponent2.code, - "kwargs": kwargs, - } - ) + + # 导入数据时,集群只使用一个分片,待完善. + rs = cluster.get_shards()[0] + if cluster.is_sharded_cluster(): + dest_node = cluster.get_mongos()[0] + dest_type = "mongos" + else: + dest_node = exec_node + dest_type = "mongod" + + kwargs = cls.make_kwargs(sub_ticket_data, file_path, dest_dir, exec_node, dest_node, dest_type) + acts_list.append( + { + "act_name": _("{} {}".format(rs.set_name, kwargs["exec_ip"])), + "act_component_code": ExecJobComponent2.code, + "kwargs": kwargs, + } + ) sub_pipeline.add_parallel_acts(acts_list=acts_list) sub_bk_host_list = [] diff --git a/dbm-ui/backend/flow/engine/controller/mongodb.py b/dbm-ui/backend/flow/engine/controller/mongodb.py index 00f139b94b..eb50e3fb7c 100644 --- a/dbm-ui/backend/flow/engine/controller/mongodb.py +++ b/dbm-ui/backend/flow/engine/controller/mongodb.py @@ -15,9 +15,10 @@ from backend.flow.engine.bamboo.scene.mongodb.mongodb_exec_script import MongoExecScriptFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_fake_install import MongoFakeInstallFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_install import MongoDBInstallFlow -from backend.flow.engine.bamboo.scene.mongodb.mongodb_install_dbmon import MongoInstallDBMon +from backend.flow.engine.bamboo.scene.mongodb.mongodb_install_dbmon import MongoInstallDBMonFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_instance_restart import MongoRestartInstanceFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_migrate import MongoDBMigrateMetaFlow +from backend.flow.engine.bamboo.scene.mongodb.mongodb_pitr_restore import MongoPitrRestoreFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_remove_ns import MongoRemoveNsFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_replace import MongoReplaceFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_restore import MongoRestoreFlow @@ -25,7 +26,6 @@ from backend.flow.engine.bamboo.scene.mongodb.mongodb_scale_storage import MongoScaleFlow from backend.flow.engine.bamboo.scene.mongodb.mongodb_user import MongoUserFlow from backend.flow.engine.controller.base import BaseController -from backend.ticket.constants import TicketType class MongoDBController(BaseController): @@ -50,21 +50,25 @@ def cluster_create(self): flow.cluster_install_flow() def mongo_backup(self): + MongoBackupFlow(root_id=self.root_id, data=self.ticket_data).start() + + def mongo_restore(self): + # 发起恢复任务 + MongoRestoreFlow(root_id=self.root_id, data=self.ticket_data).start() + + def mongo_pitr_restore(self): + # 发起PITR恢复任务 + MongoPitrRestoreFlow(root_id=self.root_id, data=self.ticket_data).start() + + def install_dbmon(self): + # 部署MongoDB bk-dbmon + MongoInstallDBMonFlow(root_id=self.root_id, data=self.ticket_data).start() + + def mongo_remove_ns(self): """ - 发起任务 + 发起删除库表任务 """ - # Get Ticket Name. 以后再拆到url那边. 临时用法. - ticket_name = self.ticket_data["ticket_type"] - if ticket_name == TicketType.MONGODB_RESTORE: - flow = MongoRestoreFlow(root_id=self.root_id, data=self.ticket_data) - elif ticket_name == TicketType.MONGODB_FULL_BACKUP or ticket_name == TicketType.MONGODB_BACKUP: - flow = MongoBackupFlow(root_id=self.root_id, data=self.ticket_data) - elif ticket_name == TicketType.MONGODB_INSTALL_DBMON: - flow = MongoInstallDBMon(root_id=self.root_id, data=self.ticket_data) - else: - raise Exception("Unknown ticket name: %s" % ticket_name) - - flow.start() + MongoRemoveNsFlow(root_id=self.root_id, data=self.ticket_data).start() def fake_install(self): """ @@ -105,12 +109,6 @@ def instance_restart(self): flow = MongoRestartInstanceFlow(root_id=self.root_id, data=self.ticket_data) flow.multi_instance_restart_flow() - def mongo_remove_ns(self): - """ - 发起删除库表任务 - """ - MongoRemoveNsFlow(root_id=self.root_id, data=self.ticket_data).start() - def machine_replace(self): """ 整机替换 diff --git a/dbm-ui/backend/flow/plugins/components/collections/mongodb/mongo_download_backup_files.py b/dbm-ui/backend/flow/plugins/components/collections/mongodb/mongo_download_backup_files.py index d13d7a3a7a..c47d346c1f 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/mongodb/mongo_download_backup_files.py +++ b/dbm-ui/backend/flow/plugins/components/collections/mongodb/mongo_download_backup_files.py @@ -57,7 +57,7 @@ def _execute(self, data, parent_data) -> bool: self.log_debug(params) response = RedisBackupApi.download(params=params) # 这里使用RedisBackupApi,这是通用的. - self.log_warning("RedisBackupApi.download response: {}".format(response)) + self.log_warning("RedisBackupApi.download resp: {}".format(response)) backup_bill_id = response.get("bill_id", -1) if backup_bill_id > 0: diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py index 7863b84233..cec16c0499 100644 --- a/dbm-ui/backend/flow/urls.py +++ b/dbm-ui/backend/flow/urls.py @@ -92,6 +92,10 @@ MongoDBReplaceView, MongoDBScaleView, MongoFakeInstallApiView, + MongoInstallDbmonApiView, + MongoPitrRestoreApiView, + MongoRemoveNsApiView, + MongoRestoreApiView, MultiReplicasetInstallApiView, ) from backend.flow.views.mysql_add_slave import AddMysqlSlaveSceneApiView @@ -312,6 +316,10 @@ url(r"^scene/multi_replicaset_create$", MultiReplicasetInstallApiView.as_view()), url(r"^scene/cluster_create$", ClusterInstallApiView.as_view()), url(r"^scene/mongo_backup$", MongoBackupApiView.as_view()), + url(r"^scene/mongo_restore$", MongoRestoreApiView.as_view()), + url(r"^scene/mongo_pitr_restore$", MongoPitrRestoreApiView.as_view()), + url(r"^scene/mongo_remove_ns$", MongoRemoveNsApiView.as_view()), + url(r"^scene/mongo_install_dbmon$", MongoInstallDbmonApiView.as_view()), url(r"^scene/install_rs_fake$", MongoFakeInstallApiView.as_view()), url(r"^scene/multi_cluster_create_user$", MongoDBCreateUserView.as_view()), url(r"^scene/multi_cluster_delete_user$", MongoDBDeleteUserView.as_view()), diff --git a/dbm-ui/backend/flow/utils/mongodb/mongodb_dataclass.py b/dbm-ui/backend/flow/utils/mongodb/mongodb_dataclass.py index dc23784c9a..76bb0c0a2e 100644 --- a/dbm-ui/backend/flow/utils/mongodb/mongodb_dataclass.py +++ b/dbm-ui/backend/flow/utils/mongodb/mongodb_dataclass.py @@ -753,7 +753,7 @@ def get_cluster_info_user(self, cluster_id: int, admin_user: str): """创建/删除用户获取cluster信息""" # 获取集群信息 - cluster_info = MongoRepository().fetch_one_cluster(set_get_domain=False, id=cluster_id) + cluster_info = MongoRepository().fetch_one_cluster(withDomain=False, id=cluster_id) bk_cloud_id = cluster_info.bk_cloud_id self.cluster_type = cluster_info.cluster_type exec_ip: str = None @@ -880,7 +880,7 @@ def get_hosts_deinstall(self): hosts = set() bk_cloud_id: int = None for cluster_id in self.payload["cluster_ids"]: - cluster_info = MongoRepository().fetch_one_cluster(set_get_domain=False, id=cluster_id) + cluster_info = MongoRepository().fetch_one_cluster(withDomain=False, id=cluster_id) if cluster_info.cluster_type == ClusterType.MongoReplicaSet.value: shard = cluster_info.get_shards()[0] bk_cloud_id = shard.members[0].bk_cloud_id @@ -906,7 +906,7 @@ def get_hosts_deinstall(self): def get_cluster_info_deinstall(self, cluster_id: int): """卸载流程获取cluster信息""" - cluster_info = MongoRepository().fetch_one_cluster(set_get_domain=True, id=cluster_id) + cluster_info = MongoRepository().fetch_one_cluster(withDomain=True, id=cluster_id) self.payload["cluster_type"] = cluster_info.cluster_type self.payload["set_id"] = cluster_info.name self.payload["cluster_name"] = cluster_info.name @@ -1207,7 +1207,7 @@ def get_mongos_host_replace(self): def get_config_set_name_replace(cluster_id) -> str: """获取分片集群的configDB的set_id""" - return MongoRepository().fetch_one_cluster(set_get_domain=False, id=cluster_id).get_config().set_name + return MongoRepository().fetch_one_cluster(withDomain=False, id=cluster_id).get_config().set_name def calc_param_replace(self, info: dict, instance_num: int): """ "计算参数""" @@ -1786,7 +1786,7 @@ def get_hosts_enable_disable(self): hosts = set() bk_cloud_id: int = None for cluster_id in self.payload["cluster_ids"]: - cluster_info = MongoRepository().fetch_one_cluster(set_get_domain=False, id=cluster_id) + cluster_info = MongoRepository().fetch_one_cluster(withDomain=False, id=cluster_id) if cluster_info.cluster_type == ClusterType.MongoReplicaSet.value: shard = cluster_info.get_shards()[0] bk_cloud_id = shard.members[0].bk_cloud_id @@ -1844,7 +1844,7 @@ def get(self, k): def payload_func_install_dbmon(db_act_template: dict, ctx: CommonContext) -> dict: - """安装dbmon""" + """payload_func_install_dbmon Referenced in install_dbmon_sub.py""" instances_by_ip = ctx.get("instances_by_ip") exec_ip = db_act_template["exec_ip"] nodes = instances_by_ip[exec_ip] diff --git a/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py b/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py index 32b6e9c2d8..46846aba88 100644 --- a/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py +++ b/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py @@ -7,8 +7,11 @@ from backend.db_meta.enums import machine_type from backend.db_meta.enums.cluster_type import ClusterType from backend.db_meta.enums.instance_role import InstanceRole -from backend.db_meta.models import Cluster, Machine, ProxyInstance, StorageInstance +from backend.db_meta.models import AppCache, Cluster, Machine, ProxyInstance, StorageInstance +from backend.flow.consts import MongoDBClusterRole from backend.flow.utils.mongodb import mongodb_password +from backend.ticket.constants import InstanceType + # entities # Node -> ReplicaSet -> Cluster[Rs,ShardedCluster] @@ -17,37 +20,38 @@ class MongoNode: - def __init__(self, ip, port, role, bk_cloud_id, mtype): + def __init__(self, ip, port, role, bk_cloud_id, mtype, domain=None): self.ip: str = ip self.port: str = port self.role: str = role self.bk_cloud_id: int = bk_cloud_id self.machine_type = mtype - self.domain: str = None # 这是关联bind_entry.first().entry + self.domain: str = domain # 这是关联bind_entry.first().entry # s is StorageInstance | ProxyInstance @classmethod - def from_instance(cls, s: Union[ProxyInstance, StorageInstance]): - return MongoNode(s.ip_port.split(":")[0], str(s.port), s.instance_role, s.machine.bk_cloud_id, s.machine_type) - - # from_proxy_instance 能获得domain - @classmethod - def from_proxy_instance(cls, s: ProxyInstance): - m = cls.from_instance(s) - m.domain = s.bind_entry.first().entry - return m + def from_instance(cls, s: Union[ProxyInstance, StorageInstance], withDomain: bool = False): + # withDomain: 默认为False. 取域名需要多查一次db. + # meta_role: ProxyInstance的instance_role属性值为"proxy". 这里改为 mongos + meta_role = MongoDBClusterRole.Mongos.value if s.instance_role == InstanceType.PROXY.value else s.instance_role + domain = None + if withDomain: + domain = s.bind_entry.first().entry + node = MongoNode( + s.ip_port.split(":")[0], str(s.port), meta_role, s.machine.bk_cloud_id, s.machine_type, domain + ) + return node class ReplicaSet: set_name: str - set_type: str + set_type: str # replicaset or shardsvr or configsvr members: List[MongoNode] - def __init__(self, set_name: str = None, members: List[MongoNode] = None): + def __init__(self, set_type: str, set_name: str = None, members: List[MongoNode] = None): + self.set_type = set_type self.set_name = set_name self.members = members - if len(self.members) > 0: - self.set_type = self.members[0].role # get_backup_node 返回MONGO_BACKUP member def get_backup_node(self): @@ -63,7 +67,7 @@ def get_backup_node(self): def get_not_backup_nodes(self): members = [] for m in self.members: - if m.role == InstanceRole.MONGO_BACKUP: + if m.role != InstanceRole.MONGO_BACKUP: members.append(m) return members @@ -74,7 +78,7 @@ def get_bk_cloud_id(self): return None -# MongoDBCluster 有cluster_id cluster_name cluster_type +# MongoDBCluster [interface] 有cluster_id cluster_name cluster_type class MongoDBCluster: bk_cloud_id: int bk_biz_id: int @@ -111,20 +115,36 @@ def __init__( self.region = region @abstractmethod - def get_shards(self): - pass + def get_shards(self) -> List[ReplicaSet]: + raise NotImplementedError @abstractmethod def get_mongos(self) -> List[MongoNode]: - pass + raise NotImplementedError @abstractmethod def get_config(self) -> ReplicaSet: - pass + raise NotImplementedError def get_bk_cloud_id(self) -> int: return self.bk_cloud_id + def is_sharded_cluster(self) -> bool: + return self.cluster_type == str(ClusterType.MongoShardedCluster.value) + + def get_iplist(self) -> List: + iplist = [] + for shard in self.get_shards(): + for member in shard.members: + iplist.append(member.ip) + config_rs = self.get_config() + if config_rs is not None: + for member in config_rs.members: + iplist.append(member.ip) + for mongos in self.get_mongos(): + iplist.append(mongos.ip) + return iplist + class ReplicaSetCluster(MongoDBCluster): shard: ReplicaSet # storages @@ -154,7 +174,8 @@ def __init__( ) self.shard = shard - def get_shards(self): + def get_shards(self, with_config: bool = False) -> List[ReplicaSet]: + # no config return [self.shard] def get_mongos(self) -> List[MongoNode]: @@ -200,8 +221,13 @@ def __init__( self.mongos = mongos self.config = configsvr - def get_shards(self) -> List[ReplicaSet]: - return self.shards + def get_shards(self, with_config: bool = False) -> List[ReplicaSet]: + if not with_config: + return self.shards + + shards = [self.config] + shards.extend(self.shards) + return shards def get_config(self) -> ReplicaSet: return self.config @@ -217,17 +243,18 @@ def __init__(self): pass @classmethod - def fetch_many_cluster(cls, set_get_domain: bool, **kwargs): - # set_get_domain 是否获取复制集的域名 + def fetch_many_cluster(cls, withDomain: bool, **kwargs): + # withDomain 是否: 获取复制集和mongos的域名,赋值在MongoNode的domain属性上 rows: List[MongoDBCluster] = [] v = Cluster.objects.filter(**kwargs) for i in v: if i.cluster_type == ClusterType.MongoReplicaSet.value: # MongoReplicaSet 只有一个Set - if set_get_domain: - shard = ReplicaSet(i.name, [MongoNode.from_proxy_instance(m) for m in i.storageinstance_set.all()]) - else: - shard = ReplicaSet(i.name, [MongoNode.from_instance(m) for m in i.storageinstance_set.all()]) + shard = ReplicaSet( + MongoDBClusterRole.Replicaset.value, + i.name, + [MongoNode.from_instance(m, withDomain) for m in i.storageinstance_set.all()], + ) row = ReplicaSetCluster( bk_cloud_id=i.bk_cloud_id, @@ -245,18 +272,23 @@ def fetch_many_cluster(cls, set_get_domain: bool, **kwargs): elif i.cluster_type == ClusterType.MongoShardedCluster.value: shards = [] configsvr = None - mongos = [MongoNode.from_proxy_instance(m) for m in i.proxyinstance_set.all()] + mongos = [MongoNode.from_instance(m, withDomain=withDomain) for m in i.proxyinstance_set.all()] for m in i.nosqlstoragesetdtl_set.all(): - # seg_range + # find first member members = [MongoNode.from_instance(m.instance)] + # find all receiver member for e in m.instance.as_ejector.all(): members.append(MongoNode.from_instance(e.receiver)) - shard = ReplicaSet(set_name=m.seg_range, members=members) + # configsvr if m.instance.machine_type == machine_type.MachineType.MONOG_CONFIG.value: + shard = ReplicaSet(MongoDBClusterRole.ConfigSvr.value, set_name=m.seg_range, members=members) configsvr = shard + + # shardsvr else: + shard = ReplicaSet(MongoDBClusterRole.ShardSvr.value, set_name=m.seg_range, members=members) shards.append(shard) row = ShardedCluster( @@ -278,15 +310,15 @@ def fetch_many_cluster(cls, set_get_domain: bool, **kwargs): return rows @classmethod - def fetch_one_cluster(cls, set_get_domain: bool, **kwargs): - rows = cls.fetch_many_cluster(set_get_domain, **kwargs) + def fetch_one_cluster(cls, withDomain: bool, **kwargs): + rows = cls.fetch_many_cluster(withDomain, **kwargs) if len(rows) > 0: return rows[0] return None @classmethod - def fetch_many_cluster_dict(cls, set_get_domain: bool, **kwargs): - clusters = cls.fetch_many_cluster(set_get_domain, **kwargs) + def fetch_many_cluster_dict(cls, withDomain: bool = False, **kwargs): + clusters = cls.fetch_many_cluster(withDomain, **kwargs) clusters_map = {} for cluster in clusters: clusters_map[cluster.cluster_id] = cluster @@ -315,6 +347,35 @@ def get_cluster_id_by_host(hosts: List, bk_cloud_id: int) -> List[int]: return list(set(cluster_list)) + @classmethod + def get_cluster_id_by_domain(cls, cluster_domain: [str]) -> List[int]: + cluster_domain = request_validator.validated_str_list(cluster_domain) + cluster_list = [] + rows = Cluster.objects.filter(immute_domain__in=cluster_domain) + for row in rows: + cluster_list.append(row.id) + return cluster_list + + @classmethod + def get_host_from_nodes(cls, nodes: List[MongoNode]) -> List: + """ + get_host_from_nodes 提取bk_host,且去重 + @param nodes: MongoNode列表 + """ + bk_host_list = [] + cloud_id = nodes[0].bk_cloud_id + ips = [] + for v in nodes: + ips.append(v.ip) + if v.bk_cloud_id != cloud_id: + raise Exception("cannot exist two cloud_id") + + ips = list(set(ips)) + for ip in ips: + bk_host_list.append({"ip": ip, "bk_cloud_id": cloud_id}) + + return bk_host_list + class MongoDBNsFilter(object): class Serializer(serializers.Serializer): @@ -441,17 +502,13 @@ def __json__without_password__(self): "set_name": self.set_name, } - def append_set_info(self, rs: ReplicaSet): - self.set_name = rs.set_name - self.role_type = rs.set_type - def append_cluster_info(self, clu: MongoDBCluster): self.cluster_id = clu.cluster_id self.cluster_name = clu.name self.cluster_type = clu.cluster_type self.cluster_domain = clu.immute_domain - self.app = clu.app - self.app_name = clu.app + self.app = AppCache.get_app_attr(clu.bk_biz_id, "db_app_abbr") + self.app_name = AppCache.get_biz_name(clu.bk_biz_id) self.bk_cloud_id = clu.bk_cloud_id self.bk_biz_id = clu.bk_biz_id @@ -462,12 +519,18 @@ def from_node(cls, node: MongoNode, rs: ReplicaSet = None, clu: MongoDBCluster = m.ip = node.ip m.port = int(node.port) m.meta_role = node.role - # if mongos, set set_name && role_type to 'mongos' for compatibility if m.meta_role == machine_type.MachineType.MONGOS.value: - m.set_name = machine_type.MachineType.MONGOS.value - m.role_type = machine_type.MachineType.MONGOS.value - elif rs is not None: - m.append_set_info(rs) + # if mongos, set set_name && role_type to 'mongos' for compatibility + m.set_name = m.meta_role + m.role_type = m.meta_role + else: + # not mongos + if rs is not None: + m.set_name = rs.set_name + m.role_type = rs.set_type + else: + m.set_name = "" + m.role_type = "" if clu is not None: m.append_cluster_info(clu) @@ -489,16 +552,25 @@ def from_hosts(iplist: List, bk_cloud_id: int) -> List: if not cluster_id_list: return instance_list - clusters = MongoRepository.fetch_many_cluster_dict(set_get_domain=False, id__in=cluster_id_list) - for cluster_id in clusters: - cluster = clusters[cluster_id] + clusters = MongoRepository.fetch_many_cluster_dict(withDomain=False, id__in=cluster_id_list) + for cluster in clusters.values(): + for member in cluster.get_mongos(): + if member.ip in iplist: + instance_list.append(MongoNodeWithLabel.from_node(member, None, cluster)) + for rs in cluster.get_shards(): + if not rs: + continue for member in rs.members: if member.ip in iplist: instance_list.append(MongoNodeWithLabel.from_node(member, rs, cluster)) - for m in cluster.get_mongos(): - if m.ip in iplist: - instance_list.append(MongoNodeWithLabel.from_node(m, None, cluster)) + + rs = cluster.get_config() + if not rs: + continue + for member in rs.members: + if member.ip in iplist: + instance_list.append(MongoNodeWithLabel.from_node(member, rs, cluster)) return instance_list diff --git a/dbm-ui/backend/flow/utils/mongodb/mongodb_script_template.py b/dbm-ui/backend/flow/utils/mongodb/mongodb_script_template.py index 394858a726..d564e7c2a6 100644 --- a/dbm-ui/backend/flow/utils/mongodb/mongodb_script_template.py +++ b/dbm-ui/backend/flow/utils/mongodb/mongodb_script_template.py @@ -143,12 +143,15 @@ def make_script_common_kwargs(timeout=3600, exec_account="root", is_param_sensit } -def prepare_recover_dir_script() -> str: +def prepare_recover_dir_script(dest_dir: str) -> str: + if not dest_dir.startswith("/data/dbbak"): + raise Exception("dest_dir must start with /data/dbbak") + script = """ # todo add root id and node id set -x -mkdir -p /data/dbbak/recover_mg +mkdir -p {} echo return code $? -chown -R {} /data/dbbak/recover_mg +chown -R {} {} echo return code $?""" - return script.format("mysql") + return script.format(dest_dir, "mysql", dest_dir) diff --git a/dbm-ui/backend/flow/utils/mongodb/mongodb_util.py b/dbm-ui/backend/flow/utils/mongodb/mongodb_util.py new file mode 100644 index 0000000000..a1a50b2955 --- /dev/null +++ b/dbm-ui/backend/flow/utils/mongodb/mongodb_util.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# 导入模块 +from backend.components import DBConfigApi +from backend.components.dbconfig.constants import FormatType, LevelName +from backend.flow.consts import ConfigFileEnum, ConfigTypeEnum, MongoDBManagerUser, NameSpaceEnum +from backend.flow.utils.mongodb import mongodb_password + + +# MongoUtil: MongoDB工具类 用于获取MongoDB的配置信息 以及用户密码等 +class MongoUtil: + @staticmethod + def _get_define_config(bk_biz_id, namespace, conf_file, conf_type: str): + """获取一些全局的参数配置""" + """ bk_biz_id 为"0"时,表示平台级别配置""" + data = DBConfigApi.query_conf_item( + params={ + "bk_biz_id": bk_biz_id, + "level_name": LevelName.PLAT if bk_biz_id == "0" else LevelName.APP, + "level_value": bk_biz_id, + "conf_file": conf_file, + "conf_type": conf_type, + "namespace": namespace, + "format": FormatType.MAP.value, + } + ) + return data["content"] + + def get_mongodb_os_conf(self, bk_biz_id: str = "0"): + """ + 获取os配置信息 + """ + + return self._get_define_config( + bk_biz_id=bk_biz_id, + namespace=NameSpaceEnum.MongoDBCommon.value, + conf_type=ConfigTypeEnum.Config.value, + conf_file=ConfigFileEnum.OsConf.value, + ) + + @staticmethod + def get_dba_user_password(ip: str, port, bk_cloud_id: int): + """ + 获取dba user and password + """ + dba_user = MongoDBManagerUser.DbaUser.value + out = mongodb_password.MongoDBPassword().get_password_from_db(ip, int(port), bk_cloud_id, dba_user) + if not out or "password" not in out: + raise Exception("can not get dba_user password for {}:{}:{}".format(ip, port, bk_cloud_id)) + + return dba_user, out["password"] diff --git a/dbm-ui/backend/flow/views/mongodb_scene.py b/dbm-ui/backend/flow/views/mongodb_scene.py index 3dacd9c34a..63de125bb6 100644 --- a/dbm-ui/backend/flow/views/mongodb_scene.py +++ b/dbm-ui/backend/flow/views/mongodb_scene.py @@ -50,13 +50,52 @@ def post(request): mongo_backup """ root_id = uuid.uuid1().hex - # root_id 32位字串 320ba2d87a1411eebbed525400066689 - # request 是一个request对象 - # request.data 输入Json MongoDBController(root_id=root_id, ticket_data=request.data).mongo_backup() return Response({"root_id": root_id}) +class MongoRestoreApiView(FlowTestView): + """ + Mongo Restore Api + """ + + @staticmethod + def post(request): + """ + mongo_restore + """ + root_id = uuid.uuid1().hex + MongoDBController(root_id=root_id, ticket_data=request.data).mongo_restore() + return Response({"root_id": root_id}) + + +class MongoPitrRestoreApiView(FlowTestView): + """ + Mongo PitrRestore Api + """ + + @staticmethod + def post(request): + """ + mongo_restore + """ + root_id = uuid.uuid1().hex + MongoDBController(root_id=root_id, ticket_data=request.data).mongo_pitr_restore() + return Response({"root_id": root_id}) + + +class MongoInstallDbmonApiView(FlowTestView): + """ + Mongo InstallDbmon Api + """ + + @staticmethod + def post(request): + root_id = uuid.uuid1().hex + MongoDBController(root_id=root_id, ticket_data=request.data).install_dbmon() + return Response({"root_id": root_id}) + + class MongoFakeInstallApiView(FlowTestView): """ Mongo Backup Api @@ -68,9 +107,6 @@ def post(request): mongo_backup """ root_id = uuid.uuid1().hex - # root_id 32位字串 320ba2d87a1411eebbed525400066689 - # request 是一个request对象 - # request.data 输入Json MongoDBController(root_id=root_id, ticket_data=request.data).fake_install() return Response({"root_id": root_id}) diff --git a/dbm-ui/backend/ticket/constants.py b/dbm-ui/backend/ticket/constants.py index 1684f05f51..4003fdcf19 100644 --- a/dbm-ui/backend/ticket/constants.py +++ b/dbm-ui/backend/ticket/constants.py @@ -410,6 +410,7 @@ def get_db_type_by_ticket(cls, ticket_type, raise_exception=False): MONGODB_CUTOFF = TicketEnumField("MONGODB_CUTOFF", _("MongoDB 整机替换"), _("集群维护")) MONGODB_AUTHORIZE_RULES = TicketEnumField("MONGODB_AUTHORIZE_RULES", _("MongoDB 授权"), _("权限管理")) MONGODB_EXCEL_AUTHORIZE_RULES = TicketEnumField("MONGODB_EXCEL_AUTHORIZE_RULES", _("MongoDB Excel授权"), _("权限管理")) # noqa + MONGODB_IMPORT = TicketEnumField("MONGODB_IMPORT", _("MongoDB 数据导入"), _("集群维护")) MONGODB_RESTORE = TicketEnumField("MONGODB_RESTORE", _("MongoDB 定点回档"), _("集群维护")) MONGODB_TEMPORARY_DESTROY = TicketEnumField("MONGODB_TEMPORARY_DESTROY", _("MongoDB 临时集群销毁"), _("集群维护")) MONGODB_INSTALL_DBMON = TicketEnumField("MONGODB_INSTALL_DBMON", _("MongoDB 安装DBMon"), _("集群维护")) diff --git a/dbm-ui/backend/utils/time.py b/dbm-ui/backend/utils/time.py index 554d58dcf2..90f809bb1d 100644 --- a/dbm-ui/backend/utils/time.py +++ b/dbm-ui/backend/utils/time.py @@ -166,7 +166,7 @@ def find_nearby_time( # 越界的情况抛出错误,交给业务逻辑处理 index = bisect_right(time_keys, match_time) - flag if index < 0 or index >= len(time_keys): - raise IndexError(_("无法找到合适的附近时间点")) + raise IndexError(_("无法找到合适的附近时间点 {}".format(match_time))) return index