feat: fix exceptions in one-off task statistics and notification sending, and move statistics-related tasks to a dedicated worker
hanshuaikang committed Dec 13, 2023
1 parent d2459e3 commit 19d5b8c
Showing 6 changed files with 65 additions and 24 deletions.
4 changes: 4 additions & 0 deletions app_desc.yaml
@@ -61,6 +61,10 @@ modules:
command: python manage.py celery worker -Q default -n default@%h -P threads -c 10 -l info
plan: 4C2G5R
replicas: 5
analysis_worker:
command: python manage.py celery worker -Q analysis_statistics -n analysis_statistics@%h -P threads -c 10 -l info
plan: 4C2G5R
replicas: 5
svc_discovery:
bk_saas:
- bk_app_code: "bk_iam"
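The new analysis_worker process consumes only the analysis_statistics queue, so statistics work no longer competes with tasks on the default queue. A minimal sketch of the binding this enables, mirroring the @task(queue=...) decorators added in gcloud/analysis_statistics/tasks.py below (the task name here is illustrative, not from this commit):

from celery import task


# Tasks bound to an explicit queue are only consumed by workers started
# with `-Q analysis_statistics`, i.e. the analysis_worker defined above.
@task(queue="analysis_statistics")
def example_statistics_task(task_instance_id):
    ...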
2 changes: 2 additions & 0 deletions config/default.py
@@ -492,6 +492,7 @@ def _(s):
from pipeline.celery.settings import * # noqa
from pipeline.eri.celery import queues as eri_queues # noqa

from gcloud.analysis_statistics import settings as analysis_statistics_settings # noqa
from gcloud.contrib.cleaner import settings as cleaner_settings # noqa
from gcloud.taskflow3.celery import settings as taskflow3_celery_settings # noqa
from gcloud.taskflow3.domains.queues import PrepareAndStartTaskQueueResolver # noqa
@@ -504,6 +505,7 @@ def _(s):
CELERY_QUEUES.extend(PrepareAndStartTaskQueueResolver(API_TASK_QUEUE_NAME_V2).queues())
CELERY_QUEUES.extend(taskflow3_celery_settings.CELERY_QUEUES)
CELERY_QUEUES.extend(cleaner_settings.CELERY_QUEUES)
CELERY_QUEUES.extend(analysis_statistics_settings.CELERY_QUEUES)

CELERY_ROUTES.update({"gcloud.clocked_task.tasks.clocked_task_start": PIPELINE_ADDITIONAL_PRIORITY_ROUTING})

22 changes: 22 additions & 0 deletions gcloud/analysis_statistics/settings.py
@@ -0,0 +1,22 @@
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from kombu import Exchange, Queue

CELERY_QUEUES = [
Queue(
"analysis_statistics",
Exchange("default", type="direct"),
routing_key="analysis_statistics",
queue_arguments={"x-max-priority": 255},
),
]
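
Because the queue is declared with x-max-priority: 255, publishers can attach a per-message priority when dispatching. A hedged sketch (the call site and argument values are illustrative, not part of this commit):

from gcloud.analysis_statistics.tasks import taskflowinstance_post_save_statistics_task

# apply_async accepts a per-message priority (0-255 here, bounded by the
# queue's x-max-priority); plain .delay() publishes at the default priority.
taskflowinstance_post_save_statistics_task.apply_async(
    kwargs={"task_instance_id": 123, "created": True},  # illustrative values
    priority=100,
)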
43 changes: 22 additions & 21 deletions gcloud/analysis_statistics/tasks.py
@@ -11,40 +11,37 @@
specific language governing permissions and limitations under the License.
"""


import logging
from copy import deepcopy
from datetime import datetime

import ujson as json
from bamboo_engine import api as bamboo_engine_api
from celery import task
from celery.schedules import crontab
from celery.task import periodic_task
from pipeline.component_framework.constants import LEGACY_PLUGINS_VERSION
from pipeline.contrib.statistics.utils import count_pipeline_tree_nodes
from pipeline.core.constants import PE
from pipeline.engine import api as pipeline_api
from pipeline.engine import states
from pipeline.engine.utils import calculate_elapsed_time
from pipeline.eri.runtime import BambooDjangoRuntime
from pipeline.models import PipelineTemplate

from gcloud.analysis_statistics import variable
from gcloud.analysis_statistics.models import (
TaskflowExecutedNodeStatistics,
TaskflowStatistics,
TemplateCustomVariableSummary,
TemplateNodeStatistics,
TemplateStatistics,
TemplateVariableStatistics,
)
from gcloud.common_template.models import CommonTemplate
from gcloud.taskflow3.domains.dispatchers.task import TaskCommandDispatcher
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.tasktmpl3.models import TaskTemplate

logger = logging.getLogger("celery")

Expand All @@ -67,10 +64,9 @@ def recursive_collect_components_execution(activities, status_tree, task_instanc
instance = task_instance.pipeline_instance
trigger_template_id = task_instance.template_id
task_instance_id = task_instance.id
task_template = (
TaskTemplate.objects.filter(pipeline_template=instance.template).first()
or CommonTemplate.objects.filter(pipeline_template=instance.template).first()
)
if not task_template:
raise Exception(f"task_template with template_id {instance.template.template_id} not found")
if stack is None:
@@ -165,15 +161,20 @@ def recursive_collect_components_execution(activities, status_tree, task_instanc
return component_list


@task(queue="analysis_statistics")
def taskflowinstance_post_save_statistics_task(task_instance_id, created):
try:
taskflow_instance = TaskFlowInstance.objects.get(id=task_instance_id)
# pipeline data
pipeline_instance = taskflow_instance.pipeline_instance
# template data

# Skip statistics for one-off tasks
if not taskflow_instance.template_id:
return

task_template = TaskTemplate.objects.get(id=taskflow_instance.template_id)
# Count standard plugins, subflows, and gateways in the flow
kwargs = {
"instance_id": pipeline_instance.id,
"project_id": taskflow_instance.project.id,
@@ -205,7 +206,7 @@ def taskflowinstance_post_save_statistics_task(task_instance_id, created):
return False


@task(queue="analysis_statistics")
def tasktemplate_post_save_statistics_task(template_id):
template = TaskTemplate.objects.get(id=template_id)
task_template_id = template.id
@@ -307,7 +308,7 @@ def tasktemplate_post_save_statistics_task(template_id):
return True


@task(queue="analysis_statistics")
def pipeline_archive_statistics_task(instance_id):
taskflow_instance = TaskFlowInstance.objects.get(pipeline_instance__instance_id=instance_id)
# Update the taskflow instance statistics: start_time, finish_time, elapsed_time
@@ -349,8 +350,8 @@ def pipeline_archive_statistics_task(instance_id):
return True


@task(queue="analysis_statistics")
@periodic_task(run_every=crontab(hour="0"), queue="analysis_statistics")
def backfill_template_variable_statistics_task():
custom_variables_records = {}

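A hedged sketch of how these tasks are presumably dispatched, via a Django post_save receiver; the receiver below is illustrative and not this project's actual signal wiring:

from django.db.models.signals import post_save
from django.dispatch import receiver

from gcloud.analysis_statistics.tasks import taskflowinstance_post_save_statistics_task
from gcloud.taskflow3.models import TaskFlowInstance


@receiver(post_save, sender=TaskFlowInstance)
def dispatch_statistics(sender, instance, created, **kwargs):
    # The task itself now returns early for one-off tasks (no template_id),
    # so it is safe to dispatch for every saved instance.
    taskflowinstance_post_save_statistics_task.delay(instance.id, created)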
9 changes: 6 additions & 3 deletions gcloud/shortcuts/message/__init__.py
@@ -13,14 +13,14 @@

import logging

from gcloud.periodictask.models import PeriodicTask
from gcloud.shortcuts.message.common import (
title_and_content_for_atom_failed,
title_and_content_for_clocked_task_create_fail,
title_and_content_for_flow_finished,
title_and_content_for_periodic_task_start_fail,
)
from gcloud.shortcuts.message.send_msg import send_message

logger = logging.getLogger("root")

@@ -30,6 +30,10 @@

def send_task_flow_message(taskflow, msg_type, node_name=""):

# This may be a one-off task, in which case sending notifications should be skipped
if not taskflow.template:
return False

notify_types = taskflow.get_notify_type()
receivers_list = taskflow.get_stakeholders()
receivers = ",".join(receivers_list)
@@ -77,7 +81,6 @@ def send_periodic_task_message(periodic_task, history):


def send_clocked_task_message(clocked_task, ex_data):
notify_types = clocked_task.get_notify_type()
receivers_list = clocked_task.get_stakeholders()
receivers = ",".join(receivers_list)
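With the new guard, send_task_flow_message returns False for one-off tasks instead of raising when taskflow.template is missing. An illustrative call, given a TaskFlowInstance taskflow (the msg_type value here is an assumption):

from gcloud.shortcuts.message import send_task_flow_message

sent = send_task_flow_message(taskflow, msg_type="task_finished")  # msg_type is illustrative
if not sent:
    # One-off task (taskflow.template is None): notification intentionally skipped.
    pass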
9 changes: 9 additions & 0 deletions support-files/supervisord.conf
@@ -91,3 +91,12 @@ redirect_stderr = true
stopwaitsecs = 10
autorestart = true
environment = {{.environment}}

[program: {{.app_code}}_analysis_worker]
command = /cache/.bk/env/bin/celery worker -A blueapps.core.celery -P threads -Q analysis_statistics -n {{.node_name}}_{{.app_code}}_analysis_worker -l INFO -c 300
directory = {{.app_container_path}}code/
stdout_logfile = {{.app_container_path}}logs/{{.app_code}}/celery.log
redirect_stderr = true
stopwaitsecs = 10
autorestart = true
environment = {{.environment}}
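
To verify the new worker is consuming the right queue (a hedged check, not part of the commit), Celery's inspection API can list each running worker's active queues:

from celery import current_app

# Ask every running worker which queues it consumes; the analysis worker
# should report only "analysis_statistics".
inspector = current_app.control.inspect()
for worker_name, worker_queues in (inspector.active_queues() or {}).items():
    print(worker_name, [q["name"] for q in worker_queues])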
