diff --git a/gcloud/taskflow3/domains/dispatchers/node.py b/gcloud/taskflow3/domains/dispatchers/node.py index b310dc3777..0e6dbdb88a 100644 --- a/gcloud/taskflow3/domains/dispatchers/node.py +++ b/gcloud/taskflow3/domains/dispatchers/node.py @@ -475,6 +475,8 @@ def get_node_data_v2( node_id=self.node_id, pipeline=pipeline_instance.execution_data, subprocess_stack=subprocess_stack ) + node_code = node_info.get("component", {}).get("code") + if state: # 获取最新的执行数据 if loop is None or int(loop) >= state[self.node_id]["loop"]: @@ -502,6 +504,9 @@ def get_node_data_v2( if node_info["type"] == "SubProcess": # remove prefix '${' and subfix '}' in subprocess execution input inputs = {k[2:-1]: v for k, v in data["inputs"].items()} + elif node_info["type"] == "ServiceActivity" and node_code == "subprocess_plugin": + raw_inputs = data["inputs"]["subprocess"]["pipeline"]["constants"] + inputs = {key[2:-1]: value.get("value") for key, value in raw_inputs.items()} else: inputs = data["inputs"] outputs = data["outputs"] @@ -576,6 +581,8 @@ def get_node_data_v2( if node_info["type"] == "SubProcess": # remove prefix '${' and subfix '}' in subprocess execution input inputs = {k[2:-1]: v for k, v in preview_inputs.items()} + elif node_info["type"] == "ServiceActivity" and node_code == "subprocess_plugin": + inputs = {k[2:-1]: v for k, v in preview_inputs.items()} else: inputs = preview_inputs