diff --git a/dbm-ui/backend/db_meta/signals.py b/dbm-ui/backend/db_meta/signals.py
index 8fc3305933..e7fe4919d6 100644
--- a/dbm-ui/backend/db_meta/signals.py
+++ b/dbm-ui/backend/db_meta/signals.py
@@ -13,8 +13,10 @@ from django.db.models.signals import pre_delete
 
-from backend.db_meta.enums import ClusterStatus
+from backend.db_meta.enums import ClusterStatus, ClusterType
 from backend.db_meta.models import Cluster, ProxyInstance, StorageInstance
+from backend.flow.consts import OperateCollectorActionEnum
+from backend.flow.utils.cc_manage import trigger_operate_collector
 
 logger = logging.getLogger("root")
 
@@ -29,6 +31,12 @@ def update_cluster_status(sender, instance: Union[StorageInstance, ProxyInstance
     if kwargs.get("signal") == pre_delete and not isinstance(instance, Cluster):
         # Remove the instance's association with the cluster before it is deleted
         cluster = instance.cluster.first()
+        trigger_operate_collector(
+            ClusterType.cluster_type_to_db_type(cluster.cluster_type),
+            instance.machine_type,
+            bk_instance_ids=[instance.bk_instance_id],
+            action=OperateCollectorActionEnum.INSTALL.value,
+        )
         if cluster and sender == StorageInstance:
             cluster.storageinstance_set.remove(instance)
         elif cluster and sender == ProxyInstance:
diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py
index a5c984f802..d07b40b755 100644
--- a/dbm-ui/backend/flow/consts.py
+++ b/dbm-ui/backend/flow/consts.py
@@ -193,6 +193,11 @@ class StateType(str, StructuredEnum):
 DEFAULT_INSTANCE = {"ip": "0.0.0.0", "port": 0, "bk_cloud_id": 0}
 
 
+class OperateCollectorActionEnum(str, StructuredEnum):
+    INSTALL = EnumField("install", _("安装"))
+    UNINSTALL = EnumField("uninstall", _("卸载"))
+
+
 class NameSpaceEnum(str, StructuredEnum):
     Common = EnumField("common", _("共用参数"))
     RedisCommon = EnumField("rediscomm", _("redis共用参数"))
diff --git a/dbm-ui/backend/flow/plugins/components/collections/common/transfer_host_service.py b/dbm-ui/backend/flow/plugins/components/collections/common/transfer_host_service.py
index 345ff4f1bb..eaf4472531 100644
--- a/dbm-ui/backend/flow/plugins/components/collections/common/transfer_host_service.py
+++ b/dbm-ui/backend/flow/plugins/components/collections/common/transfer_host_service.py
@@ -29,10 +29,14 @@ def _execute(self, data, parent_data):
         bk_module_ids = kwargs["bk_module_ids"]
         bk_host_ids = kwargs["bk_host_ids"]
         update_host_properties = kwargs.get("update_host_properties", None)
+        operate_collector_action = kwargs.get("operate_collector_action", None)
         try:
             cc_manage = CcManage(bk_biz_id=bk_biz_id, cluster_type="")
             cc_manage.transfer_host_module(
-                bk_host_ids=bk_host_ids, target_module_ids=bk_module_ids, update_host_properties=update_host_properties
+                bk_host_ids=bk_host_ids,
+                target_module_ids=bk_module_ids,
+                update_host_properties=update_host_properties,
+                operate_collector_action=operate_collector_action,
             )
             return True
         except Exception as err:  # pylint: disable=broad-except
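With the new kwarg threaded through the transfer-host service, a flow that moves hosts back to an idle module can have their collectors uninstalled in the same act. A minimal sketch of the act kwargs read by _execute() above, using made-up business, module and host ids:

    from backend.flow.consts import OperateCollectorActionEnum

    # Illustrative only: the ids below are placeholders, not real data.
    transfer_act_kwargs = {
        "bk_biz_id": 100,
        "bk_module_ids": [2001],
        "bk_host_ids": [3001, 3002],
        "update_host_properties": None,
        # New in this change: uninstall the collectors on these hosts during the transfer.
        "operate_collector_action": OperateCollectorActionEnum.UNINSTALL.value,
    }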
diff --git a/dbm-ui/backend/flow/utils/cc_manage.py b/dbm-ui/backend/flow/utils/cc_manage.py
index 76c3bfb40f..ab4fe4df2a 100644
--- a/dbm-ui/backend/flow/utils/cc_manage.py
+++ b/dbm-ui/backend/flow/utils/cc_manage.py
@@ -31,6 +31,7 @@ from backend.dbm_init.constants import CC_HOST_DBM_ATTR
 from backend.dbm_init.services import Services
 from backend.exceptions import ApiError
+from backend.flow.consts import OperateCollectorActionEnum
 from backend.utils.redis import RedisConn
 
 logger = logging.getLogger("flow")
 
@@ -168,6 +169,23 @@ def batch_update_host(host_info_list: List[Dict[str, Any]], need_monitor: bool):
 
         return updated_hosts, failed_updates
 
+    @staticmethod
+    def operate_host_collectors(bk_host_ids: List[int], action: str):
+        """
+        Operate the collectors on the given hosts; this assumes the hosts already have service instances
+        """
+        # Look up the hosts' service instances
+        bk_instance_ids = []
+        bk_instance_ids.extend(
+            StorageInstance.objects.filter(machine__bk_host_id__in=bk_host_ids).values_list("bk_instance_id", flat=True)
+        )
+        bk_instance_ids.extend(
+            ProxyInstance.objects.filter(machine__bk_host_id__in=bk_host_ids).values_list("bk_instance_id", flat=True)
+        )
+
+        # Operate the collectors
+        trigger_operate_collector(bk_instance_ids=bk_instance_ids, action=action)
+
     def update_host_properties(
         self, bk_host_ids: List[int], need_monitor: bool = True, dbm_meta=None, update_operator: bool = True
     ):
@@ -221,6 +239,7 @@ def transfer_host_to_idlemodule(
         self, bk_biz_id: int, bk_host_ids: List[int], biz_idle_module: int = None, host_topo: List[Dict] = None
     ):
         """Transfer hosts to the current business's idle module"""
+        self.operate_host_collectors(bk_host_ids, OperateCollectorActionEnum.UNINSTALL.value)
         # Get the business's idle module and the host topology info
         biz_idle_module = biz_idle_module or self.get_biz_internal_module(bk_biz_id)[IDLE_HOST_MODULE]["bk_module_id"]
 
@@ -250,12 +269,14 @@ def transfer_host_module(
         target_module_ids: list,
         is_increment: bool = False,
         update_host_properties: dict = None,
+        operate_collector_action: str = None,
     ):
         """
         @param bk_host_ids list of host ids
         @param target_module_ids list of target module ids
         @param is_increment whether this is an incremental transfer, i.e. the host sits in multiple modules
         @param update_host_properties host property update options
+        @param operate_collector_action collector action to run on the hosts, None means skip
         Cross-business transfers need to go through an intermediate relay step first
         Processed in a loop with per-host checks; the logic guarantees idempotency
         The following cases are considered:
@@ -269,6 +290,11 @@
             # Some roles are allowed to be empty, so ignore them
             return
 
+        if operate_collector_action is not None:
+            # Hosts usually get their service instances added after the module transfer, and collectors are operated after that
+            # A module transfer here is most likely a move to the idle module, i.e. a collector uninstall
+            self.operate_host_collectors(bk_host_ids, operate_collector_action)
+
         bk_host_ids = list(set(bk_host_ids))
         # Look up the business mapping of the current bk_host_ids
         logger.info(f"transfer_host_module, bk_host_ids:{bk_host_ids}")
@@ -331,6 +357,9 @@ def recycle_host(self, bk_host_ids: list):
         Service instances are removed automatically after the host transfer, no extra handling is needed
         """
         bk_host_ids = list(set(bk_host_ids))
+        self.operate_host_collectors(bk_host_ids, OperateCollectorActionEnum.UNINSTALL.value)
+
+        # Transfer the hosts to the to-be-recycled module
         CCApi.transfer_host_to_recyclemodule(
             params={"bk_biz_id": self.hosting_biz_id, "bk_host_id": bk_host_ids}, use_admin=True
         )
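The remaining hunks below rework the module-level helpers: the collector action is folded into the dedup cache key, so install and uninstall batches for the same db_type/machine_type no longer share one rolling window. A rough illustration of the keys the reworked helper returns, with made-up db_type and machine_type values:

    # One call now yields both keys:
    cache_key, trigger_time_key = format_operate_collector_cache_key(
        "redis", "twemproxy", "uninstall", "trigger_time"
    )
    # cache_key         == "operate_collector_redis_twemproxy_uninstall"
    # trigger_time_key  == "operate_collector_redis_twemproxy_uninstall_trigger_time"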
@@ -496,25 +525,23 @@ def delete_instance_modules(self, db_type: str, ins: StorageInstance, cluster_ty
         self.delete_cc_module(db_type, cluster_type, instance_id=ins.id)
 
 
-def format_operate_collector_cache_key(db_type, machine_type, suffix="") -> str:
+def format_operate_collector_cache_key(db_type, machine_type, action, suffix) -> tuple:
     """
     Build the cache keys for a collector operation
     """
-    cache_key = f"operate_collector_{db_type}_{machine_type}"
-    if suffix:
-        cache_key = f"{cache_key}_{suffix}"
-    return cache_key
+    cache_key = f"operate_collector_{db_type}_{machine_type}_{action}"
+    trigger_time_key = f"{cache_key}_{suffix}"
+    return cache_key, trigger_time_key
 
 
 @app.task
-def operate_collector(db_type: str, machine_type: str, bk_instance_ids: list, action="install"):
+def operate_collector(db_type: str, machine_type: str, bk_instance_ids: list, action: str):
     """
     Operate the collectors
     Call the monitoring API and dispatch only for the scope touched by this change
     """
-    cache_key = format_operate_collector_cache_key(db_type, machine_type)
-    trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, "trigger_time")
+    cache_key, trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, action, "trigger_time")
     trigger_time = RedisConn.get(trigger_time_key)
 
     # Use the trigger time plus a delay window as a rolling window, to avoid serial calls to node management
@@ -551,11 +578,17 @@ def trigger_operate_collector(
-    db_type: str = None, machine_type: str = None, bk_instance_ids: list = None, action="install"
+    db_type: str = None,
+    machine_type: str = None,
+    bk_instance_ids: list = None,
+    action=OperateCollectorActionEnum.INSTALL.value,
 ):
     """
     Trigger a collector operation
     """
+    # Drop 0 values from bk_instance_ids, they are probably dirty data
+    bk_instance_ids = [bk_instance_id for bk_instance_id in bk_instance_ids if bk_instance_id != 0]
+
     # In some monitoring scenarios db_type and machine_type are not passed in, e.g. after a DBHA switchover only labels are updated
     if db_type is None and machine_type is None and bk_instance_ids is not None:
         ins = (
@@ -567,8 +600,7 @@ def trigger_operate_collector(
         machine_type = ins.machine_type
         db_type = ClusterType.cluster_type_to_db_type(ins.cluster_type)
 
-    cache_key = format_operate_collector_cache_key(db_type, machine_type)
-    trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, "trigger_time")
+    cache_key, trigger_time_key = format_operate_collector_cache_key(db_type, machine_type, action, "trigger_time")
     trigger_time = RedisConn.get(trigger_time_key)
 
     # Set the trigger time; OPERATE_COLLECTOR_COUNTDOWN acts as the rolling window for running collector operations
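Taken together, callers can trigger either collector action explicitly or let the helper derive db_type and machine_type from the instances themselves; bk_instance_id values of 0 are dropped as dirty data before dispatch. A sketch of both call styles with placeholder identifiers:

    from backend.flow.consts import OperateCollectorActionEnum
    from backend.flow.utils.cc_manage import trigger_operate_collector

    # Explicit db_type / machine_type (placeholder values); the 0 id is filtered out as dirty data.
    trigger_operate_collector(
        "redis",
        "twemproxy",
        bk_instance_ids=[0, 4567],
        action=OperateCollectorActionEnum.UNINSTALL.value,
    )

    # db_type / machine_type omitted (e.g. after a DBHA switchover): they are derived from the
    # instance's cluster metadata, and the action defaults to install.
    trigger_operate_collector(bk_instance_ids=[4567])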