From 8cbf5cfdbc705768e3bb52b789f26493a7b4ecff Mon Sep 17 00:00:00 2001 From: Jiacheng Chuan Date: Tue, 10 Sep 2024 16:50:40 -0300 Subject: [PATCH] v0.10.2-release: new macro submit and method fetch --- Project.toml | 2 +- docs/src/API.md | 2 + docs/src/changelog.md | 5 +- docs/src/manual.md | 64 +++++++++++++--- docs/src/use_cases.md | 38 +++++----- src/JobSchedulers.jl | 4 +- src/jobs.jl | 13 +++- src/macro.jl | 163 +++++++++++++++++++++++++++-------------- test/benchmark_only.jl | 5 +- test/runtests.jl | 6 +- test/test_macro.jl | 26 +++---- 11 files changed, 224 insertions(+), 104 deletions(-) diff --git a/Project.toml b/Project.toml index 651940f..17e0d4d 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "JobSchedulers" uuid = "eeff360b-c02d-44d3-ab26-4013c616a17e" authors = ["Jiacheng Chuan "] -version = "0.10.1" +version = "0.10.2" [deps] DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" diff --git a/docs/src/API.md b/docs/src/API.md index 0773440..9e7db82 100644 --- a/docs/src/API.md +++ b/docs/src/API.md @@ -26,8 +26,10 @@ const SCHEDULER_REACTIVATION_TASK = Base.RefValue{Task}() ```@docs Job submit! +@submit cancel! result +fetch(::Job) isqueuing isrunning isdone diff --git a/docs/src/changelog.md b/docs/src/changelog.md index d180bae..4dbfaee 100644 --- a/docs/src/changelog.md +++ b/docs/src/changelog.md @@ -1,8 +1,9 @@ # Changelog -TODO +v0.10.2 -- Feat: `@submit!` like syntax to wrap like a normal Julia code. +- Feat: new macro `@submit` to create a job using an expression. It will automatically add explictly referred `Job` dependencies by walking through the symbols in the expression. +- Feat: new method `fetch`. v0.10.1 diff --git a/docs/src/manual.md b/docs/src/manual.md index c6a5d93..8ed2921 100644 --- a/docs/src/manual.md +++ b/docs/src/manual.md @@ -12,7 +12,7 @@ using JobSchedulers ## Create a Job -A `Job` is the wrapper of `AbstractCmd`, `Function` or `Task`: +A [`Job`](@ref) is the wrapper of `AbstractCmd`, `Function` or `Task`: ```julia command_job = Job( @@ -62,7 +62,7 @@ job_with_args = Job( # stderr → nothing ``` -> `dependency` argument in `Job(...)` controls when to start a job. +> `dependency` argument in [`Job`](@ref) controls when to start a job. > > It is a vector with element `STATE => job` or `STATE => job.id`. > @@ -80,28 +80,61 @@ Submit a job to queue: submit!(command_job) submit!(task_job) submit!(job_with_args) +``` + +> Details: [`submit!`](@ref) + +## Create and submit a Job + +`submit!(Job(...))` can be simplified to `submit!(...)` from v0.8. -# submit!(Job(...)) can be simplified to submit!(...) (from v0.8) +```julia job = submit!(@task(println("job")), priority = 0) ``` -## Cancel a Job +Macro `@submit [args...] expression` is available from v0.10.2. It will automatically add **explictly referred** `Job` dependencies by walking through the symbols in the `expression`. + +```julia +job = @submit ncpu=1 1+1 + +job_auto_dependency = @submit 1 + result(job) +# equivalent to submit!(() -> 1 + result(job); dependency=job) +``` -Cancel or interrupt a job: +[`@submit`](@ref) supports any type of `Expr`ession, including a code block: ```julia -cancel!(command_job) +x = 5 +job_block = @submit begin + y = x + 1 + y^2 +end +@assert fetch(job_block) == (5+1)^2 ``` ## Get a Job's Result -Get the returned result: +Get the returned [`result`](@ref) imediately. If job is not finished, show a warning message and return nothing: ```julia -result(job_with_args) +result(job) # "result" ``` +You can also use [`fetch`](@ref) to wait for job to finish and return its result from JobSchedulers v0.10.2. + +```julia +fetch(job) +``` + +## Cancel a Job + +Interrupt or [`cancel!`](@ref) a job: + +```julia +cancel!(job) +``` + ## Recurring/repetitive Job From JobSchedulers v0.8, users can submit recurring jobs using Linux-based **Crontab**-like methods. @@ -173,6 +206,8 @@ end # 2024-03-27T13:14:15.044 ``` +> Details: [`Cron`](@ref) + ## Queue Show all jobs: @@ -233,7 +268,7 @@ queue(r"job.*") # 9 columns omitted ``` -See more at [`queue`](@ref), and [`all_queue`](@ref). +> See more at [`queue`](@ref), and [`all_queue`](@ref). ## Job query @@ -264,9 +299,12 @@ queue(:all)[1] # stderr → nothing ``` +> See more at [`job_query`](@ref), and [`queue`](@ref). + + ## Wait for jobs and progress meter -Wait for a specific job(s): +[`wait`](@ref) for a specific job(s): ```julia wait(j::Job) @@ -426,6 +464,12 @@ result(program_job) # (true, Dict{String, Any}("OUT" => "out")) ``` +[`@submit`](@ref) also works with `Program`s: + +```julia +program_job2 = @submit IN1=`in1` IN2=2 OUT="out" touch_run_id_file=false p +``` + ## Scheduler settings Check the current status of scheduler: diff --git a/docs/src/use_cases.md b/docs/src/use_cases.md index ac6a82d..5ac955d 100644 --- a/docs/src/use_cases.md +++ b/docs/src/use_cases.md @@ -49,7 +49,9 @@ end experiments_threads(dists, lens, 1000) @time experiments_threads(dists, lens, 10000) -# 7.095762 seconds (746 allocations: 110.828 KiB) +# 6.932880 seconds (746 allocations: 110.828 KiB) +# 6.868636 seconds (751 allocations: 110.906 KiB) +# 7.023086 seconds (746 allocations: 110.828 KiB) ``` Note that `DataFrames.push!` is not a thread safe operation and hence we need to utilize a locking mechanism in order to avoid two threads appending the DataFrame at the same time. @@ -63,38 +65,39 @@ function experiments_jobschedulers(dists, lens, K=1000) res = DataFrame() for T in dists dist = T() - σ = submit!(() -> std(dist)) + σ = @submit std(dist) for L in lens - z = submit!(() -> f(dist, L, K, result(σ)), dependency=σ) + z = @submit f(dist, L, K, result(σ)) push!(res, (;T, σ, L, z)) end end - wait_queue() - res.z = result.(res.z) - res.σ = result.(res.σ) + res.z = fetch.(res.z) + res.σ = fetch.(res.σ) res end experiments_jobschedulers(dists, lens, 1000) @time experiments_jobschedulers(dists, lens, 10000) -# 3.851350 seconds (16.26 M allocations: 432.789 MiB, 4.50% gc time) +# 3.682429 seconds (4.68 k allocations: 268.984 KiB) +# 3.687437 seconds (4.77 k allocations: 270.609 KiB) +# 3.755103 seconds (4.74 k allocations: 269.812 KiB) ``` -In this code we have job interdependence. Firstly, we are calculating the standard deviation `σ`, and then we are using that value in the function `f`. Here, `submit!` wraps a task or a 0-argument function. Since `submit!` yields a `Job` rather than actual values, we need to use the `result` function to obtain those values. Because computing `z` requires completion of `σ`, we need to add argument `dependency=σ` to `submit!`. In the last, after all jobs are submitted, we use `wait_queue()` to wait for all jobs to be finished. +In this code we have job interdependence. Firstly, we are calculating the standard deviation `σ`, and then we are using that value in the function `f`. Here, `submit!` wraps a task or a 0-argument function. Since `submit!` yields a `Job` rather than actual values, we need to use the `result` function to obtain those values. Because computing `z` requires completion of `σ`, we need to add argument `dependency=σ` to `submit!`. In the last, after all jobs are submitted, we use `fetch` to wait for each job to finish and return its value. Also, note that contrary to the previous example, we do not need to implement locking as we are just pushing the `Job` results of `submit!` serially into the DataFrame (which is fast since `submit!` doesn't block). -The above use case scenario has been tested by running `julia -t 8` (or with `JULIA_NUM_THREADS=8` as environment variable). The `Threads.@threads` code takes 7.1 seconds to run, while the JobSchedulers code, runs around 3.9 seconds, resulting in a 1.8x speedup. +The above use case scenario has been tested by running `julia -t 8` (or with `JULIA_NUM_THREADS=8` as environment variable). The `Threads.@threads` code takes 7.1 seconds to run, while the JobSchedulers code, runs around 3.7 seconds, resulting in a 1.8x speedup. To be noted, unlike `Base.Threads`, JobSchedulers only use `nthreads() - 1 = 7` threads to compute jobs, so the real speedup is `1.8 * 8/7 = 2.1`x. !!! info "Citation" Parallel Nested Loops was copied and edited from Dagger.jl's document. Most information are the same, except that JobSchedulers.jl was used. ## A Workflow Example With Pipelines.jl -- Run prog_A with 2 threads and 4GB RAM. -- Run prog_B with 8 threads. -- After prog_A finished, run prog_C (2 threads). -- After prog_B and prog_C finished, run prog_D (12 threads) +- Run `prog_A` with 2 threads and 4GB RAM. +- Run `prog_B` with 8 threads. +- After `prog_A` finished, run `prog_C` (2 threads). +- After `prog_B` and `prog_C` finished, run `prog_D` (12 threads) The flowchart is like: @@ -203,7 +206,8 @@ experiments_dagger(1, 10) - JobSchedulers is stable even scheduling 100,000 tasks, while Dagger seems encountered a dead lock causing system hung. -| Number of Task | Base.Threads | JobSchedulers.jl | Dagger.jl | -|-:|-:|-:|-:| -|10,000| 0.000176 s | 0.010370 s | 0.846881 s | -|100,000| 0.001873 s | 0.247005 s | failed, hung (dead lock) | +| Number of Task | 10,000 | 100,000 | +| :---- | ----: | ----: | +| Base.Threads (second) | 0.000176 | 0.001873 | +| JobSchedulers (second) | 0.010370 | 0.247005 | +| Dagger (second) | 0.846881 | failed, hung (dead lock) | diff --git a/src/JobSchedulers.jl b/src/JobSchedulers.jl index 3042937..4d12a7d 100644 --- a/src/JobSchedulers.jl +++ b/src/JobSchedulers.jl @@ -61,8 +61,8 @@ include("progress_computing.jl") include("progress_view.jl") export queue_progress -# include("macro.jl") -# export @submit! +include("macro.jl") +export @submit function __init__() # Fixing precompilation hangs due to open tasks or IO diff --git a/src/jobs.jl b/src/jobs.jl index e9433a7..da49953 100644 --- a/src/jobs.jl +++ b/src/jobs.jl @@ -52,7 +52,7 @@ end !!! note Redirecting in Julia are not thread safe, so unexpected redirection might be happen if you are running programs in different `Tasks` simultaneously (multi-threading). -See also [`@submit!`](@ref), [`submit!`](@ref), [`Cron`](@ref) +See also [`submit!`](@ref), [`@submit`](@ref), [`Cron`](@ref) """ mutable struct Job id::Int64 @@ -232,6 +232,17 @@ function Job(command::Base.AbstractCmd; job end +""" + fetch(x::Job) + +Wait for a `Job` to finish, then return its result value. If the task fails with an exception, a `TaskFailedException` (which wraps the failed task) is thrown. + +!!! compat + `fetch(x::Job)` is available from JobSchedulers v0.10.2. +""" +function Base.fetch(x::Job) + fetch(x.task) +end period2datetime(t::DateTime) = t period2datetime(t::Period) = now() + t diff --git a/src/macro.jl b/src/macro.jl index e70a391..55da799 100644 --- a/src/macro.jl +++ b/src/macro.jl @@ -1,23 +1,52 @@ + """ - @submit! [option=value]... expr @submit [option=value]... expr Submit a job from `expr`. If a `Job` is **explicitly** shown in `expr`, `DONE => job` will be automatically added to the dependency list. -- `option = value`: kwargs of [`Job`](@ref). +- `expr`: any type of `Expr`ession is supported. + +- `option = value`: kwargs of [`Job`](@ref). If `expr` is parsed to be a `Pipelines.Program`, `option`s also include its inputs, outputs and run kwargs. + +See also [`Job`](@ref), [`submit!`](@ref) + +## Example + +```julia +j = @submit 1+1 +wait(j) +@assert result(j) == 2 + +# you can use any keyword arguments that `Job` supports, such as `name`, `ncpu`: +j_2sec = @submit name = "run after 2 sec" begin sleep(2); 32 end + +# because `j_2sec isa Job`, `DONE => j_2sec` is pushed to `j2.dependency`, and the `j_2sec` in the begin-end block is converted to `result(j_2sec)`: +j2 = @submit mem=2KB begin + 1 + result(j_2sec) +end -!!! warn "Only explicit jobs can be automatically added to dependency" - If a job is in a container, `@submit!` cannot know the elements in the container: +wait(j2) +@assert result(j2) == 1 + 32 + +# you can also manually add dependencies that not in the `expr`: +j3 = @submit dependency = [PAST => j] println("j3 finished. result of j2 = ", result(j2)) + +# Note: j3.dependency might be empty after submit, because JobScheduler will remove jobs that reached their states in the dependency list. +``` + + +!!! warn "Known Issues: Only explicit jobs can be automatically added to dependency" + If a job is in a container, `@submit` cannot know the elements in the container: ```julia - j = @submit! 666 + j = @submit 666 jobs = Job[] # the job container for i in 1:10 - push!(jobs, @submit! begin sleep(2);i end) # 10 jobs will be added to `jobs` + push!(jobs, @submit begin sleep(30);i end) # 10 jobs will be added to `jobs` end x = 0 - j_something_wrong = @submit! for j in jobs + j_something_wrong = @submit for j in jobs # have to use global x global x += result(j) end @@ -27,6 +56,9 @@ Submit a job from `expr`. If a `Job` is **explicitly** shown in `expr`, `DONE => result(j_something_wrong) # MethodError(+, (nothing, nothing), 0x0000000000007b16) + # To avoid it, we can + # (1) use `submit!`, or + # (2) explicitly add `dependency = jobs` to `@submit`. x = 0 j_ok = submit!(dependency = jobs) do for j in jobs @@ -38,7 +70,7 @@ Submit a job from `expr`. If a `Job` is **explicitly** shown in `expr`, `DONE => @assert x == 55 x = 100 - j_ok_too = @submit! dependency = jobs for j in jobs + j_ok_too = @submit dependency = jobs for j in jobs # have to use global x global x += result(j) end @@ -46,61 +78,72 @@ Submit a job from `expr`. If a `Job` is **explicitly** shown in `expr`, `DONE => @assert x == 155 ``` -See also [`Job`](@ref), [`submit!`](@ref) - -## Example - -```julia -j = @submit! 1+1 -wait(j) -@assert result(j) == 2 - -# you can use any keyword arguments that `Job` supports, such as `name`, `ncpu`: -j_2sec = @submit! name = "run after 2 sec" begin sleep(2); 32 end - -# because `j_2sec isa Job`, `DONE => j_2sec` is pushed to `j2.dependency`, and the `j_2sec` in the begin-end block is converted to `result(j_2sec)`: -j2 = @submit! mem=2KB begin - 1 + result(j_2sec) -end - -wait(j2) -@assert result(j2) == 1 + 32 - -# you can also manually add dependencies that not in the `expr`: -j3 = @submit! dependency = [PAST => j] println("j3 finished. result of j2 = ", result(j2)) - -# Note: j3.dependency might be empty after submit, because JobScheduler will remove jobs that reached their states in the dependency list. -``` """ -macro submit!(args...) +macro submit(args...) local opts = args[1:end-1] local expr = args[end] # construct Job(...) - local params = opt2parameters(opts) # can esc + local params = opt2parameters(opts) # construct new_deps = [], and add deps in new_deps local possible_deps = sym_list(expr) # Symbol[all symbols] - unique!(possible_deps) local dep_struct = Expr(:ref, :Any, possible_deps...) # Any[possible_deps...] + # if expr is only a Symbol: it could be a Program + local expr_is_a_symbol = expr isa Symbol + return quote - local job = Job($(esc(params)), () -> $(esc(expr))) - local deps = $(esc(dep_struct)) - filter!(isajob, deps) - - if !isempty(deps) - for dep in deps - push!(job.dependency, :done => dep) - end + local job + if $expr_is_a_symbol # could be a program + local evaluated = $(esc(expr)) + if evaluated isa Program + job = Job($(esc(params)), evaluated) + @goto submit + else + @goto normal + end + else + @label normal + local deps = $(esc(dep_struct)) + filter!(isajob, deps) + job = Job($(esc(params)), () -> $(esc(expr))) + + if !isempty(deps) + for dep in deps + push!(job.dependency, :done => dep) + end + end + @label submit + submit!(job) end - @info job.dependency - submit!(job) end end +# function job_to_result!(expr::Expr, dep_syms::Vector{Symbol}) +# for (i,a) in enumerate(expr.args) +# @inbounds expr.args[i] = job_to_result!(a, dep_syms) +# end +# expr +# end +# function job_to_result!(expr::Symbol, dep_syms::Vector{Symbol}) +# if expr in dep_syms +# :(JobSchedulers.result_or_self($expr)) +# else +# expr +# end +# end +# function job_to_result!(expr, dep_syms::Vector{Symbol}) +# expr +# end + isajob(x::Job) = true isajob(x) = false +isnotajob(x::Job) = false +isnotajob(x) = true + +# result_or_self(x::Job) = result(x) +# result_or_self(x) = result(x) function opt2parameters(opts::NTuple{N, Expr}) where N for opt in opts @@ -114,20 +157,34 @@ end """ sym_list(x) -Extract var (symbols) in `var isa Job`. +Extract var (symbols) in expression/symbol. """ function sym_list(x) v = Vector{Symbol}() - sym_list!(v, x) + just_defined = Set() + _sym_list!(v, just_defined, x) + unique!(v) + filter!(v) do x + !(x in just_defined) + end v end -sym_list!(v::Vector, expr::Expr) = sym_list!(v, expr.args) -sym_list!(v::Vector, sym::Symbol) = push!(v, sym) -sym_list!(v::Vector, x) = nothing +function _sym_list!(v::Vector, just_defined::Set, expr::Expr) + if expr.head == :(=) && length(expr.args) == 2 + push!(just_defined, expr.args[1]) + # skip first arg + _sym_list!(v, just_defined, expr.args[2]) + else + _sym_list!(v, just_defined, expr.args) + end + expr +end +_sym_list!(v::Vector, just_defined::Set, sym::Symbol) = push!(v, sym) +_sym_list!(v::Vector, just_defined::Set, x) = nothing -function sym_list!(v::Vector, args::Vector) +function _sym_list!(v::Vector, just_defined::Set, args::Vector) for arg in args - sym_list!(v::Vector, arg) + _sym_list!(v::Vector, just_defined, arg) end end \ No newline at end of file diff --git a/test/benchmark_only.jl b/test/benchmark_only.jl index 735f6c7..08f0fbe 100644 --- a/test/benchmark_only.jl +++ b/test/benchmark_only.jl @@ -17,7 +17,7 @@ function experiments_dagger(a, K=10000) x = 0 f() = x += a @sync for i in 1:K - Dagger.@spawn f() + Dagger.@spawn f() # cannot use x += a because not a valid expression in Dagger end x end @@ -52,7 +52,7 @@ end function experiments_jobschedulers2(a, K=10000) x = 0 for i in 1:K - @submit! x += a + @submit x += a end wait_queue() x @@ -65,3 +65,4 @@ end @time experiments_jobschedulers2(1, 100000) # 0.254929 seconds (1.46 M allocations: 109.989 MiB, 18.08% gc time) +@sync \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 3263f0b..9f4e9d7 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -295,9 +295,9 @@ using Test include("recur.jl") end - # @testset "Macro" begin - # include("test_macro.jl") - # end + @testset "Macro" begin + include("test_macro.jl") + end @test scheduler_status() === RUNNING diff --git a/test/test_macro.jl b/test/test_macro.jl index 9daeb7c..78cdf11 100644 --- a/test/test_macro.jl +++ b/test/test_macro.jl @@ -1,19 +1,19 @@ -@testset "@submit!" begin - j = @submit! name = "abc" dependency = [] 1+1 +@testset "@submit" begin + j = @submit name = "abc" dependency = [] 1+1 wait(j) @test result(j) == 2 @test j.name == "abc" a = 3242 - j = @submit! a + 5 + j = @submit a + 5 wait(j) @test result(j) == 3242 + 5 - j_long = @submit! name = "abc" dependency = j begin sleep(2); 32 end + j_long = @submit name = "abc" dependency = j begin sleep(2); 32 end - j2 = @submit! mem = 2KB begin 1 + result(j) + result(j_long) end + j2 = @submit mem = 2KB begin 1 + result(j) + result(j_long) end @test length(j2.dependency) > 0 sleep(1) @test j2.state === QUEUING @@ -23,7 +23,7 @@ function experiments_jobschedulers2(a, K=10000) x = 0 for i in 1:K - @submit! x += a + @submit x += a end wait_queue() x @@ -35,18 +35,18 @@ y = [a:a+K...] js = Job[] - # for _ in 1:K - j1 = @submit! sum(y)/length(y) + for _ in 1:K + j1 = @submit sum(y)/length(y) for i in y - j2 = @submit! (i - result(j1))^2 + j2 = @submit (i - result(j1))^2 push!(js, j2) end - # end - jsum = @submit! for j in js + end + jsum = @submit dependency=js for j in js x += result(j) end wait(jsum) - @error "jsum's dependency is outer j, which does not make sense!" - x + x/K end + @test_nowarn test_sequential(4,50) end