Skip to content

Commit

Permalink
feat: 订阅巡检支持根据业务分发任务到不同队列 (closed TencentBlueKing#2061)
Browse files Browse the repository at this point in the history
# Reviewed, transaction id: 5009
  • Loading branch information
Huayeaaa committed Apr 1, 2024
1 parent d3bda17 commit 72efd08
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 3 deletions.
6 changes: 4 additions & 2 deletions apps/backend/periodic_tasks/update_subscription_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from celery.task import periodic_task

from apps.backend.periodic_tasks.utils import by_subs_id_dispatch_task_queue
from apps.backend.subscription.constants import SUBSCRIPTION_UPDATE_INTERVAL
from apps.backend.subscription.tasks import update_subscription_instances_chunk
from apps.node_man import models
Expand Down Expand Up @@ -31,5 +32,6 @@ def update_subscription_instances():
for index, subscription_id in enumerate(subscription_ids):
# 把订阅平均分布到10分钟内执行,用于削峰
countdown = calculate_countdown(count=count, index=index, duration=SUBSCRIPTION_UPDATE_INTERVAL)
logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds.")
update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown)
task_queue = by_subs_id_dispatch_task_queue(subscription_id)
logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds in queue ({task_queue}).")
update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown, queue=task_queue)
23 changes: 23 additions & 0 deletions apps/backend/periodic_tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,26 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from typing import Dict, List

from apps.node_man import models


def by_subs_id_dispatch_task_queue(subscription_id: int):
"""通过订阅ID分配任务队列"""
queue_biz_ids: Dict[str, List[int]] = models.GlobalSettings.get_config(
key=models.GlobalSettings.KeyEnum.QUEUE_BIZ_IDS_MAP.value, default={}
)
biz_ids: List[int] = [biz_id for sublist in queue_biz_ids.values() for biz_id in sublist]
biz_id__queue_map: Dict[int, str] = {biz_id: queue for queue, values in queue_biz_ids.items() for biz_id in values}
default_task_queue = "backend_additional_task"
subscription = models.Subscription.get_subscription(subscription_id=subscription_id)
if any([subscription.bk_biz_id in biz_ids, set(subscription.bk_biz_scope or []) & set(biz_ids)]):

return next(
biz_id__queue_map.get(biz_id)
for biz_id in [subscription.bk_biz_id] + subscription.bk_biz_scope
if biz_id in biz_id__queue_map
)

return default_task_queue
2 changes: 2 additions & 0 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class KeyEnum(Enum):
ENABLE_NOTICE_CENTER = "ENABLE_NOTICE_CENTER"
# 禁用已停用插件
DISABLE_STOPPED_PLUGIN = "DISABLE_STOPPED_PLUGIN"
# 任务队列与业务IDs映射
QUEUE_BIZ_IDS_MAP = "QUEUE_BIZ_IDS_MAP"

key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True)
v_json = JSONField(_("值"))
Expand Down
10 changes: 9 additions & 1 deletion apps/node_man/periodic_tasks/resource_watch_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,19 @@ def trigger_nodeman_subscription(bk_biz_id, debounce_time=0):
method="subscription", bk_biz_id=bk_biz_id, debounce_time=debounce_time
).inc()

queue_biz_ids: typing.Dict[str, typing.List[int]] = GlobalSettings.get_config(
key=GlobalSettings.KeyEnum.QUEUE_BIZ_IDS_MAP.value, default={}
)
biz_id__queue_map: typing.Dict[int, str] = {
biz_id: queue for queue, values in queue_biz_ids.items() for biz_id in values
}
task_queue: str = biz_id__queue_map.get(bk_biz_id, "backend_additional_task")
update_subscription_instances_chunk.apply_async(
kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time
kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time, queue=task_queue
)

logger.info(
f"[trigger_nodeman_subscription] following subscriptions "
f"will be run -> ({subscription_ids}) after {debounce_time} s"
f" in queue -> ({task_queue})"
)

0 comments on commit 72efd08

Please sign in to comment.