diff --git a/DESCRIPTION b/DESCRIPTION
index b9a32b8e..a2702763 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally
   The methodology in this package
   borrows from GNU 'Make' (2015, ISBN:978-9881443519)
   and 'drake' (2018, ).
-Version: 1.4.1.9002
+Version: 1.4.1.9003
 License: MIT + file LICENSE
 URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets
 BugReports: https://github.com/ropensci/targets/issues
diff --git a/NAMESPACE b/NAMESPACE
index 89f657c8..f21f58e9 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -548,6 +548,7 @@ importFrom(igraph,topo_sort)
 importFrom(knitr,engine_output)
 importFrom(knitr,knit_engines)
 importFrom(ps,ps_create_time)
+importFrom(ps,ps_handle)
 importFrom(rlang,abort)
 importFrom(rlang,as_function)
 importFrom(rlang,check_installed)
diff --git a/NEWS.md b/NEWS.md
index 125b958b..ddf302e0 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,21 +1,22 @@
-# targets 1.4.1.9002 (development)
+# targets 1.4.1.9003 (development)
 
 ## Invalidating changes
 
 Because of the changes below, upgrading to this version of `targets` will unavoidably invalidate previously built targets in existing pipelines. Your pipeline code should still work, but any targets you ran before will most likely need to rerun after the upgrade.
 
 * In `tar_seed_create()`, use `secretbase::sha3(x = TARGET_NAME, bits = 32L, convert = NA)` to generate target seeds that are more resistant to overlapping RNG streams (#1139, @shikokuchuo). The previous approach used a less rigorous combination of `digest::digest(algo = "sha512")` and `digest::digest2int()`.
-* Remove `pkgload::load_all()` warning (#1218). Tried using `.__DEVTOOLS__` but it interferes with reverse dependencies.
-* Do not dispatch targets to backlogged `crew` controllers (or controller groups) (#1220). Use the new `push_backlog()` and `pop_backlog()` `crew` methods to make this smooth.
-* Make the debugger message more generic (#1223, @eliocamp).
 
-## Other changes
+## Other improvements
 
 * Update the documentation of the `deployment` argument of `tar_target()` to reflect the advent of `crew` (#1208, @psychelzh).
 * Unset `cli.num_colors` on exit in `tar_error()` and `tar_warning()` (#1210, @dipterix).
 * Do not try to access `seconds_timeout` if the `crew` controller is actually a controller group (#1207, https://github.com/wlandau/crew.cluster/discussions/35, @stemangiola, @drejom).
 * `tar_make()` gains an `as_job` argument to optionally run a `targets` pipeline as an RStudio job.
 * Bump required `igraph` version to 2.0.0 because `igraph::get.edgelist()` was deprecated in favor of `igraph::as_edgelist()`.
+* Do not dispatch targets to backlogged `crew` controllers (or controller groups) (#1220). Use the new `push_backlog()` and `pop_backlog()` `crew` methods to make this smooth.
+* Make the debugger message more generic (#1223, @eliocamp).
+* Throw an early and informative error from `tar_make()` if a `targets` pipeline is already running in a local process on the same local data store. The local process is detected using the process ID and time stamp from `tar_process()` (with a 1.01-second tolerance for the time stamp).
+* Remove `pkgload::load_all()` warning (#1218). Tried using `.__DEVTOOLS__` but it interferes with reverse dependencies.
 
 # targets 1.4.1
 
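As a quick illustration of the seed-related NEWS entry above: `secretbase::sha3()` with `bits = 32L` and `convert = NA` returns a single integer, so a target's seed can be derived from its name alone. The sketch below is illustrative only; `seed_sketch()` is a hypothetical helper name, not part of the `targets` internals.

```r
# Minimal sketch of the seed scheme from the NEWS entry above.
# seed_sketch() is a hypothetical name, not the {targets} API.
seed_sketch <- function(target_name) {
  # bits = 32L with convert = NA asks secretbase for one integer,
  # which is suitable for set.seed().
  secretbase::sha3(x = target_name, bits = 32L, convert = NA)
}
seed_sketch("x") # Deterministic: depends only on the target name.
```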
diff --git a/R/class_active.R b/R/class_active.R
index e5e79fe1..7f888960 100644
--- a/R/class_active.R
+++ b/R/class_active.R
@@ -136,6 +136,7 @@ active_class <- R6::R6Class(
     },
     ensure_process = function() {
       self$process <- process_init(path_store = self$meta$store)
+      self$process$assert_unique()
       self$process$record_process()
       self$process$database$upload(verbose = FALSE)
     },
diff --git a/R/class_process.R b/R/class_process.R
index 4dbac2d4..0cab2cb6 100644
--- a/R/class_process.R
+++ b/R/class_process.R
@@ -47,6 +47,57 @@ process_class <- R6::R6Class(
       self$update_process()
       self$write_process(self$get_process())
     },
+    assert_unique = function() {
+      # Tested in tests/interactive/test-process.R
+      # nocov start
+      if (!any(file.exists(self$database$path))) {
+        return()
+      }
+      old <- self$read_process()
+      if (!all(c("pid", "created") %in% old$name)) {
+        # For compatibility with older {targets}, cannot test.
+        return() # nocov
+      }
+      pid <- as.integer(old$value[old$name == "pid"])
+      if (identical(pid, as.integer(Sys.getpid()))) {
+        return()
+      }
+      handle <- tryCatch(
+        ps::ps_handle(pid = pid),
+        error = function(condition) NULL
+      )
+      if (is.null(handle)) {
+        return()
+      }
+      time_file <- posixct_time(old$value[old$name == "created"])
+      time_ps <- ps::ps_create_time(p = handle)
+      if (anyNA(time_file) || anyNA(time_ps)) {
+        return()
+      }
+      diff <- abs(difftime(time_file, time_ps, units = "secs"))
+      tolerance <- as.difftime(1.01, units = "secs")
+      tar_assert_ge(
+        x = diff,
+        threshold = tolerance,
+        msg = paste(
+          "Process ID",
+          pid,
+          "is already running a {targets} pipeline with the",
+          dirname(dirname(self$database$path)),
+          "folder as the local data store for data and metadata files.",
+          "Please do not attempt to run more than one pipeline on the same",
+          "data store because it will mangle those important local files.",
+          "Before trying again, check that process",
+          pid,
+          "is really a {targets} pipeline and not a false positive,",
+          "then terminate it manually. In case of a false positive,",
+          "remove file",
+          self$database$path,
+          "and try again."
+        )
+      )
+      # nocov end
+    },
     validate = function() {
       self$database$validate()
     }
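The `assert_unique()` method above detects a concurrent run by matching the process ID recorded in the data store against a live process and then comparing creation time stamps within a 1.01-second tolerance, so an unrelated process that merely reuses the same PID does not trigger a false positive. Below is a minimal standalone sketch of the same idea using the `ps` package; the function name and the plain `read.table()` call for the recorded name/value pairs are assumptions for illustration, not the internal `targets` database format.

```r
# Standalone sketch of the duplicate-run check, outside the R6 class.
# pipeline_already_running() and the name/value table format are
# illustrative assumptions, not the internal {targets} API.
pipeline_already_running <- function(process_file) {
  if (!file.exists(process_file)) {
    return(FALSE) # No recorded process, so nothing can be running.
  }
  old <- utils::read.table(process_file, header = TRUE)
  pid <- as.integer(old$value[old$name == "pid"])
  if (identical(pid, as.integer(Sys.getpid()))) {
    return(FALSE) # The record belongs to the current process.
  }
  handle <- tryCatch(ps::ps_handle(pid = pid), error = function(e) NULL)
  if (is.null(handle)) {
    return(FALSE) # No live process has that ID.
  }
  time_file <- as.POSIXct(old$value[old$name == "created"], tz = "UTC")
  time_ps <- ps::ps_create_time(p = handle)
  if (anyNA(time_file) || anyNA(time_ps)) {
    return(FALSE) # Cannot compare time stamps, so avoid a false alarm.
  }
  # Same PID and nearly identical start times: treat it as the same process.
  abs(difftime(time_file, time_ps, units = "secs")) <
    as.difftime(1.01, units = "secs")
}
```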
diff --git a/R/tar_package.R b/R/tar_package.R
index 3a88bdcd..98c40c6c 100644
--- a/R/tar_package.R
+++ b/R/tar_package.R
@@ -23,7 +23,7 @@
 #'   graph_from_data_frame igraph_opt igraph_options is_dag simplify topo_sort
 #'   V
 #' @importFrom knitr engine_output knit_engines
-#' @importFrom ps ps_create_time
+#' @importFrom ps ps_create_time ps_handle
 #' @importFrom R6 R6Class
 #' @importFrom rlang abort as_function check_installed enquo inform
 #'   is_installed quo_squash warn
diff --git a/tests/interactive/test-process.R b/tests/interactive/test-process.R
new file mode 100644
index 00000000..9c79330f
--- /dev/null
+++ b/tests/interactive/test-process.R
@@ -0,0 +1,25 @@
+tar_test("error running two pipelines on _targets/ at the same time", {
+  tar_script(tar_target(x, Sys.sleep(120)))
+  process <- tar_make(callr_function = callr::r_bg)
+  on.exit(process$kill())
+  Sys.sleep(5)
+  tar_script(tar_target(x, TRUE))
+  for (index in seq_len(2L)) {
+    expect_error(
+      tar_make(callr_function = NULL),
+      class = "tar_condition_validate"
+    )
+  }
+  temp <- tempfile()
+  on.exit(unlink(temp, recursive = TRUE), add = TRUE)
+  tar_make(callr_function = NULL, store = temp)
+  expect_true(tar_read(x, store = temp))
+  process$kill()
+  Sys.sleep(5)
+  tar_script(tar_target(x, "x"))
+  tar_make(callr_function = NULL)
+  expect_equal(tar_read(x), "x")
+  tar_script(tar_target(x, FALSE))
+  tar_make(callr_function = NULL)
+  expect_false(tar_read(x))
+})
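The interactive test above can also be replayed by hand. The sketch below is a hypothetical walkthrough, not an additional test file; it assumes a scratch project directory and that `callr` is installed.

```r
# Hypothetical walkthrough of the new guard (mirrors the test above).
library(targets)
tar_script(tar_target(x, Sys.sleep(120)))             # A deliberately slow pipeline.
background <- tar_make(callr_function = callr::r_bg)  # First run in a background R process.
Sys.sleep(5)                          # Give it time to record its PID and start time.
try(tar_make(callr_function = NULL))  # Second run on the same store errors early.
tar_process()                         # The pid/created metadata the check relies on.
background$kill()                     # Stop the background run before trying again.
```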