diff --git a/compiler/definitions/ir/nodes/remote_pipe.py b/compiler/definitions/ir/nodes/remote_pipe.py index fd052ea09..136ab00e2 100644 --- a/compiler/definitions/ir/nodes/remote_pipe.py +++ b/compiler/definitions/ir/nodes/remote_pipe.py @@ -8,6 +8,11 @@ def __init__(self, inputs, outputs, com_name, com_category, com_redirs=com_redirs, com_assignments=com_assignments) + def is_remote_read(self): + com_name = self.com_name.opt_serialize() + read_com = config.config['runtime']['remote_read_binary'] + return read_com in com_name + def make_remote_pipe(inputs, outputs, host_ip, port, is_remote_read, id): com_category = "pure" options = [] diff --git a/compiler/dspash/ir_helper.py b/compiler/dspash/ir_helper.py index 9cb1602f5..c890914bf 100644 --- a/compiler/dspash/ir_helper.py +++ b/compiler/dspash/ir_helper.py @@ -68,6 +68,10 @@ def to_shell_file(graph: IR, args) -> str: os.makedirs(directory, exist_ok=True) if not args.no_eager: + # Set DFGNode next id to not clash with already existing ids + # TODO: ideally we should get the next_id from the graph object + # to avoid conflicts across parallel processes + DFGNode.next_id = max(DFGNode.next_id , max(graph.nodes.keys()) + 1) graph = pash_runtime.add_eager_nodes(graph, args.dgsh_tee) script = to_shell(graph, args) @@ -261,6 +265,9 @@ def assign_workers_to_subgraphs(subgraphs:List[IR], file_id_gen: FileIdGen, inpu # sometimes a command can have both a file resource and an ephemeral resources (example: spell oneliner) continue + # for worker, graph in worker_subgraph_pairs: + # print(to_shell(graph, config.pash_args), file=sys.stderr) + return main_graph, worker_subgraph_pairs def prepare_graph_for_remote_exec(filename:str, get_worker:Callable): diff --git a/compiler/pash_runtime.py b/compiler/pash_runtime.py index e2155e6ee..cb2203fa5 100644 --- a/compiler/pash_runtime.py +++ b/compiler/pash_runtime.py @@ -21,6 +21,7 @@ import definitions.ir.nodes.r_split as r_split import definitions.ir.nodes.r_unwrap as r_unwrap import definitions.ir.nodes.dgsh_tee as dgsh_tee +import definitions.ir.nodes.remote_pipe as remote_pipe import definitions.ir.nodes.dfs_split_reader as dfs_split_reader # Distirbuted Exec import dspash.hdfs_utils as hdfs_utils @@ -721,14 +722,20 @@ def add_eager_nodes(graph, use_dgsh_tee): intermediateFileIdGen = FileIdGen(0, runtime_config['eager_intermediate_prefix']) ## Get the next nodes - workset = [node for source_node_id in source_node_ids for node in graph.get_next_nodes(source_node_id)] + workset = source_node_ids visited = set() while (len(workset) > 0): curr_id = workset.pop(0) curr = graph.get_node(curr_id) + if (not curr_id in visited): visited.add(curr_id) next_node_ids = graph.get_next_nodes(curr_id) + + # Skip if this is the last node + if not next_node_ids: + continue + workset += next_node_ids ## TODO: Make sure that we don't add duplicate eager nodes @@ -764,6 +771,12 @@ def add_eager_nodes(graph, use_dgsh_tee): for edge_id in eager_input_ids: add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee) + ## Add an eager after remote_pipe + if(isinstance(curr, remote_pipe.RemotePipe) and curr.is_remote_read()): + eager_input_ids = curr.outputs + for edge_id in eager_input_ids: + add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee) + return graph