Skip to content

Commit

Permalink
add eager after remote_read and fix conflicting node ids issue
Browse files Browse the repository at this point in the history
Signed-off-by: Tammam Mustafa <[email protected]>
  • Loading branch information
tammam1998 committed May 20, 2022
1 parent c0efa45 commit 494a436
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
5 changes: 5 additions & 0 deletions compiler/definitions/ir/nodes/remote_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ def __init__(self, inputs, outputs, com_name, com_category,
com_redirs=com_redirs,
com_assignments=com_assignments)

def is_remote_read(self):
com_name = self.com_name.opt_serialize()
read_com = config.config['runtime']['remote_read_binary']
return read_com in com_name

def make_remote_pipe(inputs, outputs, host_ip, port, is_remote_read, id):
com_category = "pure"
options = []
Expand Down
7 changes: 7 additions & 0 deletions compiler/dspash/ir_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def to_shell_file(graph: IR, args) -> str:
os.makedirs(directory, exist_ok=True)

if not args.no_eager:
# Set DFGNode next id to not clash with already existing ids
# TODO: ideally we should get the next_id from the graph object
# to avoid conflicts across parallel processes
DFGNode.next_id = max(DFGNode.next_id , graph.nodes.keys()+ 1)
graph = pash_runtime.add_eager_nodes(graph, args.dgsh_tee)

script = to_shell(graph, args)
Expand Down Expand Up @@ -261,6 +265,9 @@ def assign_workers_to_subgraphs(subgraphs:List[IR], file_id_gen: FileIdGen, inpu
# sometimes a command can have both a file resource and an ephemeral resources (example: spell oneliner)
continue

# for worker, graph in worker_subgraph_pairs:
# print(to_shell(graph, config.pash_args), file=sys.stderr)

return main_graph, worker_subgraph_pairs

def prepare_graph_for_remote_exec(filename:str, get_worker:Callable):
Expand Down
15 changes: 14 additions & 1 deletion compiler/pash_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import definitions.ir.nodes.r_split as r_split
import definitions.ir.nodes.r_unwrap as r_unwrap
import definitions.ir.nodes.dgsh_tee as dgsh_tee
import definitions.ir.nodes.remote_pipe as remote_pipe
import definitions.ir.nodes.dfs_split_reader as dfs_split_reader
# Distirbuted Exec
import dspash.hdfs_utils as hdfs_utils
Expand Down Expand Up @@ -721,14 +722,20 @@ def add_eager_nodes(graph, use_dgsh_tee):
intermediateFileIdGen = FileIdGen(0, runtime_config['eager_intermediate_prefix'])

## Get the next nodes
workset = [node for source_node_id in source_node_ids for node in graph.get_next_nodes(source_node_id)]
workset = source_node_ids
visited = set()
while (len(workset) > 0):
curr_id = workset.pop(0)
curr = graph.get_node(curr_id)

if (not curr_id in visited):
visited.add(curr_id)
next_node_ids = graph.get_next_nodes(curr_id)

# Skip if this is the last node
if not next_node_ids:
continue

workset += next_node_ids

## TODO: Make sure that we don't add duplicate eager nodes
Expand Down Expand Up @@ -764,6 +771,12 @@ def add_eager_nodes(graph, use_dgsh_tee):
for edge_id in eager_input_ids:
add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee)

## Add an eager after remote_pipe
if(isinstance(curr, remote_pipe.RemotePipe) and curr.is_remote_read()):
eager_input_ids = curr.outputs
for edge_id in eager_input_ids:
add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee)

return graph


Expand Down

0 comments on commit 494a436

Please sign in to comment.