diff --git a/gcloud/apigw/views/get_task_status.py b/gcloud/apigw/views/get_task_status.py
index b1d307b93d..a2550351b2 100644
--- a/gcloud/apigw/views/get_task_status.py
+++ b/gcloud/apigw/views/get_task_status.py
@@ -11,6 +11,7 @@
 specific language governing permissions and limitations under the License.
 """
 from apigw_manager.apigw.decorators import apigw_require
+from bamboo_engine import states as bamboo_engine_states
 from blueapps.account.decorators import login_exempt
 from cachetools import TTLCache
 from django.views.decorators.http import require_GET
@@ -23,7 +24,7 @@
 from gcloud.iam_auth.view_interceptors.apigw import TaskViewInterceptor
 from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
 from gcloud.taskflow3.models import TaskFlowInstance
-from gcloud.taskflow3.utils import add_node_name_to_status_tree, extract_failed_nodes, get_failed_nodes_info
+from gcloud.taskflow3.utils import add_node_name_to_status_tree, extract_nodes_by_statuses, get_failed_nodes_info
 
 
 def cache_decisioner(key, value):
@@ -91,7 +92,7 @@ def get_task_status(request, task_id, project_id):
     if with_failed_node_info or with_auto_retry_status:
         try:
             status_tree, root_pipeline_id = result["data"], result["data"]["id"]
-            failed_node_ids = extract_failed_nodes(status_tree)
+            failed_node_ids = extract_nodes_by_statuses(status_tree, statuses=[bamboo_engine_states.FAILED])
             failed_node_info = get_failed_nodes_info(root_pipeline_id, failed_node_ids)
             if with_failed_node_info:
                 result["data"]["failed_node_info"] = failed_node_info
diff --git a/gcloud/taskflow3/apis/django/api.py b/gcloud/taskflow3/apis/django/api.py
index f670504b11..e1fd809811 100644
--- a/gcloud/taskflow3/apis/django/api.py
+++ b/gcloud/taskflow3/apis/django/api.py
@@ -68,6 +68,7 @@
 from gcloud.taskflow3.domains.context import TaskContext
 from gcloud.taskflow3.domains.dispatchers import NodeCommandDispatcher, TaskCommandDispatcher
 from gcloud.taskflow3.models import TaskFlowInstance, TimeoutNodeConfig
+from gcloud.taskflow3.utils import extract_nodes_by_statuses, fetch_node_id__auto_retry_info_map
 from gcloud.utils.decorators import request_validate
 from gcloud.utils.throttle import check_task_operation_throttle, get_task_operation_frequence
 from pipeline_web.preview import preview_template_tree
@@ -108,6 +109,20 @@ def status(request, project_id):
         engine_ver=task.engine_ver, taskflow_id=task.id, pipeline_instance=task.pipeline_instance, project_id=project_id
     )
     result = dispatcher.get_task_status(subprocess_id=subprocess_id)
+
+    # 解析状态树失败或者任务尚未被调度,此时直接返回解析结果
+    if not result["result"] or not result["data"].get("id"):
+        return JsonResponse(result)
+
+    try:
+        status_tree, root_pipeline_id = result["data"], result["data"]["id"]
+        all_node_ids = extract_nodes_by_statuses(status_tree)
+        status_tree["auto_retry_infos"] = fetch_node_id__auto_retry_info_map(root_pipeline_id, all_node_ids)
+    except Exception as e:
+        message = "task[id={task_id}] extract failed node info error: {error}".format(task_id=task.id, error=e)
+        logger.exception(message)
+        return JsonResponse({"result": False, "message": message, "code": err_code.UNKNOWN_ERROR.code})
+
     return JsonResponse(result)
 
 
@@ -190,6 +205,7 @@ def detail(request, project_id):
                 "ex_data": "节点错误信息(string)"
             }
         ],
+        "auto_retry_info": {"node_id": "act1", "auto_retry_times": 3, "max_auto_retry_times": 10},
         "inputs": "节点输入数据, include_data 为 1 时返回(object or null)",
         "outputs": "节点输出数据, include_data 为 1 时返回(list)",
         "ex_data": "节点错误信息, include_data 为 1 时返回(string)"
diff --git a/gcloud/taskflow3/models.py b/gcloud/taskflow3/models.py
index f62c64a0f7..34e08570be 100644
--- a/gcloud/taskflow3/models.py
+++ b/gcloud/taskflow3/models.py
@@ -54,7 +54,7 @@
 from gcloud.shortcuts.cmdb import get_business_attrinfo, get_business_group_members
 from gcloud.taskflow3.domains.context import TaskContext
 from gcloud.taskflow3.domains.dispatchers import NodeCommandDispatcher, TaskCommandDispatcher
-from gcloud.taskflow3.utils import parse_node_timeout_configs
+from gcloud.taskflow3.utils import fetch_node_id__auto_retry_info_map, parse_node_timeout_configs
 from gcloud.tasktmpl3.models import TaskTemplate
 from gcloud.template_base.utils import inject_original_template_info, inject_template_node_id, replace_template_id
 from gcloud.utils.components import format_component_name_with_remote, get_remote_plugin_name
@@ -978,6 +978,11 @@ def get_node_detail(
         detail = node_detail_result["data"]
         detail.update(node_data)
 
+        # 补充重试信息
+        detail["auto_retry_info"] = (
+            fetch_node_id__auto_retry_info_map(detail["parent_id"], [detail["id"]]).get(detail["id"]) or {}
+        )
+
         return {"result": True, "data": detail, "message": "", "code": err_code.SUCCESS.code}
 
     def task_claim(self, username, constants, name):
diff --git a/gcloud/taskflow3/utils.py b/gcloud/taskflow3/utils.py
index f4213d6840..855dba29dc 100644
--- a/gcloud/taskflow3/utils.py
+++ b/gcloud/taskflow3/utils.py
@@ -12,13 +12,14 @@
 """
 import logging
+from typing import Any, Dict, List, Optional
 
+from bamboo_engine import states as bamboo_engine_states
 from django.apps import apps
+from django.utils.translation import ugettext_lazy as _
+from pipeline.core import constants as pipeline_constants
 from pipeline.engine import states as pipeline_states
 from pipeline.engine.utils import calculate_elapsed_time
-from pipeline.core import constants as pipeline_constants
-from bamboo_engine import states as bamboo_engine_states
-from django.utils.translation import ugettext_lazy as _
 
 from gcloud.utils.dates import format_datetime
 
 
@@ -90,30 +91,45 @@ def add_node_name_to_status_tree(pipeline_tree, status_tree_children):
         add_node_name_to_status_tree(pipeline_tree.get("activities", {}).get(node_id, {}).get("pipeline", {}), children)
 
 
-def extract_failed_nodes(status_tree):
-    FAILED_STATE = "FAILED"
-    failed_nodes = []
+def extract_nodes_by_statuses(status_tree: Dict, statuses: Optional[List[str]] = None) -> List[str]:
+    """
+    在状态树中获取指定状态的节点 ID 列表
+    :param status_tree:
+    :param statuses: 为空取任意状态
+    :return:
+    """
+    nodes: List[str] = []
     for node_id, status in status_tree["children"].items():
-        if status["state"] == FAILED_STATE:
-            failed_nodes.append(node_id)
-        failed_nodes += extract_failed_nodes(status)
-    return failed_nodes
+        if not statuses or status["state"] in statuses:
+            nodes.append(node_id)
+        nodes += extract_nodes_by_statuses(status, statuses)
+    return nodes
 
 
 def get_failed_nodes_info(root_pipeline_id, failed_node_ids):
     info = {failed_node_id: {} for failed_node_id in failed_node_ids}
 
-    # 获取失败节点自动重试数据
+    for node_id, auto_retry_info in fetch_node_id__auto_retry_info_map(root_pipeline_id, failed_node_ids).items():
+        info[node_id].update(auto_retry_info)
+
+    return info
+
+
+def fetch_node_id__auto_retry_info_map(root_pipeline_id, node_ids: List[str]) -> Dict[str, Dict[str, Any]]:
+    """获取指定节点ID列表的自动重试配置信息"""
+    node_id__auto_retry_info_map: Dict[str, Dict[str, Any]] = {}
     AutoRetryNodeStrategy = apps.get_model("taskflow3", "AutoRetryNodeStrategy")
     strategy_info = AutoRetryNodeStrategy.objects.filter(
-        root_pipeline_id=root_pipeline_id, node_id__in=failed_node_ids
+        root_pipeline_id=root_pipeline_id, node_id__in=node_ids
     ).values("node_id", "retry_times", "max_retry_times")
 
-    for strategy in strategy_info:
-        info[strategy["node_id"]].update(
-            {"auto_retry_times": strategy["retry_times"], "max_auto_retry_times": strategy["max_retry_times"]}
-        )
-    return info
+    for strategy in strategy_info:
+        node_id__auto_retry_info_map[strategy["node_id"]] = {
+            "node_id": strategy["node_id"],
+            "auto_retry_times": strategy["retry_times"],
+            "max_auto_retry_times": strategy["max_retry_times"],
+        }
+    return node_id__auto_retry_info_map
 
 
 def parse_node_timeout_configs(pipeline_tree: dict) -> list:
diff --git a/gcloud/tests/taskflow3/models/taskflow/test_get_node_detail.py b/gcloud/tests/taskflow3/models/taskflow/test_get_node_detail.py
index 0fa45548fe..9ef3e3d82b 100644
--- a/gcloud/tests/taskflow3/models/taskflow/test_get_node_detail.py
+++ b/gcloud/tests/taskflow3/models/taskflow/test_get_node_detail.py
@@ -66,7 +66,7 @@ def test_include_data_is_false(self):
         taskflow.has_node = MagicMock(return_value=True)
         dispatcher = MagicMock()
         get_node_data_return = {"result": True, "data": {}}
-        get_node_detail_return = {"result": True, "data": {}}
+        get_node_detail_return = {"result": True, "data": {"id": "id", "parent_id": "parent_id"}}
         dispatcher.get_node_data = MagicMock(return_value=get_node_data_return)
         dispatcher.get_node_detail = MagicMock(return_value=get_node_detail_return)
         dispatcher_init = MagicMock(return_value=dispatcher)
@@ -98,7 +98,15 @@ def test_include_data_is_false(self):
             loop=loop,
             subprocess_simple_inputs=False,
         )
-        self.assertEqual(detail, {"code": 0, "data": {}, "message": "", "result": True})
+        self.assertEqual(
+            detail,
+            {
+                "code": 0,
+                "data": {"auto_retry_info": {}, "id": "id", "parent_id": "parent_id"},
+                "message": "",
+                "result": True,
+            },
+        )
 
     def test_success(self):
         taskflow = TaskFlowInstance()
@@ -107,7 +115,7 @@
         taskflow.id = 1
         taskflow.engine_ver = 2
         taskflow.has_node = MagicMock(return_value=True)
         dispatcher = MagicMock()
         get_node_data_return = {"result": True, "data": {"data": "data"}}
-        get_node_detail_return = {"result": True, "data": {"detail": "detail"}}
+        get_node_detail_return = {"result": True, "data": {"id": "id", "parent_id": "parent_id"}}
         dispatcher.get_node_data = MagicMock(return_value=get_node_data_return)
         dispatcher.get_node_detail = MagicMock(return_value=get_node_detail_return)
         dispatcher_init = MagicMock(return_value=dispatcher)
@@ -149,5 +157,11 @@
             subprocess_simple_inputs=False,
         )
         self.assertEqual(
-            detail, {"code": 0, "data": {"data": "data", "detail": "detail"}, "message": "", "result": True}
+            detail,
+            {
+                "code": 0,
+                "data": {"data": "data", "id": "id", "parent_id": "parent_id", "auto_retry_info": {}},
+                "message": "",
+                "result": True,
+            },
         )
diff --git a/gcloud/tests/taskflow3/test_utils.py b/gcloud/tests/taskflow3/test_utils.py
index d3ef04231c..5b39dfc939 100644
--- a/gcloud/tests/taskflow3/test_utils.py
+++ b/gcloud/tests/taskflow3/test_utils.py
@@ -13,7 +13,7 @@
 from django.test import TestCase
 
 from gcloud.taskflow3.models import AutoRetryNodeStrategy
-from gcloud.taskflow3.utils import parse_node_timeout_configs, extract_failed_nodes, get_failed_nodes_info
+from gcloud.taskflow3.utils import extract_nodes_by_statuses, get_failed_nodes_info, parse_node_timeout_configs
 
 
 class UtilsTestCase(TestCase):
@@ -67,7 +67,7 @@ def test_parse_node_timeout_configs_fail_and_ignore(self):
         self.assertEqual(parse_result["result"], True)
         self.assertEqual(parse_result["data"], parse_configs)
 
-    def test_extract_failed_nodes(self):
+    def test_extract_nodes_from_status_tree(self):
         status_tree = {
             "id": "root_pipeline_id",
             "children": {
@@ -84,12 +84,17 @@
             },
             "state": "FAILED",
         }
-        failed_nodes = extract_failed_nodes(status_tree)
+        failed_nodes = extract_nodes_by_statuses(status_tree, ["FAILED"])
         self.assertEqual(failed_nodes, ["act_2", "act_2_2"])
 
+        # get all
+        all_nodes = extract_nodes_by_statuses(status_tree)
+        self.assertEqual(all_nodes, ["act_1", "act_2", "act_2_1", "act_2_2", "act_3"])
+
     def test_get_failed_nodes_info(self):
         FAILED_NODES_INFO = {
             "act_1": {
+                "node_id": "act_1",
                 "auto_retry_times": self.arn_instance.retry_times,
                 "max_auto_retry_times": self.arn_instance.max_retry_times,
            },
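
For reference, a minimal standalone sketch of the status-tree traversal this patch introduces in gcloud/taskflow3/utils.py: it mirrors the extract_nodes_by_statuses logic from the hunk above and runs it against a made-up status tree (the node IDs and states below are illustrative only, not taken from the repository's fixtures).

from typing import Dict, List, Optional


def extract_nodes_by_statuses(status_tree: Dict, statuses: Optional[List[str]] = None) -> List[str]:
    # Depth-first walk over the status tree: collect node IDs whose state is in
    # `statuses`, or every node ID when `statuses` is empty or None.
    nodes: List[str] = []
    for node_id, status in status_tree["children"].items():
        if not statuses or status["state"] in statuses:
            nodes.append(node_id)
        nodes += extract_nodes_by_statuses(status, statuses)
    return nodes


if __name__ == "__main__":
    # Made-up status tree: act_2 is a subprocess whose child act_2_1 failed.
    tree = {
        "id": "root",
        "state": "FAILED",
        "children": {
            "act_1": {"state": "FINISHED", "children": {}},
            "act_2": {
                "state": "FAILED",
                "children": {"act_2_1": {"state": "FAILED", "children": {}}},
            },
        },
    }
    print(extract_nodes_by_statuses(tree, ["FAILED"]))  # ['act_2', 'act_2_1']
    print(extract_nodes_by_statuses(tree))  # ['act_1', 'act_2', 'act_2_1']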