diff --git a/apps/backend/periodic_tasks/update_subscription_instances.py b/apps/backend/periodic_tasks/update_subscription_instances.py index ce2cfec3c..f1302ec96 100644 --- a/apps/backend/periodic_tasks/update_subscription_instances.py +++ b/apps/backend/periodic_tasks/update_subscription_instances.py @@ -2,9 +2,14 @@ import logging from celery.task import periodic_task +from django.db.models import Value from apps.backend.subscription.constants import SUBSCRIPTION_UPDATE_INTERVAL from apps.backend.subscription.tasks import update_subscription_instances_chunk +from apps.backend.subscription.tools import ( + by_biz_dispatch_task_queue, + get_biz_ids_gby_queue, +) from apps.node_man import models from apps.utils.periodic_task import calculate_countdown @@ -24,12 +29,19 @@ def update_subscription_instances(): # 关闭订阅自动巡检 return - subscription_ids = list( - models.Subscription.objects.filter(enable=True, is_deleted=False).values_list("id", flat=True) + subscriptions = models.Subscription.objects.filter(enable=Value(1), is_deleted=Value(0)).values( + "id", "bk_biz_id", "bk_biz_scope" ) + subscription_ids = [subscription["id"] for subscription in subscriptions] + subscription_id__biz_ids_map = { + subscription["id"]: subscription["bk_biz_scope"] + [subscription["bk_biz_id"]] for subscription in subscriptions + } + biz_ids_gby_queue = get_biz_ids_gby_queue() + count = len(subscription_ids) for index, subscription_id in enumerate(subscription_ids): # 把订阅平均分布到10分钟内执行,用于削峰 countdown = calculate_countdown(count=count, index=index, duration=SUBSCRIPTION_UPDATE_INTERVAL) - logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds.") - update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown) + task_queue = by_biz_dispatch_task_queue(biz_ids_gby_queue, subscription_id__biz_ids_map[subscription_id]) + logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds in queue ({task_queue}).") + update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown, queue=task_queue) diff --git a/apps/backend/subscription/tools.py b/apps/backend/subscription/tools.py index 07f6e2f63..6675a5ffb 100644 --- a/apps/backend/subscription/tools.py +++ b/apps/backend/subscription/tools.py @@ -1512,3 +1512,20 @@ def check_subscription_is_disabled( logger.info(f"[check_subscription_is_disabled] {subscription_identity}: not in the disable list, skipping") return False + + +def get_biz_ids_gby_queue(): + biz_ids_gby_queue: Dict[str, List[int]] = models.GlobalSettings.get_config( + key=models.GlobalSettings.KeyEnum.SUBSCRIPTION_UPDATE_TASK_QUEUE.value, default={} + ) + return biz_ids_gby_queue + + +def by_biz_dispatch_task_queue(biz_ids_gby_queue, bk_biz_ids): + """通过业务ID分配任务队列""" + default_task_queue = "backend_additional_task" + for task_queue, partial_biz_ids in biz_ids_gby_queue.items(): + if set(partial_biz_ids) & set(bk_biz_ids): + return task_queue + + return default_task_queue diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 2f4c701a2..f579639a2 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -154,6 +154,8 @@ class KeyEnum(Enum): ENABLE_NOTICE_CENTER = "ENABLE_NOTICE_CENTER" # 禁用已停用插件 DISABLE_STOPPED_PLUGIN = "DISABLE_STOPPED_PLUGIN" + # 根据订阅分配任务队列 + SUBSCRIPTION_UPDATE_TASK_QUEUE = "SUBSCRIPTION_UPDATE_TASK_QUEUE" key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True) v_json = JSONField(_("值")) diff --git a/apps/node_man/periodic_tasks/resource_watch_task.py b/apps/node_man/periodic_tasks/resource_watch_task.py index 4d61b09b3..ef92e6c6f 100644 --- a/apps/node_man/periodic_tasks/resource_watch_task.py +++ b/apps/node_man/periodic_tasks/resource_watch_task.py @@ -19,6 +19,10 @@ from django.db.models import Q from django.db.utils import IntegrityError +from apps.backend.subscription.tools import ( + by_biz_dispatch_task_queue, + get_biz_ids_gby_queue, +) from apps.component.esbclient import client_v2 from apps.node_man import constants from apps.node_man.models import GlobalSettings, Host, ResourceWatchEvent, Subscription @@ -427,11 +431,15 @@ def trigger_nodeman_subscription(bk_biz_id, debounce_time=0): method="subscription", bk_biz_id=bk_biz_id, debounce_time=debounce_time ).inc() + biz_ids_gby_queue = get_biz_ids_gby_queue() + task_queue: str = by_biz_dispatch_task_queue(biz_ids_gby_queue, [bk_biz_id]) + update_subscription_instances_chunk.apply_async( - kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time + kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time, queue=task_queue ) logger.info( f"[trigger_nodeman_subscription] following subscriptions " f"will be run -> ({subscription_ids}) after {debounce_time} s" + f" in queue -> ({task_queue})" )