diff --git a/apps/backend/constants.py b/apps/backend/constants.py index ac1d01da7..c33c7cf49 100644 --- a/apps/backend/constants.py +++ b/apps/backend/constants.py @@ -181,8 +181,10 @@ def needs_batch_request(self) -> bool: # 处理卸载残留订阅任务间隔 HANDLE_UNINSTALL_REST_SUBSCRIPTION_TASK_INTERVAL = 6 * 60 * 60 -# 最大订阅任务数量 -MAX_SUBSCRIPTION_TASK_COUNT = 50 +# 最大更新订阅任务储存数量 +MAX_STORE_SUBSCRIPTION_TASK_COUNT = 200 +# 最大执行订阅任务数量 +MAX_RUN_SUBSCRIPTION_TASK_COUNT = 50 # 订阅删除时间小时数 SUBSCRIPTION_DELETE_HOURS = 6 diff --git a/apps/backend/periodic_tasks/__init__.py b/apps/backend/periodic_tasks/__init__.py index 9a42f3b98..4479cc195 100644 --- a/apps/backend/periodic_tasks/__init__.py +++ b/apps/backend/periodic_tasks/__init__.py @@ -16,4 +16,5 @@ from .clean_sub_data import clean_sub_data_task # noqa from .clean_subscription_data import clean_subscription_data # noqa from .collect_auto_trigger_job import collect_auto_trigger_job # noqa +from .schedule_running_subscription_task import * # noqa from .update_subscription_instances import update_subscription_instances # noqa diff --git a/apps/backend/periodic_tasks/check_zombie_sub_inst_record.py b/apps/backend/periodic_tasks/check_zombie_sub_inst_record.py index a330aea4d..8fe643186 100644 --- a/apps/backend/periodic_tasks/check_zombie_sub_inst_record.py +++ b/apps/backend/periodic_tasks/check_zombie_sub_inst_record.py @@ -18,7 +18,10 @@ from django.utils import timezone from django.utils.translation import ugettext_lazy as _ -from apps.backend.subscription.constants import CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL +from apps.backend.subscription.constants import ( + CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL, + ZOMBIE_SUB_INST_RECORD_COUNT, +) from apps.node_man import constants, models from apps.utils.time_handler import strftime_local @@ -48,10 +51,20 @@ def check_zombie_sub_inst_record(): "status__in": [constants.JobStatusType.PENDING, constants.JobStatusType.RUNNING], } base_update_kwargs = {"status": constants.JobStatusType.FAILED, "update_time": timezone.now()} - - forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update( - **base_update_kwargs - ) + # 先count确认是否需要update,如果count数量小于100传主键 update,否则继续沿用现在的方式 + subscription_instance_record_qs = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs) + if not subscription_instance_record_qs.exists(): + logger.info("no zombie_sub_inst_record skipped") + return + if subscription_instance_record_qs.count() < ZOMBIE_SUB_INST_RECORD_COUNT: + forced_failed_inst_record_ids = set(subscription_instance_record_qs.values_list("id", flat=True)) + forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter( + id__in=forced_failed_inst_record_ids + ).update(**base_update_kwargs) + else: + forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update( + **base_update_kwargs + ) forced_failed_status_detail_num = models.SubscriptionInstanceStatusDetail.objects.filter(**query_kwargs).update( **base_update_kwargs, diff --git a/apps/backend/periodic_tasks/schedule_running_subscription_task.py b/apps/backend/periodic_tasks/schedule_running_subscription_task.py index 949be102c..b7da48c1b 100644 --- a/apps/backend/periodic_tasks/schedule_running_subscription_task.py +++ b/apps/backend/periodic_tasks/schedule_running_subscription_task.py @@ -37,18 +37,14 @@ def get_need_clean_subscription_app_code(): @periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"}) def schedule_update_subscription(): name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL - # 先计算出要从redis取数据的长度 - length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) - # 从redis中取出对应长度的数据 - update_params: List[bytes] = REDIS_INST.lrange(name, -length, -1) - # 使用ltrim保留剩下的,可以保证redis中新push的值不会丢失 - REDIS_INST.ltrim(name, 0, -length - 1) - # 翻转数据,先进的数据先处理 - update_params.reverse() + # 取出该hashset中所有的参数 + update_params: Dict[str, bytes] = REDIS_INST.hgetall(name=name) + # 删除该hashset内的所有参数 + REDIS_INST.delete(name) results = [] if not update_params: return - for update_param in update_params: + for update_param in update_params.values(): # redis取出为bytes类型,需进行解码后转字典 params = json.loads(update_param.decode()) subscription_id = params["subscription_id"] @@ -64,7 +60,7 @@ def schedule_update_subscription(): @periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"}) def schedule_run_subscription(): name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL - length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) + length: int = min(REDIS_INST.llen(name), constants.MAX_RUN_SUBSCRIPTION_TASK_COUNT) run_params: List[bytes] = REDIS_INST.lrange(name, -length, -1) REDIS_INST.ltrim(name, 0, -length - 1) run_params.reverse() diff --git a/apps/backend/subscription/constants.py b/apps/backend/subscription/constants.py index 33e7118cf..e9d273097 100644 --- a/apps/backend/subscription/constants.py +++ b/apps/backend/subscription/constants.py @@ -20,6 +20,8 @@ # 检查僵尸订阅实例记录周期 CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL = 15 * constants.TimeUnit.MINUTE +# 僵尸订阅实例记录数量 +ZOMBIE_SUB_INST_RECORD_COUNT = 100 # 任务超时时间。距离 create_time 多久后会被判定为超时,防止 pipeline 后台僵死的情况 TASK_TIMEOUT = 15 * constants.TimeUnit.MINUTE diff --git a/apps/backend/subscription/handler.py b/apps/backend/subscription/handler.py index 80a49bf57..021d1ddfb 100644 --- a/apps/backend/subscription/handler.py +++ b/apps/backend/subscription/handler.py @@ -443,8 +443,16 @@ def run(self, scope: Dict = None, actions: Dict[str, str] = None) -> Dict[str, i raise errors.SubscriptionIncludeGrayBizError() if subscription.is_running(): + # 这里仍使用lpush的原因在于订阅任务可能执行的动作不一样,不能使用更新 + name = backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL + if REDIS_INST.llen(name) > backend_constants.MAX_STORE_SUBSCRIPTION_TASK_COUNT: + logger.info("redis list store params is full") + return { + "subscription_id": subscription.id, + "message": _("该订阅ID下有正在RUNNING的订阅任务,且任务编排数量已达阈值,请稍后再试,如造成不便,请联系管理员处理"), + } params = json.dumps({"subscription_id": subscription.id, "scope": scope, "actions": actions}) - REDIS_INST.lpush(backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL, params) + REDIS_INST.lpush(name, params) logger.info(f"run subscription[{subscription.id}] store params into redis: {params}") return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")} @@ -704,8 +712,15 @@ def update_subscription(params: Dict[str, Any]): ): raise errors.SubscriptionIncludeGrayBizError() if subscription.is_running(): - REDIS_INST.lpush(backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL, json.dumps(params)) - logger.info(f"update subscription[{subscription.id}] store params into redis: {params}") + name = backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL + if REDIS_INST.hlen(name=name) > backend_constants.MAX_STORE_SUBSCRIPTION_TASK_COUNT: + logger.info("redis hashset store params is full") + return { + "subscription_id": subscription.id, + "message": _("该订阅ID下有正在RUNNING的订阅任务,且任务编排数量已达阈值,请稍后再试,如造成不便,请联系管理员处理"), + } + REDIS_INST.hset(name, key=f"subscription_id_{subscription.id}", value=json.dumps(params)) + logger.info(f"update subscription[{subscription.id}] store or update params into redis: {params}") return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")} with transaction.atomic(): diff --git a/apps/backend/subscription/views.py b/apps/backend/subscription/views.py index 8ca57e5a5..a2e0ff811 100644 --- a/apps/backend/subscription/views.py +++ b/apps/backend/subscription/views.py @@ -213,7 +213,7 @@ def delete_subscription(self, request): raise errors.SubscriptionNotExist({"subscription_id": subscription_id}) # 调用delete()方法才会记录删除时间 subscription_qs.delete() - logger.info(f"deleted subscription: {subscription_id}") + logger.info(f"deleted_subscription_id: {subscription_id}") return Response({"deleted_subscription_id": subscription_id}) @swagger_auto_schema( diff --git a/apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py b/apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py index bfcf9ee86..4d1329f91 100644 --- a/apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py +++ b/apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py @@ -53,7 +53,7 @@ def setUp(self) -> None: def test_schedule_running_subscription_task(self): name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL - length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) + length: int = min(REDIS_INST.llen(name), constants.MAX_RUN_SUBSCRIPTION_TASK_COUNT) run_params = REDIS_INST.lrange(name, -length, -1) self.assertEqual( json.loads(run_params[0].decode()), @@ -102,9 +102,10 @@ def setUp(self) -> None: def test_schedule_update_subscription_task(self): name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL - length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) - run_params = REDIS_INST.lrange(name, -length, -1) - self.assertEqual(json.loads(run_params[0].decode()), self.params) + + update_params = REDIS_INST.hgetall(name=name) + for update_param in update_params.values(): + self.assertEqual(json.loads(update_param.decode()), self.params) models.SubscriptionInstanceRecord.objects.filter( id=self.ids["subscription_instance_record_id"], subscription_id=self.ids["subscription_id"] ).update(status="SUCCESS") diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 8dc074de9..c1a2b7029 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -21,6 +21,7 @@ import uuid from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import timedelta from distutils.dir_util import copy_tree from enum import Enum from functools import cmp_to_key, reduce @@ -1928,7 +1929,12 @@ def get_subscription(cls, subscription_id: int, show_deleted=False): def is_running(self, instance_id_list: List[str] = None): """订阅下是否有运行中的任务""" - base_kwargs = {"subscription_id": self.id, "is_latest": True} + # 只需检查近两小时内的订阅实例 + base_kwargs = { + "subscription_id": self.id, + "is_latest": True, + "update_time__gte": timezone.now() - timedelta(hours=2), + } if instance_id_list is not None: base_kwargs["instance_id__in"] = instance_id_list status_set = set(SubscriptionInstanceRecord.objects.filter(**base_kwargs).values_list("status", flat=True))