Skip to content

Commit

Permalink
feature: get_task_status apigw 接口支持返回节点待自动重试信息
Browse files Browse the repository at this point in the history
  • Loading branch information
normal-wls authored and ZhuoZhuoCrayon committed Oct 9, 2023
1 parent f96b057 commit fabcd9c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
Binary file modified gcloud/apigw/docs/apigw-docs.tgz
Binary file not shown.
33 changes: 22 additions & 11 deletions gcloud/apigw/views/get_task_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from apigw_manager.apigw.decorators import apigw_require
from blueapps.account.decorators import login_exempt
from cachetools import TTLCache
from django.views.decorators.http import require_GET

from blueapps.account.decorators import login_exempt
from gcloud.apigw.utils import bucket_cached, BucketTTLCache, api_bucket_and_key

from gcloud import err_code
from gcloud.apigw.decorators import mark_request_whether_is_trust, return_json_response
from gcloud.apigw.decorators import project_inject
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
from gcloud.taskflow3.utils import add_node_name_to_status_tree, extract_failed_nodes, get_failed_nodes_info
from gcloud.apigw.decorators import mark_request_whether_is_trust, project_inject, return_json_response
from gcloud.apigw.utils import BucketTTLCache, api_bucket_and_key, bucket_cached
from gcloud.apigw.views.utils import logger
from gcloud.iam_auth.intercept import iam_intercept
from gcloud.iam_auth.view_interceptors.apigw import TaskViewInterceptor
from apigw_manager.apigw.decorators import apigw_require
from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.taskflow3.utils import add_node_name_to_status_tree, extract_failed_nodes, get_failed_nodes_info


def cache_decisioner(key, value):
Expand Down Expand Up @@ -54,6 +52,7 @@ def get_task_status(request, task_id, project_id):
subprocess_id = request.GET.get("subprocess_id")
with_ex_data = request.GET.get("with_ex_data")
with_failed_node_info = request.GET.get("with_failed_node_info")
with_auto_retry_status = request.GET.get("with_auto_retry_status")

try:
task = TaskFlowInstance.objects.get(pk=task_id, project_id=project.id, is_deleted=False)
Expand Down Expand Up @@ -89,12 +88,24 @@ def get_task_status(request, task_id, project_id):
"code": err_code.UNKNOWN_ERROR.code,
}

if with_failed_node_info:
if with_failed_node_info or with_auto_retry_status:
try:
status_tree, root_pipeline_id = result["data"], result["data"]["id"]
failed_node_ids = extract_failed_nodes(status_tree)
failed_node_info = get_failed_nodes_info(root_pipeline_id, failed_node_ids)
result["data"]["failed_node_info"] = failed_node_info
if with_failed_node_info:
result["data"]["failed_node_info"] = failed_node_info
if with_auto_retry_status:
auto_retry_waiting_nodes = [
node_id
for node_id, failed_info in failed_node_info.items()
if "max_auto_retry_times" in failed_info
and failed_info["auto_retry_times"] < failed_info["max_auto_retry_times"]
]
result["data"]["auto_retry_status"] = {
"exist_auto_retry_nodes": True if len(auto_retry_waiting_nodes) else False,
"auto_retry_nodes": auto_retry_waiting_nodes,
}
except Exception as e:
message = "task[id={task_id}] extract failed node info error: {error}".format(task_id=task_id, error=e)
logger.exception(message)
Expand Down

0 comments on commit fabcd9c

Please sign in to comment.