Skip to content

Commit

Permalink
Throw an informative error if two local pipelines are running at the …
Browse files Browse the repository at this point in the history
…same time on the same data store
  • Loading branch information
wlandau-lilly committed Feb 6, 2024
1 parent 6177844 commit b9642a9
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 7 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally
The methodology in this package
borrows from GNU 'Make' (2015, ISBN:978-9881443519)
and 'drake' (2018, <doi:10.21105/joss.00550>).
Version: 1.4.1.9002
Version: 1.4.1.9003
License: MIT + file LICENSE
URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets
BugReports: https://github.com/ropensci/targets/issues
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ importFrom(igraph,topo_sort)
importFrom(knitr,engine_output)
importFrom(knitr,knit_engines)
importFrom(ps,ps_create_time)
importFrom(ps,ps_handle)
importFrom(rlang,abort)
importFrom(rlang,as_function)
importFrom(rlang,check_installed)
Expand Down
11 changes: 6 additions & 5 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
# targets 1.4.1.9002 (development)
# targets 1.4.1.9003 (development)

## Invalidating changes

Because of the changes below, upgrading to this version of `targets` will unavoidably invalidate previously built targets in existing pipelines. Your pipeline code should still work, but any targets you ran before will most likely need to rerun after the upgrade.

* In `tar_seed_create()`, use `secretbase::sha3(x = TARGET_NAME, bits = 32L, convert = NA)` to generate target seeds that are more resistant to overlapping RNG streams (#1139, @shikokuchuo). The previous approach used a less rigorous combination of `digest::digest(algo = "sha512")` and `digets::digest2int()`.
* Remove `pkgload::load_all()` warning (#1218). Tried using `.__DEVTOOLS__` but it interferes with reverse dependencies.
* Do not dispatch targets to backlogged `crew` controllers (or controller groups) (#1220). Use the new `push_backlog()` and `pop_backlog()` `crew` methods to make this smooth.
* Make the debugger message more generic (#1223, @eliocamp).

## Other changes
## Other improvements

* Update the documentation of the `deployment` argument of `tar_target()` to reflect the advent of `crew` (#1208, @psychelzh).
* Unset `cli.num_colors` on exit in `tar_error()` and `tar_warning()` (#1210, @dipterix).
* Do not try to access `seconds_timeout` if the `crew` controller is actually a controller group (#1207, https://github.com/wlandau/crew.cluster/discussions/35, @stemangiola, @drejom).
* `tar_make()` gains an `as_job` argument to optionally run a `targets` pipeline as an RStudio job.
* Bump required `igraph` version to 2.0.0 because `igraph::get.edgelist()` was deprecated in favor of `igraph::as_edgelist()`.
* Do not dispatch targets to backlogged `crew` controllers (or controller groups) (#1220). Use the new `push_backlog()` and `pop_backlog()` `crew` methods to make this smooth.
* Make the debugger message more generic (#1223, @eliocamp).
* Throw an early and informative error from `tar_make()` if there is already a `targets` pipeline running on a local process on the same local data store. The local process is detected using the process ID and time stamp from `tar_process()` (with a 1.01-second tolerance for the time stamp).
* Remove `pkgload::load_all()` warning (#1218). Tried using `.__DEVTOOLS__` but it interferes with reverse dependencies.

# targets 1.4.1

Expand Down
1 change: 1 addition & 0 deletions R/class_active.R
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ active_class <- R6::R6Class(
},
ensure_process = function() {
self$process <- process_init(path_store = self$meta$store)
self$process$assert_unique()
self$process$record_process()
self$process$database$upload(verbose = FALSE)
},
Expand Down
51 changes: 51 additions & 0 deletions R/class_process.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,57 @@ process_class <- R6::R6Class(
self$update_process()
self$write_process(self$get_process())
},
assert_unique = function() {
# Tested in tests/interactive/test-process.R
# nocov start
if (!any(file.exists(self$database$path))) {
return()
}
old <- self$read_process()
if (!all(c("pid", "created") %in% old$name)) {
# For compatibility with older {targets}, cannot test.
return() # nocov
}
pid <- as.integer(old$value[old$name == "pid"])
if (identical(pid, as.integer(Sys.getpid()))) {
return()
}
handle <- tryCatch(
ps::ps_handle(pid = pid),
error = function(condition) NULL
)
if (is.null(handle)) {
return()
}
time_file <- posixct_time(old$value[old$name == "created"])
time_ps <- ps::ps_create_time(p = handle)
if (anyNA(time_file) || anyNA(time_ps)) {
return()
}
diff <- abs(difftime(time_file, time_ps, units = "secs"))
tolerance <- as.difftime(1.01, units = "secs")
tar_assert_ge(
x = diff,
threshold = tolerance,
msg = paste(
"Process ID",
pid,
"is already running a {targets} pipeline with the",
dirname(dirname(self$database$path)),
"folder as the local data store for data and metadata files.",
"Please do not attempt to run more than one pipeline on the same",
"data store because it will mangle thos important local files.",
"Before trying again, check that process",
pid,
"is really a {targets} pipeline and not a false positive,",
"then terminate it manually. In case of a false positive,",
"remove file",
self$database$path,
"and try again."
)
)
# nocov end
},
validate = function() {
self$database$validate()
}
Expand Down
2 changes: 1 addition & 1 deletion R/tar_package.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#' graph_from_data_frame igraph_opt igraph_options is_dag simplify topo_sort
#' V
#' @importFrom knitr engine_output knit_engines
#' @importFrom ps ps_create_time
#' @importFrom ps ps_create_time ps_handle
#' @importFrom R6 R6Class
#' @importFrom rlang abort as_function check_installed enquo inform
#' is_installed quo_squash warn
Expand Down
25 changes: 25 additions & 0 deletions tests/interactive/test-process.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
tar_test("error running two pipelines on _targets/ at the same time", {
tar_script(tar_target(x, Sys.sleep(120)))
process <- tar_make(callr_function = callr::r_bg)
on.exit(process$kill())
Sys.sleep(5)
tar_script(tar_target(x, TRUE))
for (index in seq_len(2L)) {
expect_error(
tar_make(callr_function = NULL),
class = "tar_condition_validate"
)
}
temp <- tempfile()
on.exit(unlink(temp, recursive = TRUE), add = TRUE)
tar_make(callr_function = NULL, store = temp)
expect_true(tar_read(x, store = temp))
process$kill()
Sys.sleep(5)
tar_script(tar_target(x, "x"))
tar_make(callr_function = NULL)
expect_equal(tar_read(x), "x")
tar_script(tar_target(x, FALSE))
tar_make(callr_function = NULL)
expect_false(tar_read(x))
})

0 comments on commit b9642a9

Please sign in to comment.