Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

节点详情及任务状态 API 增加自动重试配置信息 #7126

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions gcloud/apigw/views/get_task_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
specific language governing permissions and limitations under the License.
"""
from apigw_manager.apigw.decorators import apigw_require
from bamboo_engine import states as bamboo_engine_states
from blueapps.account.decorators import login_exempt
from cachetools import TTLCache
from django.views.decorators.http import require_GET
Expand All @@ -23,7 +24,7 @@
from gcloud.iam_auth.view_interceptors.apigw import TaskViewInterceptor
from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.taskflow3.utils import add_node_name_to_status_tree, extract_failed_nodes, get_failed_nodes_info
from gcloud.taskflow3.utils import add_node_name_to_status_tree, extract_nodes_by_statuses, get_failed_nodes_info


def cache_decisioner(key, value):
Expand Down Expand Up @@ -91,7 +92,7 @@ def get_task_status(request, task_id, project_id):
if with_failed_node_info or with_auto_retry_status:
try:
status_tree, root_pipeline_id = result["data"], result["data"]["id"]
failed_node_ids = extract_failed_nodes(status_tree)
failed_node_ids = extract_nodes_by_statuses(status_tree, statuses=[bamboo_engine_states.FAILED])
failed_node_info = get_failed_nodes_info(root_pipeline_id, failed_node_ids)
if with_failed_node_info:
result["data"]["failed_node_info"] = failed_node_info
Expand Down
16 changes: 16 additions & 0 deletions gcloud/taskflow3/apis/django/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from gcloud.taskflow3.domains.context import TaskContext
from gcloud.taskflow3.domains.dispatchers import NodeCommandDispatcher, TaskCommandDispatcher
from gcloud.taskflow3.models import TaskFlowInstance, TimeoutNodeConfig
from gcloud.taskflow3.utils import extract_nodes_by_statuses, fetch_node_id__auto_retry_info_map
from gcloud.utils.decorators import request_validate
from gcloud.utils.throttle import check_task_operation_throttle, get_task_operation_frequence
from pipeline_web.preview import preview_template_tree
Expand Down Expand Up @@ -108,6 +109,20 @@ def status(request, project_id):
engine_ver=task.engine_ver, taskflow_id=task.id, pipeline_instance=task.pipeline_instance, project_id=project_id
)
result = dispatcher.get_task_status(subprocess_id=subprocess_id)

# 解析状态树失败或者任务尚未被调度,此时直接返回解析结果
if not result["result"] or not result["data"].get("id"):
return JsonResponse(result)

try:
status_tree, root_pipeline_id = result["data"], result["data"]["id"]
all_node_ids = extract_nodes_by_statuses(status_tree)
status_tree["auto_retry_infos"] = fetch_node_id__auto_retry_info_map(root_pipeline_id, all_node_ids)
except Exception as e:
message = "task[id={task_id}] extract failed node info error: {error}".format(task_id=task.id, error=e)
logger.exception(message)
return JsonResponse({"result": False, "message": message, "code": err_code.UNKNOWN_ERROR.code})

return JsonResponse(result)


Expand Down Expand Up @@ -190,6 +205,7 @@ def detail(request, project_id):
"ex_data": "节点错误信息(string)"
}
],
"auto_retry_info": {"node_id": "act1", "auto_retry_times": 3, "max_auto_retry_times": 10},
"inputs": "节点输入数据, include_data 为 1 时返回(object or null)",
"outputs": "节点输出数据, include_data 为 1 时返回(list)",
"ex_data": "节点错误信息, include_data 为 1 时返回(string)"
Expand Down
7 changes: 6 additions & 1 deletion gcloud/taskflow3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from gcloud.shortcuts.cmdb import get_business_attrinfo, get_business_group_members
from gcloud.taskflow3.domains.context import TaskContext
from gcloud.taskflow3.domains.dispatchers import NodeCommandDispatcher, TaskCommandDispatcher
from gcloud.taskflow3.utils import parse_node_timeout_configs
from gcloud.taskflow3.utils import fetch_node_id__auto_retry_info_map, parse_node_timeout_configs
from gcloud.tasktmpl3.models import TaskTemplate
from gcloud.template_base.utils import inject_original_template_info, inject_template_node_id, replace_template_id
from gcloud.utils.components import format_component_name_with_remote, get_remote_plugin_name
Expand Down Expand Up @@ -978,6 +978,11 @@ def get_node_detail(
detail = node_detail_result["data"]
detail.update(node_data)

# 补充重试信息
detail["auto_retry_info"] = (
fetch_node_id__auto_retry_info_map(detail["parent_id"], [detail["id"]]).get(detail["id"]) or {}
)

return {"result": True, "data": detail, "message": "", "code": err_code.SUCCESS.code}

def task_claim(self, username, constants, name):
Expand Down
50 changes: 33 additions & 17 deletions gcloud/taskflow3/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
"""

import logging
from typing import Any, Dict, List, Optional

from bamboo_engine import states as bamboo_engine_states
from django.apps import apps
from django.utils.translation import ugettext_lazy as _
from pipeline.core import constants as pipeline_constants
from pipeline.engine import states as pipeline_states
from pipeline.engine.utils import calculate_elapsed_time
from pipeline.core import constants as pipeline_constants
from bamboo_engine import states as bamboo_engine_states
from django.utils.translation import ugettext_lazy as _

from gcloud.utils.dates import format_datetime

Expand Down Expand Up @@ -90,30 +91,45 @@ def add_node_name_to_status_tree(pipeline_tree, status_tree_children):
add_node_name_to_status_tree(pipeline_tree.get("activities", {}).get(node_id, {}).get("pipeline", {}), children)


def extract_failed_nodes(status_tree):
FAILED_STATE = "FAILED"
failed_nodes = []
def extract_nodes_by_statuses(status_tree: Dict, statuses: Optional[List[str]] = None) -> List[str]:
"""
在状态树中获取指定状态的节点 ID 列表
:param status_tree:
:param statuses: 为空取任意状态
:return:
"""
nodes: List[str] = []
for node_id, status in status_tree["children"].items():
if status["state"] == FAILED_STATE:
failed_nodes.append(node_id)
failed_nodes += extract_failed_nodes(status)
return failed_nodes
if not statuses or status["state"] in statuses:
nodes.append(node_id)
nodes += extract_nodes_by_statuses(status, statuses)
return nodes


def get_failed_nodes_info(root_pipeline_id, failed_node_ids):
info = {failed_node_id: {} for failed_node_id in failed_node_ids}

# 获取失败节点自动重试数据
for node_id, auto_retry_info in fetch_node_id__auto_retry_info_map(root_pipeline_id, failed_node_ids).items():
info[node_id].update(auto_retry_info)

return info


def fetch_node_id__auto_retry_info_map(root_pipeline_id, node_ids: List[str]) -> Dict[str, Dict[str, Any]]:
"""获取指定节点ID列表的自动重试配置信息"""
node_id__auto_retry_info_map: Dict[str, Dict[str, Any]] = {}
AutoRetryNodeStrategy = apps.get_model("taskflow3", "AutoRetryNodeStrategy")
strategy_info = AutoRetryNodeStrategy.objects.filter(
root_pipeline_id=root_pipeline_id, node_id__in=failed_node_ids
root_pipeline_id=root_pipeline_id, node_id__in=node_ids
).values("node_id", "retry_times", "max_retry_times")
for strategy in strategy_info:
info[strategy["node_id"]].update(
{"auto_retry_times": strategy["retry_times"], "max_auto_retry_times": strategy["max_retry_times"]}
)

return info
for strategy in strategy_info:
node_id__auto_retry_info_map[strategy["node_id"]] = {
"node_id": strategy["node_id"],
"auto_retry_times": strategy["retry_times"],
"max_auto_retry_times": strategy["max_retry_times"],
}
return node_id__auto_retry_info_map


def parse_node_timeout_configs(pipeline_tree: dict) -> list:
Expand Down
11 changes: 8 additions & 3 deletions gcloud/tests/taskflow3/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from django.test import TestCase

from gcloud.taskflow3.models import AutoRetryNodeStrategy
from gcloud.taskflow3.utils import parse_node_timeout_configs, extract_failed_nodes, get_failed_nodes_info
from gcloud.taskflow3.utils import extract_nodes_by_statuses, get_failed_nodes_info, parse_node_timeout_configs


class UtilsTestCase(TestCase):
Expand Down Expand Up @@ -67,7 +67,7 @@ def test_parse_node_timeout_configs_fail_and_ignore(self):
self.assertEqual(parse_result["result"], True)
self.assertEqual(parse_result["data"], parse_configs)

def test_extract_failed_nodes(self):
def test_extract_nodes_from_status_tree(self):
status_tree = {
"id": "root_pipeline_id",
"children": {
Expand All @@ -84,12 +84,17 @@ def test_extract_failed_nodes(self):
},
"state": "FAILED",
}
failed_nodes = extract_failed_nodes(status_tree)
failed_nodes = extract_nodes_by_statuses(status_tree, ["FAILED"])
self.assertEqual(failed_nodes, ["act_2", "act_2_2"])

# get all
all_nodes = extract_nodes_by_statuses(status_tree)
self.assertEqual(all_nodes, ["act_1", "act_2", "act_2_1", "act_2_2", "act_3"])

def test_get_failed_nodes_info(self):
FAILED_NODES_INFO = {
"act_1": {
"node_id": "act_1",
"auto_retry_times": self.arn_instance.retry_times,
"max_auto_retry_times": self.arn_instance.max_retry_times,
},
Expand Down
Loading