diff --git a/data/datadeps_demo/df.graphml b/data/datadeps_demo/df.graphml
index 5cdc0af..4a00968 100644
--- a/data/datadeps_demo/df.graphml
+++ b/data/datadeps_demo/df.graphml
@@ -1,5 +1,6 @@
-
+
+
@@ -96,4 +97,4 @@
-
\ No newline at end of file
+
diff --git a/data/sequencer_demo/another_test_graph.graphml b/data/sequencer_demo/another_test_graph.graphml
index ace28b3..da01d58 100644
--- a/data/sequencer_demo/another_test_graph.graphml
+++ b/data/sequencer_demo/another_test_graph.graphml
@@ -1,88 +1,106 @@
-
-
-
+
+
+
+
+
- MicroProducer
- ProducerA
- Algorithm
+ MicroProducer
+ ProducerA
+ Algorithm
+ 9.3027e-05
-
- A
- DataObject
+
+ A
+ DataObject
+ 8.0
- MicroTransformer
- TransformerB
- Algorithm
+ MicroTransformer
+ TransformerB
+ Algorithm
+ 4.2463e-05
-
- C
- DataObject
+
+ C
+ DataObject
+ 8.0
- MicroTransformer
- TransformerD
- Algorithm
+ MicroTransformer
+ TransformerD
+ Algorithm
+ 8.241000000000001e-06
-
- E
- DataObject
+
+ E
+ DataObject
+ 8.0
- MicroTransformer
- TransformerF
- Algorithm
+ MicroTransformer
+ TransformerF
+ Algorithm
+ 7.264e-06
-
- G
- DataObject
+
+ G
+ DataObject
+ 8.0
- MicroTransformer
- TransformerH
- Algorithm
+ MicroTransformer
+ TransformerH
+ Algorithm
+ 6.705e-06
-
- I
- DataObject
+
+ I
+ DataObject
+ 8.0
- MicroTransformer
- TransformerJ
- Algorithm
+ MicroTransformer
+ TransformerJ
+ Algorithm
+ 8.59e-06
-
- K
- DataObject
+
+ K
+ DataObject
+ 8.0
- MicroTransformer
- TransformerL
- Algorithm
+ MicroTransformer
+ TransformerL
+ Algorithm
+ 8.94e-06
-
- M
- DataObject
+
+ M
+ DataObject
+ 8.0
- MicroTransformer
- TransformerN
- Algorithm
+ MicroTransformer
+ TransformerN
+ Algorithm
+ 8.94e-06
-
- O
- DataObject
+
+ O
+ DataObject
+ 8.0
diff --git a/data/sequencer_demo/df_sequencer_demo.graphml b/data/sequencer_demo/df_sequencer_demo.graphml
index f0d499f..b304f1f 100644
--- a/data/sequencer_demo/df_sequencer_demo.graphml
+++ b/data/sequencer_demo/df_sequencer_demo.graphml
@@ -1,5 +1,6 @@
-
+
+
diff --git a/examples/schedule.jl b/examples/schedule.jl
index 792d2f0..eccdf88 100644
--- a/examples/schedule.jl
+++ b/examples/schedule.jl
@@ -1,8 +1,10 @@
using Distributed
-if abspath(PROGRAM_FILE) == @__FILE__
- new_procs = addprocs(12) # Set the number of workers
-end
+## doesn't work with datadeps! threads work though
+# if abspath(PROGRAM_FILE) == @__FILE__
+# new_procs = addprocs(12) # Set the number of workers
+# end
+##
using Dagger
using Graphs
@@ -10,7 +12,6 @@ using MetaGraphs
using FrameworkDemo
using FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package.
-
# Defining constants
output_dir = "results"
graph1_path = "./data/sequencer_demo/df_sequencer_demo.graphml"
@@ -46,7 +47,8 @@ function execution(graphs_map)
results = []
for (g_name, g) in graphs
g_map = Dict{Int, Any}()
- for vertex_id in Graphs.vertices(g)
+ data_vertices = MetaGraphs.filter_vertices(g, :type, "DataObject")
+ for vertex_id in data_vertices
future = get_prop(g, vertex_id, :res_data)
g_map[vertex_id] = fetch(future)
end
@@ -67,7 +69,7 @@ function main(graphs_map)
#
# OR
#
- # configure_webdash_multievent()
+ # FrameworkDemo.configure_webdash_multievent()
@time execution(graphs_map)
diff --git a/src/scheduling.jl b/src/scheduling.jl
index d006bdd..fd8ae82 100644
--- a/src/scheduling.jl
+++ b/src/scheduling.jl
@@ -1,18 +1,52 @@
import Dagger
using Distributed
+using Graphs
using MetaGraphs
+mutable struct DataObject
+ data
+ size::Float64
+end
+
+function populate_data_object!(object::DataObject, data)
+ proc = Dagger.thunk_processor()
+ scope = Dagger.scope(worker=myid())
+
+ chunk = Dagger.tochunk(data, proc, scope)
+
+ object.data = chunk
+end
+
# Algorithms
-function mock_Gaudi_algorithm(graph_name, graph_id, vertex_id, data...)
- println("Graph: $graph_name, Gaudi algorithm for vertex $vertex_id !")
- sleep(1)
- return vertex_id
+function make_algorithm(graph::MetaDiGraph, vertex_id::Int)
+ runtime = get_prop(graph, vertex_id, :runtime_average_s)
+
+ function algorithm(inputs, outputs)
+ println("Gaudi algorithm for vertex $vertex_id !")
+
+ for output in outputs
+ bytes = round(Int, output.size * 1e3)
+ populate_data_object!(output, ' '^bytes)
+ end
+
+ sleep(runtime)
+ end
+
+ return algorithm
end
-function dataobject_algorithm(graph_name, graph_id, vertex_id, data...)
- println("Graph: $graph_name, Dataobject algorithm for vertex $vertex_id !")
- sleep(0.1)
- return vertex_id
+
+function get_transform(graph::MetaDiGraph, vertex_id::Int)
+ type = get_prop(graph, vertex_id, :type)
+
+ function f(data...; N_inputs)
+ inputs = data[1:N_inputs]
+ outputs = data[N_inputs+1:end]
+ transform = AVAILABLE_TRANSFORMS[type](graph, vertex_id)
+ return transform(inputs, outputs)
+ end
+
+ return f
end
function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...)
@@ -28,7 +62,7 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_
parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot"
parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png"
G = parse_graphml([graph_path])
-
+
open(parsed_graph_dot, "w") do f
MetaGraphs.savedot(f, G)
end
@@ -45,7 +79,7 @@ function get_ine_map(G)
for edge in Graphs.edges(G)
src_vertex = src(edge)
dest_vertex = dst(edge)
-
+
if haskey(incoming_edges_sources_map, dest_vertex)
push!(incoming_edges_sources_map[dest_vertex], src_vertex)
else
@@ -82,33 +116,39 @@ function get_vertices_promises(vertices::Vector, G::MetaDiGraph)
return promises
end
-function get_deps_promises(vertex_id, map, G)
- incoming_data = []
- if haskey(map, vertex_id)
- for src in map[vertex_id]
- push!(incoming_data, get_prop(G, src, :res_data))
- end
- end
- return incoming_data
+function get_in_promises(G, vertex_id)
+ return [get_prop(G, src, :res_data) for src in inneighbors(G, vertex_id)]
+end
+
+function get_out_promises(G, vertex_id)
+ return [get_prop(G, src, :res_data) for src in outneighbors(G, vertex_id)]
end
function schedule_graph(G::MetaDiGraph)
- inc_e_src_map = get_ine_map(G)
+ data_vertices = MetaGraphs.filter_vertices(G, :type, "DataObject")
+ sorted_vertices = MetaGraphs.topological_sort(G)
- for vertex_id in MetaGraphs.topological_sort(G)
- incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G)
- set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](name, graph_id, vertex_id, incoming_data...))
+ for data_id in data_vertices
+ size = get_prop(G, data_id, :size_average_B)
+ set_prop!(G, data_id, :res_data, DataObject(nothing, size))
+ end
+
+ Dagger.spawn_datadeps() do
+ for vertex_id in setdiff(sorted_vertices, data_vertices)
+ incoming_data = get_in_promises(G, vertex_id)
+ outgoing_data = get_out_promises(G, vertex_id)
+ transform = get_transform(G, vertex_id)
+ N_inputs = length(incoming_data)
+ res = Dagger.@spawn transform(In.(incoming_data)..., Out.(outgoing_data)...; N_inputs)
+ set_prop!(G, vertex_id, :res_data, res)
+ end
end
end
function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel, graph_name::String, graph_id::Int)
final_vertices = []
- inc_e_src_map = get_ine_map(G)
- for vertex_id in MetaGraphs.topological_sort(G)
- incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G)
- set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](graph_name, graph_id, vertex_id, incoming_data...))
- end
+ schedule_graph(G)
out_e_src_map = get_oute_map(G)
for vertex_id in MetaGraphs.vertices(G)
@@ -125,5 +165,3 @@ function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel
Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, get_vertices_promises(final_vertices, G)...)
end
-
-AVAILABLE_TRANSFORMS = Dict{String, Function}("Algorithm" => mock_Gaudi_algorithm, "DataObject" => dataobject_algorithm)