From 7ec6d7bce556afa20d4c97fe1a423ba84d9bf8fc Mon Sep 17 00:00:00 2001 From: ycggyao Date: Fri, 29 Nov 2024 18:04:00 +0800 Subject: [PATCH] =?UTF-8?q?fix(backend):=20process=5Ftodo=E3=80=81batch=5F?= =?UTF-8?q?process=5Ftodo=E6=8E=A5=E5=8F=A3=E4=BC=98=E5=8C=96=20#8303=20#?= =?UTF-8?q?=20Reviewed,=20transaction=20id:=2025412?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../backend/flow/utils/process_todo_utils.py | 44 +++++++++++++++++++ dbm-ui/backend/ticket/models/ticket.py | 13 +++--- dbm-ui/backend/ticket/views.py | 21 +++++---- 3 files changed, 64 insertions(+), 14 deletions(-) create mode 100644 dbm-ui/backend/flow/utils/process_todo_utils.py diff --git a/dbm-ui/backend/flow/utils/process_todo_utils.py b/dbm-ui/backend/flow/utils/process_todo_utils.py new file mode 100644 index 0000000000..be77b17606 --- /dev/null +++ b/dbm-ui/backend/flow/utils/process_todo_utils.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import asyncio +import concurrent.futures +import logging.config + +from backend.ticket.models import Todo +from backend.ticket.todos import TodoActorFactory + +logger = logging.getLogger("flow") + + +def process_single_todo(operation, act, username): + """ + 处理单个待办的辅助函数 + """ + todo_id = operation["todo_id"] + params = operation["params"] + todo = Todo.objects.get(id=todo_id) + TodoActorFactory.actor(todo).process(username, act, params) + + +async def process_operations_async(operations, act, username): + """ + 异步处理待办操作 + """ + loop = asyncio.get_running_loop() + + # 自定义线程池 + with concurrent.futures.ThreadPoolExecutor() as executor: + # 将所有的任务提交到线程池 + tasks = [ + loop.run_in_executor(executor, process_single_todo, operation, act, username) for operation in operations + ] + # 等待所有任务完成 + await asyncio.gather(*tasks) diff --git a/dbm-ui/backend/ticket/models/ticket.py b/dbm-ui/backend/ticket/models/ticket.py index c9dac3d7e3..60882f8a28 100644 --- a/dbm-ui/backend/ticket/models/ticket.py +++ b/dbm-ui/backend/ticket/models/ticket.py @@ -139,8 +139,10 @@ def current_flow(self) -> Flow: 1. 取 TicketFlow 中最后一个 flow_obj_id 非空的流程 2. 若 TicketFlow 中都流程都为空,则代表整个单据未开始,取第一个流程 """ - if Flow.objects.filter(ticket=self).exclude(status=TicketFlowStatus.PENDING).exists(): - return Flow.objects.filter(ticket=self).exclude(status=TicketFlowStatus.PENDING).last() + non_pending_flows = [flow for flow in self.flows.all() if flow.status != TicketFlowStatus.PENDING] + if non_pending_flows: + # 返回最后一个符合条件的 Flow 对象 + return non_pending_flows[-1] # 初始化时,当前节点和下一个节点为同一个 return self.next_flow() @@ -148,13 +150,14 @@ def next_flow(self) -> Flow: """ 下一个流程,即 TicketFlow 中第一个为PENDING的流程 """ - next_flows = Flow.objects.filter(ticket=self, status=TicketFlowStatus.PENDING) + next_flows = [flow for flow in self.flows.all() if flow.status == TicketFlowStatus.PENDING] # 支持跳过人工审批和确认环节 if env.ITSM_FLOW_SKIP: - next_flows = next_flows.exclude(flow_type__in=[FlowType.BK_ITSM, FlowType.PAUSE]) + next_flows = [flow for flow in next_flows if flow.flow_type not in [FlowType.BK_ITSM, FlowType.PAUSE]] - return next_flows.first() + # 返回第一个符合条件的 Flow 对象 + return next_flows[0] if next_flows else None @classmethod def create_ticket( diff --git a/dbm-ui/backend/ticket/views.py b/dbm-ui/backend/ticket/views.py index 9b66b6e7a4..2f863fb2aa 100644 --- a/dbm-ui/backend/ticket/views.py +++ b/dbm-ui/backend/ticket/views.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import asyncio import operator from functools import reduce @@ -25,6 +26,7 @@ from backend.bk_web.swagger import PaginatedResponseSwaggerAutoSchema, common_swagger_auto_schema from backend.configuration.models import DBAdministrator from backend.db_services.ipchooser.query.resource import ResourceQueryHelper +from backend.flow.utils.process_todo_utils import process_operations_async from backend.iam_app.dataclass import ResourceEnum from backend.iam_app.dataclass.actions import ActionEnum from backend.iam_app.handlers.drf_perm.base import RejectPermission, ResourceActionPermission @@ -221,6 +223,7 @@ def perform_create(self, serializer): builder.patch_ticket_detail() builder.init_ticket_flows() + ticket = Ticket.objects.prefetch_related("flows").get(pk=ticket.pk) TicketFlowManager(ticket=ticket).run_next_flow() @swagger_auto_schema( @@ -437,7 +440,9 @@ def process_todo(self, request, *args, **kwargs): validated_data = self.params_validate(self.get_serializer_class()) - todo = ticket.todo_of_ticket.get(id=validated_data["todo_id"]) + todo = ( + Todo.objects.select_related("ticket").prefetch_related("ticket__flows").get(id=validated_data["todo_id"]) + ) TodoActorFactory.actor(todo).process(request.user.username, validated_data["action"], validated_data["params"]) return Response(TodoSerializer(ticket.todo_of_ticket.all(), many=True).data) @@ -644,15 +649,13 @@ def batch_process_todo(self, request, *args, **kwargs): """ validated_data = self.params_validate(self.get_serializer_class()) act = validated_data["action"] + operations = validated_data["operations"] - # 批量处理待办操作 - results = [] - for operation in validated_data["operations"]: - todo_id = operation["todo_id"] - params = operation["params"] - todo = Todo.objects.get(id=todo_id) - TodoActorFactory.actor(todo).process(request.user.username, act, params) - results.append(todo) + # 执行异步处理 + asyncio.run(process_operations_async(operations, act, request.user.username)) + + # 获取处理后的待办事项 + results = [Todo.objects.get(id=operation["todo_id"]) for operation in operations] # 使用 TodoSerializer 序列化响应数据 return Response(TodoSerializer(results, many=True).data)