v0.10.2-release: new macro submit and method fetch
cihga39871 committed Sep 10, 2024
1 parent df6fdcc commit 8cbf5cf
Showing 11 changed files with 224 additions and 104 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "JobSchedulers"
uuid = "eeff360b-c02d-44d3-ab26-4013c616a17e"
authors = ["Jiacheng Chuan <[email protected]>"]
version = "0.10.1"
version = "0.10.2"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
2 changes: 2 additions & 0 deletions docs/src/API.md
@@ -26,8 +26,10 @@ const SCHEDULER_REACTIVATION_TASK = Base.RefValue{Task}()
```@docs
Job
submit!
@submit
cancel!
result
fetch(::Job)
isqueuing
isrunning
isdone
5 changes: 3 additions & 2 deletions docs/src/changelog.md
@@ -1,8 +1,9 @@
# Changelog

TODO
v0.10.2

- Feat: `@submit!` like syntax to wrap like a normal Julia code.
- Feat: new macro `@submit` to create a job using an expression. It will automatically add explicitly referenced `Job` dependencies by walking through the symbols in the expression.
- Feat: new method `fetch`.

v0.10.1

64 changes: 54 additions & 10 deletions docs/src/manual.md
@@ -12,7 +12,7 @@ using JobSchedulers

## Create a Job

A `Job` is the wrapper of `AbstractCmd`, `Function` or `Task`:
A [`Job`](@ref) is the wrapper of `AbstractCmd`, `Function` or `Task`:

```julia
command_job = Job(
@@ -62,7 +62,7 @@ job_with_args = Job(
# stderr → nothing
```

> `dependency` argument in `Job(...)` controls when to start a job.
> `dependency` argument in [`Job`](@ref) controls when to start a job.
>
> It is a vector with element `STATE => job` or `STATE => job.id`.
>
@@ -80,28 +80,61 @@ Submit a job to queue:
submit!(command_job)
submit!(task_job)
submit!(job_with_args)
```

> Details: [`submit!`](@ref)
## Create and submit a Job

`submit!(Job(...))` can be simplified to `submit!(...)` from v0.8.

# submit!(Job(...)) can be simplified to submit!(...) (from v0.8)
```julia
job = submit!(@task(println("job")), priority = 0)
```

## Cancel a Job
Macro `@submit [args...] expression` is available from v0.10.2. It will automatically add **explicitly referenced** `Job` dependencies by walking through the symbols in the `expression`.

```julia
job = @submit ncpu=1 1+1

job_auto_dependency = @submit 1 + result(job)
# equivalent to submit!(() -> 1 + result(job); dependency=job)
```
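
One way to picture the symbol walk is the sketch below. It is a hypothetical illustration in plain Julia, not the code in JobSchedulers' `macro.jl`: `collect_symbols` is an assumed helper name, and it only gathers the symbols that appear in an expression, which is the information a macro would need before checking which of them are bound to a `Job`.

```julia
# Hypothetical helper, not part of JobSchedulers: collect every symbol
# that appears in an expression, in order of first appearance.
function collect_symbols!(found::Vector{Symbol}, ex)
    if ex isa Symbol
        # Record each symbol only once.
        ex in found || push!(found, ex)
    elseif ex isa Expr
        # Recurse into the arguments of calls, blocks, assignments, etc.
        for arg in ex.args
            collect_symbols!(found, arg)
        end
    end
    # Literals such as numbers and strings are ignored.
    return found
end

collect_symbols(ex) = collect_symbols!(Symbol[], ex)

# The quoted expression is the same one used with `@submit` above.
syms = collect_symbols(:(1 + result(job)))
```

Here `:job` ends up in `syms` because the expression names it directly. A `Job` reached only indirectly (for example, inside a function that the expression calls) would not appear, which is presumably why the manual stresses explicitly referenced dependencies.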

Cancel or interrupt a job:
[`@submit`](@ref) supports any type of `Expr`ession, including a code block:

```julia
cancel!(command_job)
x = 5
job_block = @submit begin
    y = x + 1
    y^2
end
@assert fetch(job_block) == (5+1)^2
```

## Get a Job's Result

Get the returned result:
Get the returned [`result`](@ref) immediately. If the job is not finished, it shows a warning message and returns `nothing`:

```julia
result(job_with_args)
result(job)
# "result"
```

From JobSchedulers v0.10.2, you can also use [`fetch`](@ref) to wait for a job to finish and return its result.

```julia
fetch(job)
```
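
The difference between a non-blocking read and a blocking `fetch` can be tried out with plain Base tasks. This sketch does not use JobSchedulers: `peek_result` is a hypothetical stand-in for the non-blocking behaviour described for `result`, while `fetch` on a `Task` is the Base function, which blocks until the task is done and throws a `TaskFailedException` if the task failed.

```julia
# Plain Base tasks only; JobSchedulers is not needed to run this sketch.
t = @task begin
    sleep(0.1)
    "result"
end
schedule(t)

# Hypothetical non-blocking accessor: returns `nothing` while the task is unfinished.
peek_result(task::Task) = istaskdone(task) ? fetch(task) : nothing

early = peek_result(t)   # the task has not finished yet
final = fetch(t)         # blocks until the task is done, then returns its value

# A failing task: `fetch` rethrows the failure wrapped in a TaskFailedException.
failing = @task error("boom")
schedule(failing)
err = try
    fetch(failing)
catch e
    e
end
```

Reading a value before the work is done yields nothing useful, so code that needs the value should block with `fetch` (or wait first) rather than poll.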

## Cancel a Job

Interrupt or [`cancel!`](@ref) a job:

```julia
cancel!(job)
```

## Recurring/repetitive Job

From JobSchedulers v0.8, users can submit recurring jobs using Linux-based **Crontab**-like methods.
@@ -173,6 +206,8 @@ end
# 2024-03-27T13:14:15.044
```

> Details: [`Cron`](@ref)
## Queue

Show all jobs:
@@ -233,7 +268,7 @@ queue(r"job.*")
# 9 columns omitted
```

See more at [`queue`](@ref), and [`all_queue`](@ref).
> See more at [`queue`](@ref), and [`all_queue`](@ref).
## Job query

@@ -264,9 +299,12 @@ queue(:all)[1]
# stderr → nothing
```

> See more at [`job_query`](@ref), and [`queue`](@ref).

## Wait for jobs and progress meter

Wait for a specific job(s):
[`wait`](@ref) for a specific job(s):

```julia
wait(j::Job)
@@ -426,6 +464,12 @@ result(program_job)
# (true, Dict{String, Any}("OUT" => "out"))
```

[`@submit`](@ref) also works with `Program`s:

```julia
program_job2 = @submit IN1=`in1` IN2=2 OUT="out" touch_run_id_file=false p
```

## Scheduler settings

Check the current status of scheduler:
38 changes: 21 additions & 17 deletions docs/src/use_cases.md
@@ -49,7 +49,9 @@ end

experiments_threads(dists, lens, 1000)
@time experiments_threads(dists, lens, 10000)
# 7.095762 seconds (746 allocations: 110.828 KiB)
# 6.932880 seconds (746 allocations: 110.828 KiB)
# 6.868636 seconds (751 allocations: 110.906 KiB)
# 7.023086 seconds (746 allocations: 110.828 KiB)
```

Note that `DataFrames.push!` is not a thread safe operation and hence we need to utilize a locking mechanism in order to avoid two threads appending the DataFrame at the same time.
@@ -63,38 +65,39 @@ function experiments_jobschedulers(dists, lens, K=1000)
    res = DataFrame()
    for T in dists
        dist = T()
        σ = submit!(() -> std(dist))
        σ = @submit std(dist)
        for L in lens
            z = submit!(() -> f(dist, L, K, result(σ)), dependency=σ)
            z = @submit f(dist, L, K, result(σ))
            push!(res, (;T, σ, L, z))
        end
    end
    wait_queue()
    res.z = result.(res.z)
    res.σ = result.(res.σ)
    res.z = fetch.(res.z)
    res.σ = fetch.(res.σ)
    res
end

experiments_jobschedulers(dists, lens, 1000)
@time experiments_jobschedulers(dists, lens, 10000)
# 3.851350 seconds (16.26 M allocations: 432.789 MiB, 4.50% gc time)
# 3.682429 seconds (4.68 k allocations: 268.984 KiB)
# 3.687437 seconds (4.77 k allocations: 270.609 KiB)
# 3.755103 seconds (4.74 k allocations: 269.812 KiB)
```

In this code we have job interdependence. Firstly, we are calculating the standard deviation `σ`, and then we are using that value in the function `f`. Here, `submit!` wraps a task or a 0-argument function. Since `submit!` yields a `Job` rather than actual values, we need to use the `result` function to obtain those values. Because computing `z` requires completion of `σ`, we need to add argument `dependency=σ` to `submit!`. In the last, after all jobs are submitted, we use `wait_queue()` to wait for all jobs to be finished.
In this code we have job interdependence. First, we calculate the standard deviation `σ`, and then we use that value in the function `f`. Here, `submit!` wraps a task or a 0-argument function. Since `submit!` yields a `Job` rather than actual values, we need to use the `result` function to obtain those values. Because computing `z` requires completion of `σ`, we need to add argument `dependency=σ` to `submit!`. Finally, after all jobs are submitted, we use `fetch` to wait for each job to finish and return its value.

Also, note that contrary to the previous example, we do not need to implement locking as we are just pushing the `Job` results of `submit!` serially into the DataFrame (which is fast since `submit!` doesn't block).
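
The submit-now, collect-later pattern can also be sketched with Base tasks alone. The snippet below is only an analogy under that assumption: `Threads.@spawn` stands in for job submission and the arithmetic is a placeholder for `std` and `f`; it does not show how JobSchedulers schedules work.

```julia
# Handles are collected serially; the values are computed concurrently.
handles = Task[]
for x in 1:4
    # First stage: returns a Task handle immediately instead of a value.
    s = Threads.@spawn x + 1
    # Second stage depends on the first, so it fetches `s` inside its own task.
    z = Threads.@spawn fetch(s) * 2
    # Only this loop pushes to `handles`, so no lock is needed.
    push!(handles, z)
end

# Block once at the end and turn every handle into its value.
zs = fetch.(handles)
```

Because the loop stores handles rather than results, the container is only ever touched from one task, and the blocking happens in a single broadcast at the end.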

The above use case scenario has been tested by running `julia -t 8` (or with `JULIA_NUM_THREADS=8` as environment variable). The `Threads.@threads` code takes 7.1 seconds to run, while the JobSchedulers code, runs around 3.9 seconds, resulting in a 1.8x speedup.
The above use case scenario has been tested by running `julia -t 8` (or with `JULIA_NUM_THREADS=8` as an environment variable). The `Threads.@threads` code takes 7.1 seconds to run, while the JobSchedulers code runs in around 3.7 seconds, resulting in roughly a 1.9x speedup. Note that, unlike `Base.Threads`, JobSchedulers only uses `nthreads() - 1 = 7` threads to compute jobs, so the real speedup is about `1.9 * 8/7 ≈ 2.2`x.

!!! info "Citation"
    Parallel Nested Loops was copied and edited from Dagger.jl's documentation. Most of the information is the same, except that JobSchedulers.jl was used.

## A Workflow Example With Pipelines.jl

- Run prog_A with 2 threads and 4GB RAM.
- Run prog_B with 8 threads.
- After prog_A finished, run prog_C (2 threads).
- After prog_B and prog_C finished, run prog_D (12 threads)
- Run `prog_A` with 2 threads and 4GB RAM.
- Run `prog_B` with 8 threads.
- After `prog_A` finishes, run `prog_C` (2 threads).
- After `prog_B` and `prog_C` finish, run `prog_D` (12 threads).

The flowchart is like:

@@ -203,7 +206,8 @@ experiments_dagger(1, 10)

- JobSchedulers remains stable even when scheduling 100,000 tasks, while Dagger seems to have encountered a deadlock that caused the system to hang.

| Number of Task | Base.Threads | JobSchedulers.jl | Dagger.jl |
|-:|-:|-:|-:|
|10,000| 0.000176 s | 0.010370 s | 0.846881 s |
|100,000| 0.001873 s | 0.247005 s | failed, hung (dead lock) |
| Number of tasks | 10,000 | 100,000 |
| :---- | ----: | ----: |
| Base.Threads (seconds) | 0.000176 | 0.001873 |
| JobSchedulers (seconds) | 0.010370 | 0.247005 |
| Dagger (seconds) | 0.846881 | failed, hung (deadlock) |
4 changes: 2 additions & 2 deletions src/JobSchedulers.jl
@@ -61,8 +61,8 @@ include("progress_computing.jl")
include("progress_view.jl")
export queue_progress

# include("macro.jl")
# export @submit!
include("macro.jl")
export @submit

function __init__()
# Fixing precompilation hangs due to open tasks or IO
13 changes: 12 additions & 1 deletion src/jobs.jl
@@ -52,7 +52,7 @@ end
!!! note
    Redirection in Julia is not thread safe, so unexpected redirection might happen if you are running programs in different `Tasks` simultaneously (multi-threading).
See also [`@submit!`](@ref), [`submit!`](@ref), [`Cron`](@ref)
See also [`submit!`](@ref), [`@submit`](@ref), [`Cron`](@ref)
"""
mutable struct Job
id::Int64
@@ -232,6 +232,17 @@ function Job(command::Base.AbstractCmd;
job
end

"""
    fetch(x::Job)

Wait for a `Job` to finish, then return its result value. If the task fails with an exception, a `TaskFailedException` (which wraps the failed task) is thrown.

!!! compat
    `fetch(x::Job)` is available from JobSchedulers v0.10.2.
"""
function Base.fetch(x::Job)
    fetch(x.task)
end

period2datetime(t::DateTime) = t
period2datetime(t::Period) = now() + t