Skip to content

Commit

Permalink
fix: 修复json序列化问题 --story=120883063
Browse files Browse the repository at this point in the history
  • Loading branch information
guohelu committed Nov 28, 2024
1 parent ca47db6 commit 7bad91b
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions gcloud/taskflow3/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""

import datetime
import json
import logging

from bamboo_engine import states as bamboo_engine_states
Expand Down Expand Up @@ -46,25 +47,23 @@


def _finish_taskflow_and_send_signal(instance_id, sig, task_success=False):
qs = TaskFlowInstance.objects.filter(pipeline_instance__instance_id=instance_id).only("id")
if not qs:
task = TaskFlowInstance.objects.filter(pipeline_instance__instance_id=instance_id).first()
if not task:
logger.error("pipeline archive handler get taskflow error, pipeline_instance_id={}".format(instance_id))
return

task_id = qs[0].id

TaskFlowInstance.objects.filter(id=task_id).update(current_flow="finished")
sig.send(TaskFlowInstance, task_id=task_id)
TaskFlowInstance.objects.filter(id=task.id).update(current_flow="finished")
sig.send(TaskFlowInstance, task_id=task.id)

if task_success:
_check_and_callback(task_id, task_success=task_success, task=qs[0])
_check_and_callback(task, task_success=task_success)
try:
send_taskflow_message.delay(task_id=task_id, msg_type=TASK_FINISHED)
send_taskflow_message.delay(task_id=task.id, msg_type=TASK_FINISHED)
except Exception as e:
logger.exception("send_taskflow_message[taskflow_id=%s] task delay error: %s" % (task_id, e))
logger.exception("send_taskflow_message[taskflow_id=%s] task delay error: %s" % (task.id, e))

if sig is taskflow_revoked:
_check_and_callback(task_id, task_success=False, task=qs[0])
_check_and_callback(task, task_success=False)


def _send_node_fail_message(node_id, pipeline_id):
Expand All @@ -73,7 +72,7 @@ def _send_node_fail_message(node_id, pipeline_id):
except TaskFlowInstance.DoesNotExist:
logger.error("pipeline finished handler get taskflow error, pipeline_instance_id=%s" % pipeline_id)
return
_check_and_callback(taskflow.id, task_success=False, task=taskflow)
_check_and_callback(taskflow, task_success=False)

if taskflow.is_child_taskflow is False:
try:
Expand All @@ -84,15 +83,24 @@ def _send_node_fail_message(node_id, pipeline_id):
logger.exception("pipeline_fail_handler[taskflow_id=%s] task delay error: %s" % (taskflow.id, e))


def _check_and_callback(taskflow_id, *args, **kwargs):
if not TaskCallBackRecord.objects.filter(task_id=taskflow_id).exists():
def _check_and_callback(task, *args, **kwargs):
record = TaskCallBackRecord.objects.filter(task_id=task.id).first()
if not record:
return
try:
if kwargs.get("task"):
task = kwargs.pop("task")
kwargs["task_outputs"] = task.get_task_detail()["outputs"]
if (
record.url
and json.loads(record.extra_info).get("callback_version") == TaskCallBackRecord.CALLBACK_VERSION_V2
):
# 检查任务的输出是否可以被json序列化,如果可以则将输出作为参数传给回调函数,否则不做处理
try:
task_outputs = task.get_task_detail()["outputs"]
json.dumps(task_outputs)
kwargs["task_outputs"] = task_outputs
except Exception as e:
logger.exception(f"[task {task.id}] outputs data serialize error: {e}")
task_callback.apply_async(
kwargs=dict(task_id=taskflow_id, **kwargs),
kwargs=dict(task_id=task.id, **kwargs),
queue="task_callback",
routing_key="task_callback",
)
Expand Down

0 comments on commit 7bad91b

Please sign in to comment.