Skip to content

Commit

Permalink
fix(mongodb): 修复备份单据bug #6773
Browse files Browse the repository at this point in the history
  • Loading branch information
cycker committed Sep 11, 2024
1 parent 7fa7130 commit 2761920
Show file tree
Hide file tree
Showing 32 changed files with 1,206 additions and 233 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ pre-*-bkcodeai

bkcodeai.json
package-lock.json

### PreCI ###
.codecc
1 change: 0 additions & 1 deletion dbm-ui/backend/db_meta/enums/cluster_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class ClusterType(str, StructuredEnum):
MongoShardedCluster = EnumField("MongoShardedCluster", _("Mongo分片集群"))

Riak = EnumField("riak", _("Riak集群"))

SqlserverSingle = EnumField("sqlserver_single", _("sqlserver单节点版"))
SqlserverHA = EnumField("sqlserver_ha", _("sqlserver主从版"))

Expand Down
8 changes: 7 additions & 1 deletion dbm-ui/backend/db_meta/enums/machine_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ class MachineType(str, StructuredEnum):
VM_INSERT = EnumField("vminsert", _("vminsert"))
VM_SELECT = EnumField("vmselect", _("vmselect"))
VM_AUTH = EnumField("vmauth", _("vmauth"))

# 仅用于TBinlogDumper实例的管控
TBinlogDumper = EnumField("tbinlogdumper", _("TBinlogDumper"))


class MongoSetType(str, StructuredEnum):
Mongos = EnumField("mongos", _("mongos"))
ShardSvr = EnumField("shardsvr", _("shardsvr"))
Configsvr = EnumField("configsvr", _("configsvr"))
Replicaset = EnumField("replicaset", _("replicaset"))
34 changes: 22 additions & 12 deletions dbm-ui/backend/db_services/mongodb/restore/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List
Expand All @@ -23,7 +24,9 @@
from backend.exceptions import AppBaseException
from backend.ticket.constants import TicketType
from backend.ticket.models import ClusterOperateRecord, Ticket
from backend.utils.time import find_nearby_time, timezone2timestamp
from backend.utils.time import find_nearby_time

logger = logging.getLogger("root")


class MongoDBRestoreHandler(object):
Expand All @@ -38,7 +41,8 @@ def _get_log_from_bklog(collector: str, start_time: datetime, end_time: datetime

def _query_latest_log_and_index(self, rollback_time: datetime, query_string: str, time_key: str, flag: int):
"""查询距离rollback_time最近的备份记录"""
end_time = rollback_time
""" end_time 要获得rollback_time后的一个incr文件,这里多查一天,就比较稳了"""
end_time = rollback_time + timedelta(days=1)
start_time = end_time - timedelta(days=BACKUP_LOG_RANGE_DAYS)

backup_logs = self._get_log_from_bklog(
Expand All @@ -48,35 +52,41 @@ def _query_latest_log_and_index(self, rollback_time: datetime, query_string: str
query_string=query_string,
)
if not backup_logs:
raise AppBaseException(_("距离回档时间点7天内没有备份日志").format(rollback_time))
raise AppBaseException(_("距离回档时间点7天内没有备份日志 {} {}").format(query_string, rollback_time))

# 获取距离回档时间最近的全备日志
backup_logs.sort(key=lambda x: x[time_key])
time_keys = [log[time_key] for log in backup_logs]
try:
latest_backup_log_index = find_nearby_time(time_keys, timezone2timestamp(rollback_time), flag)
latest_backup_log_index = find_nearby_time(time_keys, rollback_time, flag)
except IndexError:
raise AppBaseException(_("无法找到时间点{}附近的全备日志记录").format(rollback_time))
raise AppBaseException(_("无法找到时间点{}附近的全备日志记录 query_string:{} ").format(rollback_time, query_string))

return backup_logs, latest_backup_log_index

def query_latest_backup_log(self, rollback_time: datetime) -> Dict[str, Any]:
def query_latest_backup_log(self, rollback_time: datetime, set_name: str = None) -> Dict[str, Any]:
"""
查询距离rollback_time最近的全备-增量备份文件
@param rollback_time: 回档时间
@param rollback_time: 回档时query_ticket_backup_log间
@param set_name: 指定SetName. cluster_type为ReplicaSet时,只有一个set_name, 可以为空.
"""
# 获取距离回档时间最近的全备日志
query_string = f"cluster: {self.cluster.id} AND pitr_file_type: {PitrFillType.FULL}"
query_string = f"cluster_id: {self.cluster.id} AND pitr_file_type: {PitrFillType.FULL}"
if set_name is not None:
query_string += f" AND set_name: {set_name}"
full_backup_logs, full_latest_index = self._query_latest_log_and_index(
rollback_time, query_string, time_key="pitr_last_pos", flag=1
)
latest_full_backup_log = full_backup_logs[full_latest_index]

logger.info("latest_full_backup_log {}".format(latest_full_backup_log))
# 找到与全备日志pitr_fullname相同的增量备份日志
pitr_fullname = latest_full_backup_log["pitr_fullname"]
query_string = (
f"cluster: {self.cluster.id} AND pitr_file_type: {PitrFillType.INCR} AND pitr_fullname: {pitr_fullname}"
f"cluster_id: {self.cluster.id} AND pitr_file_type: {PitrFillType.INCR} AND pitr_fullname: {pitr_fullname}"
)
if set_name is not None:
query_string += f" AND set_name: {set_name}"

incr_backup_logs, incr_latest_index = self._query_latest_log_and_index(
rollback_time, query_string, time_key="pitr_last_pos", flag=0
)
Expand Down Expand Up @@ -116,7 +126,7 @@ def _query_shard_ticket_backup_log(cls, cluster_id, start_time, end_time):
collector="mongo_backup_result",
start_time=start_time,
end_time=end_time,
query_string=f"cluster_id: {cluster_id} AND releate_bill_id: /[0-9]*/",
query_string=f"cluster_id: {cluster_id} AND related_bill_id: /[0-9]*/",
)
if not backup_logs:
raise AppBaseException(_("{}-{}内没有通过单据备份的日志").format(start_time, end_time))
Expand All @@ -132,7 +142,7 @@ def _query_replicaset_ticket_backup_log(cls, cluster_ids, start_time, end_time):
collector="mongo_backup_result",
start_time=start_time,
end_time=end_time,
query_string=f"cluster_type: {ClusterType.MongoReplicaSet} AND releate_bill_id: /[0-9]*/",
query_string=f"cluster_type: {ClusterType.MongoReplicaSet} AND related_bill_id: /[0-9]*/",
)
if not backup_logs:
raise AppBaseException(_("{}-{}内没有通过单据备份的日志").format(start_time, end_time))
Expand Down
11 changes: 5 additions & 6 deletions dbm-ui/backend/env/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
CMDB_NO_MONITOR_STATUS = get_type_env(key="CMDB_NO_MONITOR_STATUS", _type=str, default="运营中[无告警]")
CMDB_NEED_MONITOR_STATUS = get_type_env(key="CMDB_NEED_MONITOR_STATUS", _type=str, default="运营中[需告警]")


# 蓝鲸全业务业务ID
JOB_BLUEKING_BIZ_ID = get_type_env(key="JOB_BLUEKING_BIZ_ID", _type=int, default=9991001)

Expand Down Expand Up @@ -87,10 +86,9 @@
# SaaS访问地址,用于用户访问/第三方应用跳转/Iframe/Grafana 等场景
BK_SAAS_HOST = get_type_env(key="BK_SAAS_HOST", _type=str, default="http://bk-dbm")
# BK_SAAS_CALLBACK_URL 用于 接口回调/权限中心访问 等场景
BK_SAAS_CALLBACK_URL = (
# 通常因证书问题,这里需要使用 http
get_type_env(key="BK_SAAS_CALLBACK_URL", _type=str, default="")
or BK_SAAS_HOST.replace("https", "http")
# 通常因证书问题,这里需要使用 http
BK_SAAS_CALLBACK_URL = get_type_env(key="BK_SAAS_CALLBACK_URL", _type=str, default="") or BK_SAAS_HOST.replace(
"https", "http"
)

# 其他系统访问地址
Expand Down Expand Up @@ -125,7 +123,6 @@
ENABLE_CLEAN_EXPIRED_FLOW_INSTANCE = get_type_env(key="ENABLE_CLEAN_EXPIRED_FLOW_INSTANCE", _type=bool, default=False)
BAMBOO_TASK_VALIDITY_DAY = get_type_env(key="BAMBOO_TASK_VALIDITY_DAY", _type=int, default=360)


# 是否在部署 MySQL 的时候安装 PERL
YUM_INSTALL_PERL = get_type_env(key="YUM_INSTALL_PERL", _type=bool, default=False)

Expand Down Expand Up @@ -179,3 +176,5 @@

# window ssh服务远程端口
WINDOW_SSH_PORT = get_type_env(key="WINDOW_SSH_PORT", _type=int, default=22)
# 本地测试人员优先使用的版本
REPO_VERSION_FOR_DEV = get_type_env(key="REPO_VERSION_FOR_DEV", _type=str, default="")
6 changes: 5 additions & 1 deletion dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,13 @@ class MongoDBActuatorActionEnum(str, StructuredEnum):
Backup = EnumField("mongodb_backup", _("mongodb_backup"))
RemoveNs = EnumField("mongodb_remove_ns", _("mongodb_remove_ns"))
Restore = EnumField("mongodb_restore", _("mongodb_restore"))
PitRestore = EnumField("mongodb_pit_restore", _("mongodb_pit_restore"))
PitRestore = EnumField("mongodb_pitr_restore", _("mongodb_pitr_restore"))
MongoRestart = EnumField("mongo_restart", _("mongo_restart"))
MongoDReplace = EnumField("mongod_replace", _("mongod_replace"))
MongoDeInstall = EnumField("mongo_deinstall", _("mongo_deinstall"))
InstallDBMon = EnumField("install_dbmon", _("install_dbmon"))
MongoStart = EnumField("mongo_start", _("mongo_start"))
MongoHello = EnumField("mongodb_hello", _("mongodb_hello"))


class EsActuatorActionEnum(str, StructuredEnum):
Expand Down Expand Up @@ -737,6 +738,7 @@ class ConfigDefaultEnum(list, StructuredEnum):
class DirEnum(str, StructuredEnum):
GSE_DIR = EnumField("/usr/local/gse_bkte", _("gcs 安装路径"))
REDIS_KEY_LIFE_DIR = EnumField("/data/dbbak/keylifecycle", _("key生命周期路径"))
MONGO_RECOVER_DIR = EnumField("/data/dbbak/recover_mg", _("mongo恢复路径"))


class TruncateDataTypeEnum(str, StructuredEnum):
Expand Down Expand Up @@ -1166,6 +1168,8 @@ class MongoDBClusterRole(str, StructuredEnum):

ConfigSvr = EnumField("configsvr", _("configsvr"))
ShardSvr = EnumField("shardsvr", _("shardsvr"))
Mongos = EnumField("mongos", _("mongos"))
Replicaset = EnumField("replicaset", _("replicaset"))


class MongoDBTotalCache(float, StructuredEnum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ def __init__(self, db_type: str = DBType.MySQL):
"""
@param db_type: db类型,默认是MySQL,如果是Redis这actuator包不一样
"""
# repo_version 如果REPO_VERSION_FOR_DEV有值,则使用REPO_VERSION_FOR_DEV,否则使用最新版本
# 正式环境: REPO_VERSION_FOR_DEV为空 个人测试环境中,REPO_VERSION_FOR_DEV 按需配置
dev_env = str(env.REPO_VERSION_FOR_DEV)
repo_version = dev_env if dev_env != "" else MediumEnum.Latest

self.actuator_pkg = Package.get_latest_package(
version=MediumEnum.Latest, pkg_type=MediumEnum.DBActuator, db_type=db_type
version=repo_version, pkg_type=MediumEnum.DBActuator, db_type=db_type
)

def get_db_actuator_package(self):
Expand Down
6 changes: 6 additions & 0 deletions dbm-ui/backend/flow/engine/bamboo/scene/mongodb/base_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ def check_cluster_valid(cls, cluster: MongoDBCluster, payload):
cluster.bk_biz_id, payload["bk_biz_id"], type(cluster.bk_biz_id), type(payload["bk_biz_id"])
)
)

@staticmethod
def check_cluster_id_list(cluster_id_list):
cluster_id_list_set = set(cluster_id_list)
if len(cluster_id_list_set) != len(cluster_id_list):
raise Exception("duplicate cluster_id")
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from backend.flow.engine.bamboo.scene.mongodb.base_flow import MongoBaseFlow
from backend.flow.engine.bamboo.scene.mongodb.sub_task.backup import BackupSubTask
from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia
from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs
from backend.flow.utils.mongodb.mongodb_repo import MongoDBNsFilter, MongoRepository
from backend.flow.utils.mongodb.mongodb_util import MongoUtil

logger = logging.getLogger("flow")

Expand Down Expand Up @@ -67,12 +67,9 @@ def start(self):
"""
logger.debug("MongoBackupFlow start, payload", self.payload)
# actuator_workdir 提前创建好的,在部署的时候就创建好了.
actuator_workdir = ActKwargs().get_mongodb_os_conf()["file_path"]
actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"]
file_list = GetFileList(db_type=DBType.MongoDB).get_db_actuator_package()

# 创建流程实例
pipeline = Builder(root_id=self.root_id, data=self.payload)

# 解析输入 确定每个输入的域名实例都存在.
# 1. 解析每个集群Id的节点列表
# 2. 备份一般在某个Secondary且非Backup节点上执行
Expand All @@ -85,8 +82,12 @@ def start(self):
bk_host_list = []

cluster_id_list = [row["cluster_id"] for row in self.payload["infos"]]
self.check_cluster_id_list(cluster_id_list)
clusters = MongoRepository.fetch_many_cluster_dict(id__in=cluster_id_list)

# 创建流程实例
pipeline = Builder(root_id=self.root_id, data=self.payload)

for row in self.payload["infos"]:
try:
cluster_id = row["cluster_id"]
Expand All @@ -111,7 +112,7 @@ def start(self):
raise Exception("sub_bk_host_list is None")

bk_host_list.extend(sub_bk_host_list)
sub_pipelines.append(sub_pl.build_sub_process(_("MongoDB-备份-{}").format(cluster.name)))
sub_pipelines.append(sub_pl.build_sub_process(_("Backup:{}").format(cluster.immute_domain)))

# 介质下发 bk_host_list 在SendMedia.act会去重.
pipeline.add_act(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ def multi_replicaset_install_flow(self):
kwargs=kwargs,
)

# # 安装dbmon
# self.install_dbmon(data=self.data, pipeline=pipeline)
# 安装dbmon
self.install_dbmon(data=self.data, pipeline=pipeline)

# 运行流程
pipeline.run_pipeline()

Expand Down Expand Up @@ -224,7 +225,7 @@ def cluster_install_flow(self):
)

# 安装dbmon
# self.install_dbmon(data=self.data, pipeline=pipeline)
self.install_dbmon(data=self.data, pipeline=pipeline)

# 运行流程
pipeline.run_pipeline()
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@
)
from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia
from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs
from backend.flow.utils.mongodb.mongodb_repo import MongoNodeWithLabel
from backend.flow.utils.mongodb.mongodb_repo import MongoNodeWithLabel, MongoRepository
from backend.flow.utils.mongodb.mongodb_util import MongoUtil

logger = logging.getLogger("flow")


def get_pkg_info():
# repo_version 如果REPO_VERSION_FOR_DEV有值,则使用REPO_VERSION_FOR_DEV,否则使用最新版本
# 正式环境中,REPO_VERSION_FOR_DEV为空
# 个人测试环境中,REPO_VERSION_FOR_DEV 按需配置
repo_version = env.REPO_VERSION_FOR_DEV if env.REPO_VERSION_FOR_DEV else MediumEnum.Latest

actuator_pkg = Package.get_latest_package(
version=MediumEnum.Latest, pkg_type=MediumEnum.DBActuator, db_type=DBType.MongoDB
version=repo_version, pkg_type=MediumEnum.DBActuator, db_type=DBType.MongoDB
)

dbtools_pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type="dbtools", db_type=DBType.MongoDB)
toolkit_pkg = Package.get_latest_package(
version=MediumEnum.Latest, pkg_type="mongo-toolkit", db_type=DBType.MongoDB
Expand All @@ -54,7 +61,7 @@ def add_install_dbmon(flow, flow_data, pipeline, iplist, bk_cloud_id, allow_empt
allow_empty_instance 上架流程中,允许ip没有实例. allow_empty_instance = True
"""

actuator_workdir = flow.get_kwargs.file_path
actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"]
pkg_info = get_pkg_info()
file_list = [
"{}/{}/{}".format(env.BKREPO_PROJECT, env.BKREPO_BUCKET, pkg_info.get("actuator_pkg").path),
Expand Down Expand Up @@ -121,19 +128,15 @@ def add_install_dbmon(flow, flow_data, pipeline, iplist, bk_cloud_id, allow_empt
pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)


class MongoInstallDBMon(MongoBaseFlow):
class MongoInstallDBMonFlow(MongoBaseFlow):
class Serializer(serializers.Serializer):
class DataRow(serializers.Serializer):
ip = serializers.CharField()
object_type = serializers.CharField()

uid = serializers.CharField()
created_by = serializers.CharField()
bk_biz_id = serializers.IntegerField()
ticket_type = serializers.CharField()
action = serializers.CharField()
bk_cloud_id = serializers.IntegerField()
infos = DataRow(many=True)
infos = serializers.ListField(child=serializers.CharField()) # ip or cluster_id

"""MongoInstallDBMon flow
分析 payload,检查输入,生成Flow """
Expand Down Expand Up @@ -162,8 +165,38 @@ def start(self):
logger.debug("MongoInstallDBMon start, payload", self.payload)
# 创建流程实例
pipeline = Builder(root_id=self.root_id, data=self.payload)
add_install_dbmon(
self, self.payload, pipeline, [x["ip"] for x in self.payload["infos"]], self.payload["bk_cloud_id"]
)

# parse iplist
iplist = self.get_iplist(self.payload["infos"], bk_cloud_id=self.payload["bk_cloud_id"])

add_install_dbmon(self, self.payload, pipeline, iplist, self.payload["bk_cloud_id"])
# 运行流程
pipeline.run_pipeline()

@staticmethod
def get_iplist(infos: list, bk_cloud_id: int) -> list[str]:
iplist = []
cluster_id_list = []
cluster_domain_list = []
for v in infos:
if v.isdigit():
cluster_id_list.append(int(v))
elif v.endswith(".db"):
cluster_domain_list.append(v)
else:
iplist.append(v)

if cluster_domain_list:
tmp_cluster_id_list = MongoRepository.get_cluster_id_by_domain(cluster_domain_list)
cluster_id_list.extend(tmp_cluster_id_list)

if cluster_id_list:
cluster_id_list = list(set(cluster_id_list)) # unique
clusters = MongoRepository.fetch_many_cluster(withDomain=False, id__in=cluster_id_list)
for cluster in clusters:
if cluster.bk_cloud_id == bk_cloud_id:
iplist.extend(cluster.get_iplist())
else:
raise Exception("bk_cloud_id not match {} vs {}".format(bk_cloud_id, cluster.bk_cloud_id))

return list(set(iplist))
Loading

0 comments on commit 2761920

Please sign in to comment.