From 7df268604572f20cbb65379314bdf58d1a9ae320 Mon Sep 17 00:00:00 2001 From: durant <826035498@qq.com> Date: Wed, 15 May 2024 16:08:43 +0800 Subject: [PATCH] =?UTF-8?q?perf(backend):=20=E4=BC=98=E5=8C=96pipeline?= =?UTF-8?q?=E6=A0=91=E9=80=92=E5=BD=92=20#4424?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbm-ui/backend/flow/engine/bamboo/engine.py | 96 ++++++++++----------- 1 file changed, 47 insertions(+), 49 deletions(-) diff --git a/dbm-ui/backend/flow/engine/bamboo/engine.py b/dbm-ui/backend/flow/engine/bamboo/engine.py index b3cbc01c26..ce4fa32a78 100644 --- a/dbm-ui/backend/flow/engine/bamboo/engine.py +++ b/dbm-ui/backend/flow/engine/bamboo/engine.py @@ -151,59 +151,58 @@ def get_subprocess_state(self, children: Dict, children_status: List): def recursion_subprocess_status(self, activities: Dict, node_maps: Dict): """为子流程添加状态""" - raw_data = copy.deepcopy(activities) - for node_id, activity in raw_data.items(): - for key, value in activity.items(): - if value == "SubProcess": - children_status_list = [] - children_states = self.get_children_states(node_id=node_id).data - try: - children = children_states[node_id]["children"] - self.get_subprocess_state(children, children_status_list) - status = children_states[node_id]["state"] - if status == states.RUNNING and states.FAILED in children_status_list: - status = states.FAILED - elif status == states.RUNNING and states.REVOKED in children_status_list: - status = states.REVOKED - except KeyError: - status = states.CREATED - - act_status = [] - self.recursion_subprocess_activity_status( - self.root_id, activity["pipeline"]["activities"], act_status, node_maps - ) - if states.FAILED in act_status: + for node_id, activity in activities.items(): + if activity.get("pipeline"): + self.recursion_subprocess_status(activities[node_id]["pipeline"]["activities"], node_maps) + + if activity.get("type") == "SubProcess": + children_status_list = [] + children_states = self.get_children_states(node_id=node_id).data + try: + children = children_states[node_id]["children"] + self.get_subprocess_state(children, children_status_list) + status = children_states[node_id]["state"] + if status == states.RUNNING and states.FAILED in children_status_list: status = states.FAILED - elif states.REVOKED in act_status: + elif status == states.RUNNING and states.REVOKED in children_status_list: status = states.REVOKED - - activities[node_id]["status"] = status - elif key == "pipeline": - self.recursion_subprocess_status(activities[node_id]["pipeline"]["activities"], node_maps) - - def recursion_nodes_status(self, node: FlowNode, status: str, raw_data: Dict): - for key, values in raw_data.items(): - if key == node.node_id: - raw_data[key]["status"] = status - raw_data[key]["created_at"] = int(datetime2timestamp(node.created_at)) - raw_data[key]["started_at"] = int(datetime2timestamp(node.started_at)) - raw_data[key]["updated_at"] = int(datetime2timestamp(node.updated_at)) - raw_data[key]["hosts"] = node.hosts + except KeyError: + status = states.CREATED + + act_status = [] + self.recursion_subprocess_act_status( + self.root_id, activity["pipeline"]["activities"], act_status, node_maps + ) + if states.FAILED in act_status: + status = states.FAILED + elif states.REVOKED in act_status: + status = states.REVOKED + activities[node_id]["status"] = status + + def recursion_nodes_status(self, tree: Dict, node_maps: Dict): + for key, values in tree.items(): + if key in node_maps: + node = node_maps[key] + tree[key]["status"] = node.status + tree[key]["created_at"] = int(datetime2timestamp(node.created_at)) + tree[key]["started_at"] = int(datetime2timestamp(node.started_at)) + tree[key]["updated_at"] = int(datetime2timestamp(node.updated_at)) + tree[key]["hosts"] = node.hosts continue if isinstance(values, dict): - self.recursion_nodes_status(node, status, values) + self.recursion_nodes_status(values, node_maps) - def recursion_subprocess_activity_status(self, root_id: str, activities: Dict, act_status: List, node_maps: Dict): + def recursion_subprocess_act_status(self, root_id: str, activities: Dict, act_status: List, node_maps: Dict): for node_id, activity in activities.items(): - for key, value in activity.items(): - if key == "type" and value == "SubProcess": - self.recursion_subprocess_activity_status( - root_id, activity["pipeline"]["activities"], act_status, node_maps - ) - elif key == "type" and value == "ServiceActivity": - status = node_maps[node_id] - act_status.append(status) + activity_type = activity.get("type") + if activity_type == "SubProcess": + self.recursion_subprocess_act_status( + root_id, activity["pipeline"]["activities"], act_status, node_maps + ) + elif activity_type == "ServiceActivity": + status = node_maps[node_id].status + act_status.append(status) def recursion_translate_activity(self, activities: Dict): """递归翻译节点名称""" @@ -221,11 +220,10 @@ def get_pipeline_tree_states(self) -> Optional[Dict]: node_maps = {} nodes = FlowNode.objects.filter(root_id=self.root_id) for node in nodes: - node_maps[node.node_id] = node.status + node_maps[node.node_id] = node self.recursion_subprocess_status(activities, node_maps) self.recursion_translate_activity(activities) - for node in nodes: - self.recursion_nodes_status(node, node.status, tree) + self.recursion_nodes_status(tree, node_maps) return tree def get_pipeline_tree(self) -> Optional[Dict]: