Skip to content

Commit

Permalink
feat(backend): 支持下架集群实时操作采集器 #8584
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzhw8 authored and iSecloud committed Dec 13, 2024
1 parent b5215f8 commit 6da4e5f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 13 deletions.
10 changes: 9 additions & 1 deletion dbm-ui/backend/db_meta/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

from django.db.models.signals import pre_delete

from backend.db_meta.enums import ClusterStatus
from backend.db_meta.enums import ClusterStatus, ClusterType
from backend.db_meta.models import Cluster, ProxyInstance, StorageInstance
from backend.flow.consts import OperateCollectorActionEnum
from backend.flow.utils.cc_manage import trigger_operate_collector

logger = logging.getLogger("root")

Expand All @@ -29,6 +31,12 @@ def update_cluster_status(sender, instance: Union[StorageInstance, ProxyInstance
if kwargs.get("signal") == pre_delete and not isinstance(instance, Cluster):
# 提前删除实例与cluster的关联关系
cluster = instance.cluster.first()
trigger_operate_collector(
ClusterType.cluster_type_to_db_type(cluster.cluster_type),
instance.machine_type,
bk_instance_ids=[instance.bk_instance_id],
action=OperateCollectorActionEnum.INSTALL.value,
)
if cluster and sender == StorageInstance:
cluster.storageinstance_set.remove(instance)
elif cluster and sender == ProxyInstance:
Expand Down
5 changes: 5 additions & 0 deletions dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ class StateType(str, StructuredEnum):
DEFAULT_INSTANCE = {"ip": "0.0.0.0", "port": 0, "bk_cloud_id": 0}


class OperateCollectorActionEnum(str, StructuredEnum):
INSTALL = EnumField("install", _("安装"))
UNINSTALL = EnumField("UNINSTALL", _("卸载"))


class NameSpaceEnum(str, StructuredEnum):
Common = EnumField("common", _("共用参数"))
RedisCommon = EnumField("rediscomm", _("redis共用参数"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ def _execute(self, data, parent_data):
bk_module_ids = kwargs["bk_module_ids"]
bk_host_ids = kwargs["bk_host_ids"]
update_host_properties = kwargs.get("update_host_properties", None)
operate_collector_action = kwargs.get("operate_collector_action", None)
try:
cc_manage = CcManage(bk_biz_id=bk_biz_id, cluster_type="")
cc_manage.transfer_host_module(
bk_host_ids=bk_host_ids, target_module_ids=bk_module_ids, update_host_properties=update_host_properties
bk_host_ids=bk_host_ids,
target_module_ids=bk_module_ids,
update_host_properties=update_host_properties,
operate_collector_action=operate_collector_action,
)
return True
except Exception as err: # pylint: disable=broad-except
Expand Down
54 changes: 43 additions & 11 deletions dbm-ui/backend/flow/utils/cc_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from backend.dbm_init.constants import CC_HOST_DBM_ATTR
from backend.dbm_init.services import Services
from backend.exceptions import ApiError
from backend.flow.consts import OperateCollectorActionEnum
from backend.utils.redis import RedisConn

logger = logging.getLogger("flow")
Expand Down Expand Up @@ -168,6 +169,23 @@ def batch_update_host(host_info_list: List[Dict[str, Any]], need_monitor: bool):

return updated_hosts, failed_updates

@staticmethod
def operate_host_collectors(bk_host_ids: List[int], action: str):
"""
操作主机采集器,前提是这些主机已经存在服务实例了
"""
# 查询主机的服务实例
bk_instance_ids = []
bk_instance_ids.extend(
StorageInstance.objects.filter(machine__bk_host_id__in=bk_host_ids).values("bk_instance_id", flat=True)
)
bk_instance_ids.extend(
ProxyInstance.objects.filter(machine__bk_host_id__in=bk_host_ids).values("bk_instance_id", flat=True)
)

# 操作采集器
trigger_operate_collector(bk_instance_ids=bk_instance_ids, action=action)

def update_host_properties(
self, bk_host_ids: List[int], need_monitor: bool = True, dbm_meta=None, update_operator: bool = True
):
Expand Down Expand Up @@ -221,6 +239,7 @@ def transfer_host_to_idlemodule(
self, bk_biz_id: int, bk_host_ids: List[int], biz_idle_module: int = None, host_topo: List[Dict] = None
):
"""将主机转移到当前业务的空闲模块"""
self.operate_host_collectors(bk_host_ids, OperateCollectorActionEnum.UNINSTALL.value)

# 获取业务的空闲模块和主机拓扑信息
biz_idle_module = biz_idle_module or self.get_biz_internal_module(bk_biz_id)[IDLE_HOST_MODULE]["bk_module_id"]
Expand Down Expand Up @@ -250,12 +269,14 @@ def transfer_host_module(
target_module_ids: list,
is_increment: bool = False,
update_host_properties: dict = None,
operate_collector_action: str = None,
):
"""
@param bk_host_ids 主机id列表
@param target_module_ids 目标模块id列表
@param is_increment 是否增量转移,即主机处于多模块
@param update_host_properties 主机属性更新选项
@param operate_collector_action 是否操作主机采集器
跨业务转移主机,需要先做中转处理
循环判断处理,逻辑保证幂等操作
考虑这几种情况:
Expand All @@ -269,6 +290,11 @@ def transfer_host_module(
# 有些角色允许为空,所以要忽略
return

if operate_collector_action is not None:
# 主机通常会在转移模块后再添加服务实例,添加服务实例后再操作采集器
# 这里转移主机模块,大概率是给转移空闲机,进行采集器卸载操作
self.operate_host_collectors(bk_host_ids, operate_collector_action)

bk_host_ids = list(set(bk_host_ids))
# 查询当前bk_hosts_ids的业务对应关系
logger.info(f"transfer_host_module, bk_host_ids:{bk_host_ids}")
Expand Down Expand Up @@ -331,6 +357,9 @@ def recycle_host(self, bk_host_ids: list):
转移主机后会自动删除服务实例,无需额外操作
"""
bk_host_ids = list(set(bk_host_ids))
self.operate_host_collectors(bk_host_ids, OperateCollectorActionEnum.UNINSTALL.value)

# 转移机器到待回收
CCApi.transfer_host_to_recyclemodule(
params={"bk_biz_id": self.hosting_biz_id, "bk_host_id": bk_host_ids}, use_admin=True
)
Expand Down Expand Up @@ -496,25 +525,23 @@ def delete_instance_modules(self, db_type: str, ins: StorageInstance, cluster_ty
self.delete_cc_module(db_type, cluster_type, instance_id=ins.id)


def format_operate_collector_cache_key(db_type, machine_type, suffix="") -> str:
def format_operate_collector_cache_key(db_type, machine_type, action, suffix) -> tuple:
"""
生成操作采集器缓存key
"""
cache_key = f"operate_collector_{db_type}_{machine_type}"
if suffix:
cache_key = f"{cache_key}_{suffix}"
return cache_key
cache_key = f"operate_collector_{db_type}_{machine_type}_{action}"
trigger_time_key = f"{cache_key}_{suffix}"
return cache_key, trigger_time_key


@app.task
def operate_collector(db_type: str, machine_type: str, bk_instance_ids: list, action="install"):
def operate_collector(db_type: str, machine_type: str, bk_instance_ids: list, action: str):
"""
操作采集器
调用监控 API,针对本次变更的范围进行下发
"""
cache_key = format_operate_collector_cache_key(db_type, machine_type)
trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, "trigger_time")
cache_key, trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, action, "trigger_time")
trigger_time = RedisConn.get(trigger_time_key)

# 通过触发时间,加上延迟窗口,来实现滚动窗口,避免串行调用节点管理
Expand Down Expand Up @@ -551,11 +578,17 @@ def operate_collector(db_type: str, machine_type: str, bk_instance_ids: list, ac


def trigger_operate_collector(
db_type: str = None, machine_type: str = None, bk_instance_ids: list = None, action="install"
db_type: str = None,
machine_type: str = None,
bk_instance_ids: list = None,
action=OperateCollectorActionEnum.INSTALL.value,
):
"""
触发操作采集器
"""
# 排除掉 bk_instance_ids 中包含 0 的值,可能是脏数据
bk_instance_ids = [bk_instance_id for bk_instance_id in bk_instance_ids if bk_instance_id != 0]

# 监控某些场景下,不传入 db_type 和 machine_type 的情况,例如 dbha 切换后,仅更新标签
if db_type is None and machine_type is None and bk_instance_ids is not None:
ins = (
Expand All @@ -567,8 +600,7 @@ def trigger_operate_collector(
machine_type = ins.machine_type
db_type = ClusterType.cluster_type_to_db_type(ins.cluster_type)

cache_key = format_operate_collector_cache_key(db_type, machine_type)
trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, "trigger_time")
cache_key, trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, action, "trigger_time")
trigger_time = RedisConn.get(trigger_time_key)

# 设置触发时间, 以 OPERATE_COLLECTOR_COUNTDOWN 作为一个滚动窗口来执行采集器操作
Expand Down

0 comments on commit 6da4e5f

Please sign in to comment.