Commit

perf: add task output data to the task callback #7554
lTimej authored and normal-wls committed Nov 20, 2024
1 parent 09eb0f1 commit aabe47d
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions gcloud/taskflow3/signals/handlers.py
@@ -12,14 +12,26 @@
 """
 
 import datetime
+import json
 import logging
 
+from bamboo_engine import states as bamboo_engine_states
 from bk_monitor_report.reporter import MonitorReporter
 from django.conf import settings
 from django.dispatch import receiver
+from pipeline.core.pipeline import Pipeline
+from pipeline.engine.signals import activity_failed, pipeline_end, pipeline_revoke
+from pipeline.eri.signals import (
+    execute_interrupt,
+    post_set_state,
+    pre_service_execute,
+    pre_service_schedule,
+    schedule_interrupt,
+)
+from pipeline.models import PipelineInstance
+from pipeline.signals import post_pipeline_finish, post_pipeline_revoke
 
 import env
-from bamboo_engine import states as bamboo_engine_states
 from gcloud.shortcuts.message import ATOM_FAILED, TASK_FINISHED
 from gcloud.taskflow3.celery.tasks import auto_retry_node, send_taskflow_message, task_callback
 from gcloud.taskflow3.models import (
@@ -30,17 +42,6 @@
     TimeoutNodeConfig,
 )
 from gcloud.taskflow3.signals import taskflow_finished, taskflow_revoked
-from pipeline.core.pipeline import Pipeline
-from pipeline.engine.signals import activity_failed, pipeline_end, pipeline_revoke
-from pipeline.eri.signals import (
-    execute_interrupt,
-    post_set_state,
-    pre_service_execute,
-    pre_service_schedule,
-    schedule_interrupt,
-)
-from pipeline.models import PipelineInstance
-from pipeline.signals import post_pipeline_finish, post_pipeline_revoke
 
 logger = logging.getLogger("celery")

@@ -57,14 +58,14 @@ def _finish_taskflow_and_send_signal(instance_id, sig, task_success=False):
     sig.send(TaskFlowInstance, task_id=task_id)
 
     if task_success:
-        _check_and_callback(task_id, task_success=task_success)
+        _check_and_callback(task_id, task_success=task_success, task=qs[0])
         try:
             send_taskflow_message.delay(task_id=task_id, msg_type=TASK_FINISHED)
         except Exception as e:
             logger.exception("send_taskflow_message[taskflow_id=%s] task delay error: %s" % (task_id, e))
 
     if sig is taskflow_revoked:
-        _check_and_callback(task_id, task_success=False)
+        _check_and_callback(task_id, task_success=False, task=qs[0])
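For context, a minimal sketch of the caller side after this change. The queryset lookup happens above this hunk, so the exact filter shown here is an assumption; only the new task=qs[0] argument comes from the diff:

    # Sketch only: the lookup is assumed; this hunk does not show it.
    qs = TaskFlowInstance.objects.filter(pipeline_instance__instance_id=instance_id)
    task_id = qs[0].id

    # On finish (and on revoke), the resolved TaskFlowInstance now rides along
    # so the callback can serialize its outputs without a second database fetch.
    _check_and_callback(task_id, task_success=True, task=qs[0])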


@@ -73,8 +74,7 @@ def _send_node_fail_message(node_id, pipeline_id):
     except TaskFlowInstance.DoesNotExist:
         logger.error("pipeline finished handler get taskflow error, pipeline_instance_id=%s" % pipeline_id)
         return
-
-    _check_and_callback(taskflow.id, task_success=False)
+    _check_and_callback(taskflow.id, task_success=False, task=taskflow)
 
     if taskflow.is_child_taskflow is False:
         try:
@@ -89,8 +89,13 @@ def _check_and_callback(taskflow_id, *args, **kwargs):
     if not TaskCallBackRecord.objects.filter(task_id=taskflow_id).exists():
         return
     try:
+        if kwargs.get("task"):
+            task = kwargs.pop("task")
+            kwargs["task_outputs"] = json.dumps(task.get_task_detail()["outputs"])
         task_callback.apply_async(
-            kwargs=dict(task_id=taskflow_id, **kwargs), queue="task_callback", routing_key="task_callback",
+            kwargs=dict(task_id=taskflow_id, **kwargs),
+            queue="task_callback",
+            routing_key="task_callback",
         )
     except Exception as e:
         logger.exception(f"[_check_and_callback] task_callback delay error: {e}")