From 71afdaaf21e04cb364a10294360837722d6a2cd5 Mon Sep 17 00:00:00 2001 From: ottjk Date: Thu, 27 Jun 2024 16:06:35 +0200 Subject: [PATCH 1/6] Reworked datadeps for algorithm scheduling Data objects are no longer scheduled; algorithms take mutable inputs rather than returning outputs in a Thunk --- examples/schedule.jl | 12 +++--- src/scheduling.jl | 91 +++++++++++++++++++++++++++++++------------- 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/examples/schedule.jl b/examples/schedule.jl index 792d2f0..e4daeb9 100644 --- a/examples/schedule.jl +++ b/examples/schedule.jl @@ -1,8 +1,10 @@ using Distributed -if abspath(PROGRAM_FILE) == @__FILE__ - new_procs = addprocs(12) # Set the number of workers -end +## doesn't work with datadeps! +# if abspath(PROGRAM_FILE) == @__FILE__ +# new_procs = addprocs(12) # Set the number of workers +# end +## using Dagger using Graphs @@ -10,7 +12,6 @@ using MetaGraphs using FrameworkDemo using FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package. - # Defining constants output_dir = "results" graph1_path = "./data/sequencer_demo/df_sequencer_demo.graphml" @@ -46,7 +47,8 @@ function execution(graphs_map) results = [] for (g_name, g) in graphs g_map = Dict{Int, Any}() - for vertex_id in Graphs.vertices(g) + data_vertices = MetaGraphs.filter_vertices(g, :type, "DataObject") + for vertex_id in data_vertices future = get_prop(g, vertex_id, :res_data) g_map[vertex_id] = fetch(future) end diff --git a/src/scheduling.jl b/src/scheduling.jl index d006bdd..068f879 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -1,18 +1,50 @@ import Dagger using Distributed +using Graphs using MetaGraphs +mutable struct DataObject + data + size::Float64 +end + +function populate_data_object!(object::DataObject, data) + proc = Dagger.thunk_processor() + scope = Dagger.scope(worker=myid()) + + chunk = Dagger.tochunk(data, proc, scope) + + object.data = chunk +end + # Algorithms -function mock_Gaudi_algorithm(graph_name, graph_id, vertex_id, data...) - println("Graph: $graph_name, Gaudi algorithm for vertex $vertex_id !") - sleep(1) - return vertex_id +function _algorithm(graph::MetaDiGraph, vertex_id::Int) + function algorithm(inputs, outputs) + println("Gaudi algorithm for vertex $vertex_id !") + + for output in outputs + populate_data_object!(output, rand(16)) + end + end + + return algorithm end -function dataobject_algorithm(graph_name, graph_id, vertex_id, data...) - println("Graph: $graph_name, Dataobject algorithm for vertex $vertex_id !") - sleep(0.1) - return vertex_id +AVAILABLE_TRANSFORMS = Dict{String, Function}( + "Algorithm" => _algorithm, +) + +function get_transform(graph::MetaDiGraph, vertex_id::Int) + type = get_prop(graph, vertex_id, :type) + + function f(data...; N_inputs) + inputs = data[1:N_inputs] + outputs = data[N_inputs+1:end] + transform = AVAILABLE_TRANSFORMS[type](graph, vertex_id) + return transform(inputs, outputs) + end + + return f end function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...) @@ -82,33 +114,40 @@ function get_vertices_promises(vertices::Vector, G::MetaDiGraph) return promises end -function get_deps_promises(vertex_id, map, G) - incoming_data = [] - if haskey(map, vertex_id) - for src in map[vertex_id] - push!(incoming_data, get_prop(G, src, :res_data)) - end - end - return incoming_data +function get_in_promises(G, vertex_id) + return [get_prop(G, src, :res_data) for src in inneighbors(G, vertex_id)] +end + +function get_out_promises(G, vertex_id) + return [get_prop(G, src, :res_data) for src in outneighbors(G, vertex_id)] end function schedule_graph(G::MetaDiGraph) - inc_e_src_map = get_ine_map(G) + data_vertices = MetaGraphs.filter_vertices(G, :type, "DataObject") + sorted_vertices = MetaGraphs.topological_sort(G) + + for data_id in data_vertices + # not yet defined in example graphs + # size = get_prop(G, data_id, :size) + size = 0 + set_prop!(G, data_id, :res_data, DataObject(nothing, size)) + end - for vertex_id in MetaGraphs.topological_sort(G) - incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G) - set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](name, graph_id, vertex_id, incoming_data...)) + Dagger.spawn_datadeps() do + for vertex_id in setdiff(sorted_vertices, data_vertices) + incoming_data = get_in_promises(G, vertex_id) + outgoing_data = get_out_promises(G, vertex_id) + transform = get_transform(G, vertex_id) + N_inputs = length(incoming_data) + Dagger.@spawn transform(In.(incoming_data)..., Out.(outgoing_data)...; N_inputs) + end end end function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel, graph_name::String, graph_id::Int) final_vertices = [] - inc_e_src_map = get_ine_map(G) - for vertex_id in MetaGraphs.topological_sort(G) - incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G) - set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](graph_name, graph_id, vertex_id, incoming_data...)) - end + schedule_graph(G) out_e_src_map = get_oute_map(G) for vertex_id in MetaGraphs.vertices(G) @@ -125,5 +164,3 @@ function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, get_vertices_promises(final_vertices, G)...) end - -AVAILABLE_TRANSFORMS = Dict{String, Function}("Algorithm" => mock_Gaudi_algorithm, "DataObject" => dataobject_algorithm) From eb8a936aba3b1ead0d1254b77fd5c400141fda9f Mon Sep 17 00:00:00 2001 From: ottjk Date: Mon, 1 Jul 2024 14:09:23 +0200 Subject: [PATCH 2/6] made algorithm sleep for average runtime --- examples/schedule.jl | 4 ++-- src/scheduling.jl | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/examples/schedule.jl b/examples/schedule.jl index e4daeb9..eccdf88 100644 --- a/examples/schedule.jl +++ b/examples/schedule.jl @@ -1,6 +1,6 @@ using Distributed -## doesn't work with datadeps! +## doesn't work with datadeps! threads work though # if abspath(PROGRAM_FILE) == @__FILE__ # new_procs = addprocs(12) # Set the number of workers # end @@ -69,7 +69,7 @@ function main(graphs_map) # # OR # - # configure_webdash_multievent() + # FrameworkDemo.configure_webdash_multievent() @time execution(graphs_map) diff --git a/src/scheduling.jl b/src/scheduling.jl index 068f879..63c906b 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -5,7 +5,7 @@ using MetaGraphs mutable struct DataObject data - size::Float64 + size::Int end function populate_data_object!(object::DataObject, data) @@ -19,12 +19,16 @@ end # Algorithms function _algorithm(graph::MetaDiGraph, vertex_id::Int) + runtime = get_prop(graph, vertex_id, :runtime_average_s) + function algorithm(inputs, outputs) println("Gaudi algorithm for vertex $vertex_id !") for output in outputs - populate_data_object!(output, rand(16)) + populate_data_object!(output, ' '^output.size) end + + sleep(runtime) end return algorithm @@ -60,7 +64,7 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_ parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot" parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png" G = parse_graphml([graph_path]) - + open(parsed_graph_dot, "w") do f MetaGraphs.savedot(f, G) end @@ -77,7 +81,7 @@ function get_ine_map(G) for edge in Graphs.edges(G) src_vertex = src(edge) dest_vertex = dst(edge) - + if haskey(incoming_edges_sources_map, dest_vertex) push!(incoming_edges_sources_map[dest_vertex], src_vertex) else @@ -127,9 +131,7 @@ function schedule_graph(G::MetaDiGraph) sorted_vertices = MetaGraphs.topological_sort(G) for data_id in data_vertices - # not yet defined in example graphs - # size = get_prop(G, data_id, :size) - size = 0 + size = get_prop(G, data_id, :size) set_prop!(G, data_id, :res_data, DataObject(nothing, size)) end @@ -139,7 +141,8 @@ function schedule_graph(G::MetaDiGraph) outgoing_data = get_out_promises(G, vertex_id) transform = get_transform(G, vertex_id) N_inputs = length(incoming_data) - Dagger.@spawn transform(In.(incoming_data)..., Out.(outgoing_data)...; N_inputs) + res = Dagger.@spawn transform(In.(incoming_data)..., Out.(outgoing_data)...; N_inputs) + set_prop!(G, vertex_id, :res_data, res) end end end From dc65be422d3456b0ada6dc9acc34b246b40f43d1 Mon Sep 17 00:00:00 2001 From: ottjk Date: Tue, 2 Jul 2024 14:20:59 +0200 Subject: [PATCH 3/6] Added size and runtime properties to test graphs * added size_kb field to store size as a double * adjusted scheduling.jl treatment of size to align with test graphs --- .../sequencer_demo/another_test_graph.graphml | 120 ++++++++++-------- data/sequencer_demo/df_sequencer_demo.graphml | 1 + src/scheduling.jl | 7 +- 3 files changed, 74 insertions(+), 54 deletions(-) diff --git a/data/sequencer_demo/another_test_graph.graphml b/data/sequencer_demo/another_test_graph.graphml index ace28b3..6e8ef49 100644 --- a/data/sequencer_demo/another_test_graph.graphml +++ b/data/sequencer_demo/another_test_graph.graphml @@ -1,88 +1,106 @@ - - - + + + + + - MicroProducer - ProducerA - Algorithm + MicroProducer + ProducerA + Algorithm + 9.3027e-05 - - A - DataObject + + A + DataObject + 8.0 - MicroTransformer - TransformerB - Algorithm + MicroTransformer + TransformerB + Algorithm + 4.2463e-05 - - C - DataObject + + C + DataObject + 8.0 - MicroTransformer - TransformerD - Algorithm + MicroTransformer + TransformerD + Algorithm + 8.241000000000001e-06 - - E - DataObject + + E + DataObject + 8.0 - MicroTransformer - TransformerF - Algorithm + MicroTransformer + TransformerF + Algorithm + 7.264e-06 - - G - DataObject + + G + DataObject + 8.0 - MicroTransformer - TransformerH - Algorithm + MicroTransformer + TransformerH + Algorithm + 6.705e-06 - - I - DataObject + + I + DataObject + 8.0 - MicroTransformer - TransformerJ - Algorithm + MicroTransformer + TransformerJ + Algorithm + 8.59e-06 - - K - DataObject + + K + DataObject + 8.0 - MicroTransformer - TransformerL - Algorithm + MicroTransformer + TransformerL + Algorithm + 8.94e-06 - - M - DataObject + + M + DataObject + 8.0 - MicroTransformer - TransformerN - Algorithm + MicroTransformer + TransformerN + Algorithm + 8.94e-06 - - O - DataObject + + O + DataObject + 8.0 diff --git a/data/sequencer_demo/df_sequencer_demo.graphml b/data/sequencer_demo/df_sequencer_demo.graphml index f0d499f..22e9397 100644 --- a/data/sequencer_demo/df_sequencer_demo.graphml +++ b/data/sequencer_demo/df_sequencer_demo.graphml @@ -1,5 +1,6 @@ + diff --git a/src/scheduling.jl b/src/scheduling.jl index 63c906b..929078b 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -5,7 +5,7 @@ using MetaGraphs mutable struct DataObject data - size::Int + size::Float64 end function populate_data_object!(object::DataObject, data) @@ -25,7 +25,8 @@ function _algorithm(graph::MetaDiGraph, vertex_id::Int) println("Gaudi algorithm for vertex $vertex_id !") for output in outputs - populate_data_object!(output, ' '^output.size) + bytes = round(Int, output.size * 1e3) + populate_data_object!(output, ' '^bytes) end sleep(runtime) @@ -131,7 +132,7 @@ function schedule_graph(G::MetaDiGraph) sorted_vertices = MetaGraphs.topological_sort(G) for data_id in data_vertices - size = get_prop(G, data_id, :size) + size = get_prop(G, data_id, :size_kb) set_prop!(G, data_id, :res_data, DataObject(nothing, size)) end From 234134eca3b2527d9000e09e9eb173232bc437f4 Mon Sep 17 00:00:00 2001 From: ottjk Date: Wed, 3 Jul 2024 13:44:28 +0200 Subject: [PATCH 4/6] Made graph property consistent with tests Changed size_kb to size_average_B --- data/datadeps_demo/df.graphml | 5 +++-- data/sequencer_demo/another_test_graph.graphml | 2 +- data/sequencer_demo/df_sequencer_demo.graphml | 4 ++-- src/scheduling.jl | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/data/datadeps_demo/df.graphml b/data/datadeps_demo/df.graphml index 5cdc0af..4a00968 100644 --- a/data/datadeps_demo/df.graphml +++ b/data/datadeps_demo/df.graphml @@ -1,5 +1,6 @@ - + + @@ -96,4 +97,4 @@ - \ No newline at end of file + diff --git a/data/sequencer_demo/another_test_graph.graphml b/data/sequencer_demo/another_test_graph.graphml index 6e8ef49..da01d58 100644 --- a/data/sequencer_demo/another_test_graph.graphml +++ b/data/sequencer_demo/another_test_graph.graphml @@ -4,7 +4,7 @@ - + MicroProducer diff --git a/data/sequencer_demo/df_sequencer_demo.graphml b/data/sequencer_demo/df_sequencer_demo.graphml index 22e9397..b304f1f 100644 --- a/data/sequencer_demo/df_sequencer_demo.graphml +++ b/data/sequencer_demo/df_sequencer_demo.graphml @@ -1,6 +1,6 @@ - - + + diff --git a/src/scheduling.jl b/src/scheduling.jl index 929078b..2e3a417 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -132,7 +132,7 @@ function schedule_graph(G::MetaDiGraph) sorted_vertices = MetaGraphs.topological_sort(G) for data_id in data_vertices - size = get_prop(G, data_id, :size_kb) + size = get_prop(G, data_id, :size_average_B) set_prop!(G, data_id, :res_data, DataObject(nothing, size)) end From f888b36aa4c79dc5dd4bc9f2c1c5a52fd649409f Mon Sep 17 00:00:00 2001 From: Josh Ott Date: Thu, 4 Jul 2024 13:29:08 +0200 Subject: [PATCH 5/6] Update src/scheduling.jl Co-authored-by: Mateusz Jakub Fila <37295697+m-fila@users.noreply.github.com> --- src/scheduling.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduling.jl b/src/scheduling.jl index 2e3a417..e26bdf9 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -18,7 +18,7 @@ function populate_data_object!(object::DataObject, data) end # Algorithms -function _algorithm(graph::MetaDiGraph, vertex_id::Int) +function make_algorithm(graph::MetaDiGraph, vertex_id::Int) runtime = get_prop(graph, vertex_id, :runtime_average_s) function algorithm(inputs, outputs) From e98763a249017e088cdc44ad3938b1b43742cf31 Mon Sep 17 00:00:00 2001 From: Josh Ott Date: Thu, 4 Jul 2024 13:30:30 +0200 Subject: [PATCH 6/6] remove AVAILABLE_TRANSFORMS Dict Co-authored-by: Mateusz Jakub Fila <37295697+m-fila@users.noreply.github.com> --- src/scheduling.jl | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/scheduling.jl b/src/scheduling.jl index e26bdf9..fd8ae82 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -35,9 +35,6 @@ function make_algorithm(graph::MetaDiGraph, vertex_id::Int) return algorithm end -AVAILABLE_TRANSFORMS = Dict{String, Function}( - "Algorithm" => _algorithm, -) function get_transform(graph::MetaDiGraph, vertex_id::Int) type = get_prop(graph, vertex_id, :type)