Skip to content

Commit

Permalink
perf(backend): 优化pipeline树递归 #4424
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzhw8 authored and iSecloud committed May 15, 2024
1 parent 1d898e1 commit 7df2686
Showing 1 changed file with 47 additions and 49 deletions.
96 changes: 47 additions & 49 deletions dbm-ui/backend/flow/engine/bamboo/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,59 +151,58 @@ def get_subprocess_state(self, children: Dict, children_status: List):

def recursion_subprocess_status(self, activities: Dict, node_maps: Dict):
"""为子流程添加状态"""
raw_data = copy.deepcopy(activities)
for node_id, activity in raw_data.items():
for key, value in activity.items():
if value == "SubProcess":
children_status_list = []
children_states = self.get_children_states(node_id=node_id).data
try:
children = children_states[node_id]["children"]
self.get_subprocess_state(children, children_status_list)
status = children_states[node_id]["state"]
if status == states.RUNNING and states.FAILED in children_status_list:
status = states.FAILED
elif status == states.RUNNING and states.REVOKED in children_status_list:
status = states.REVOKED
except KeyError:
status = states.CREATED

act_status = []
self.recursion_subprocess_activity_status(
self.root_id, activity["pipeline"]["activities"], act_status, node_maps
)
if states.FAILED in act_status:
for node_id, activity in activities.items():
if activity.get("pipeline"):
self.recursion_subprocess_status(activities[node_id]["pipeline"]["activities"], node_maps)

if activity.get("type") == "SubProcess":
children_status_list = []
children_states = self.get_children_states(node_id=node_id).data
try:
children = children_states[node_id]["children"]
self.get_subprocess_state(children, children_status_list)
status = children_states[node_id]["state"]
if status == states.RUNNING and states.FAILED in children_status_list:
status = states.FAILED
elif states.REVOKED in act_status:
elif status == states.RUNNING and states.REVOKED in children_status_list:
status = states.REVOKED

activities[node_id]["status"] = status
elif key == "pipeline":
self.recursion_subprocess_status(activities[node_id]["pipeline"]["activities"], node_maps)

def recursion_nodes_status(self, node: FlowNode, status: str, raw_data: Dict):
for key, values in raw_data.items():
if key == node.node_id:
raw_data[key]["status"] = status
raw_data[key]["created_at"] = int(datetime2timestamp(node.created_at))
raw_data[key]["started_at"] = int(datetime2timestamp(node.started_at))
raw_data[key]["updated_at"] = int(datetime2timestamp(node.updated_at))
raw_data[key]["hosts"] = node.hosts
except KeyError:
status = states.CREATED

act_status = []
self.recursion_subprocess_act_status(
self.root_id, activity["pipeline"]["activities"], act_status, node_maps
)
if states.FAILED in act_status:
status = states.FAILED
elif states.REVOKED in act_status:
status = states.REVOKED
activities[node_id]["status"] = status

def recursion_nodes_status(self, tree: Dict, node_maps: Dict):
for key, values in tree.items():
if key in node_maps:
node = node_maps[key]
tree[key]["status"] = node.status
tree[key]["created_at"] = int(datetime2timestamp(node.created_at))
tree[key]["started_at"] = int(datetime2timestamp(node.started_at))
tree[key]["updated_at"] = int(datetime2timestamp(node.updated_at))
tree[key]["hosts"] = node.hosts
continue

if isinstance(values, dict):
self.recursion_nodes_status(node, status, values)
self.recursion_nodes_status(values, node_maps)

def recursion_subprocess_activity_status(self, root_id: str, activities: Dict, act_status: List, node_maps: Dict):
def recursion_subprocess_act_status(self, root_id: str, activities: Dict, act_status: List, node_maps: Dict):
for node_id, activity in activities.items():
for key, value in activity.items():
if key == "type" and value == "SubProcess":
self.recursion_subprocess_activity_status(
root_id, activity["pipeline"]["activities"], act_status, node_maps
)
elif key == "type" and value == "ServiceActivity":
status = node_maps[node_id]
act_status.append(status)
activity_type = activity.get("type")
if activity_type == "SubProcess":
self.recursion_subprocess_act_status(
root_id, activity["pipeline"]["activities"], act_status, node_maps
)
elif activity_type == "ServiceActivity":
status = node_maps[node_id].status
act_status.append(status)

def recursion_translate_activity(self, activities: Dict):
"""递归翻译节点名称"""
Expand All @@ -221,11 +220,10 @@ def get_pipeline_tree_states(self) -> Optional[Dict]:
node_maps = {}
nodes = FlowNode.objects.filter(root_id=self.root_id)
for node in nodes:
node_maps[node.node_id] = node.status
node_maps[node.node_id] = node
self.recursion_subprocess_status(activities, node_maps)
self.recursion_translate_activity(activities)
for node in nodes:
self.recursion_nodes_status(node, node.status, tree)
self.recursion_nodes_status(tree, node_maps)
return tree

def get_pipeline_tree(self) -> Optional[Dict]:
Expand Down

0 comments on commit 7df2686

Please sign in to comment.