diff --git a/apps/backend/periodic_tasks/update_subscription_instances.py b/apps/backend/periodic_tasks/update_subscription_instances.py index ce2cfec3c..332d7603a 100644 --- a/apps/backend/periodic_tasks/update_subscription_instances.py +++ b/apps/backend/periodic_tasks/update_subscription_instances.py @@ -3,6 +3,7 @@ from celery.task import periodic_task +from apps.backend.periodic_tasks.utils import by_subs_id_dispatch_task_queue from apps.backend.subscription.constants import SUBSCRIPTION_UPDATE_INTERVAL from apps.backend.subscription.tasks import update_subscription_instances_chunk from apps.node_man import models @@ -31,5 +32,6 @@ def update_subscription_instances(): for index, subscription_id in enumerate(subscription_ids): # 把订阅平均分布到10分钟内执行,用于削峰 countdown = calculate_countdown(count=count, index=index, duration=SUBSCRIPTION_UPDATE_INTERVAL) - logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds.") - update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown) + task_queue = by_subs_id_dispatch_task_queue(subscription_id) + logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds in queue ({task_queue}).") + update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown, queue=task_queue) diff --git a/apps/backend/periodic_tasks/utils.py b/apps/backend/periodic_tasks/utils.py index 29ed269e0..c4126869e 100644 --- a/apps/backend/periodic_tasks/utils.py +++ b/apps/backend/periodic_tasks/utils.py @@ -8,3 +8,26 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from typing import Dict, List + +from apps.node_man import models + + +def by_subs_id_dispatch_task_queue(subscription_id: int): + """通过订阅ID分配任务队列""" + queue_biz_ids: Dict[str, List[int]] = models.GlobalSettings.get_config( + key=models.GlobalSettings.KeyEnum.QUEUE_BIZ_IDS_MAP.value, default={} + ) + biz_ids: List[int] = [biz_id for sublist in queue_biz_ids.values() for biz_id in sublist] + biz_id__queue_map: Dict[int, str] = {biz_id: queue for queue, values in queue_biz_ids.items() for biz_id in values} + default_task_queue = "backend_additional_task" + subscription = models.Subscription.get_subscription(subscription_id=subscription_id) + if any([subscription.bk_biz_id in biz_ids, set(subscription.bk_biz_scope or []) & set(biz_ids)]): + + return next( + biz_id__queue_map.get(biz_id) + for biz_id in [subscription.bk_biz_id] + subscription.bk_biz_scope + if biz_id in biz_id__queue_map + ) + + return default_task_queue diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 2f4c701a2..584d56a8a 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -154,6 +154,8 @@ class KeyEnum(Enum): ENABLE_NOTICE_CENTER = "ENABLE_NOTICE_CENTER" # 禁用已停用插件 DISABLE_STOPPED_PLUGIN = "DISABLE_STOPPED_PLUGIN" + # 任务队列与业务IDs映射 + QUEUE_BIZ_IDS_MAP = "QUEUE_BIZ_IDS_MAP" key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True) v_json = JSONField(_("值")) diff --git a/apps/node_man/periodic_tasks/resource_watch_task.py b/apps/node_man/periodic_tasks/resource_watch_task.py index 4d61b09b3..1d19cf2d4 100644 --- a/apps/node_man/periodic_tasks/resource_watch_task.py +++ b/apps/node_man/periodic_tasks/resource_watch_task.py @@ -427,11 +427,19 @@ def trigger_nodeman_subscription(bk_biz_id, debounce_time=0): method="subscription", bk_biz_id=bk_biz_id, debounce_time=debounce_time ).inc() + queue_biz_ids: typing.Dict[str, typing.List[int]] = GlobalSettings.get_config( + key=GlobalSettings.KeyEnum.QUEUE_BIZ_IDS_MAP.value, default={} + ) + biz_id__queue_map: typing.Dict[int, str] = { + biz_id: queue for queue, values in queue_biz_ids.items() for biz_id in values + } + task_queue: str = biz_id__queue_map.get(bk_biz_id, "backend_additional_task") update_subscription_instances_chunk.apply_async( - kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time + kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time, queue=task_queue ) logger.info( f"[trigger_nodeman_subscription] following subscriptions " f"will be run -> ({subscription_ids}) after {debounce_time} s" + f" in queue -> ({task_queue})" )