diff --git a/DESCRIPTION b/DESCRIPTION index 29609eb4..cc39eff1 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -76,7 +76,7 @@ Suggests: arrow (>= 3.0.0), bs4Dash (>= 2.0.0), clustermq (>= 0.9.1), - crew (>= 0.6.0), + crew (>= 0.6.0.9004), curl (>= 4.3), DT (>= 0.14), dplyr (>= 1.0.0), diff --git a/R/class_crew.R b/R/class_crew.R index 2c0740cd..19ad3cce 100644 --- a/R/class_crew.R +++ b/R/class_crew.R @@ -8,6 +8,7 @@ crew_init <- function( seconds_meta_append = 0, seconds_meta_upload = 15, seconds_reporter = 0, + seconds_scale = 0.5, garbage_collection = FALSE, envir = tar_option_get("envir"), controller = NULL, @@ -23,6 +24,7 @@ crew_init <- function( seconds_meta_append = seconds_meta_append, seconds_meta_upload = seconds_meta_upload, seconds_reporter = seconds_reporter, + seconds_scale = seconds_scale, garbage_collection = garbage_collection, envir = envir, controller = controller, @@ -40,6 +42,7 @@ crew_new <- function( seconds_meta_append = NULL, seconds_meta_upload = NULL, seconds_reporter = NULL, + seconds_scale = NULL, garbage_collection = NULL, envir = NULL, controller = NULL, @@ -55,6 +58,7 @@ crew_new <- function( seconds_meta_append = seconds_meta_append, seconds_meta_upload = seconds_meta_upload, seconds_reporter = seconds_reporter, + seconds_scale = seconds_scale, garbage_collection = garbage_collection, envir = envir, controller = controller, @@ -70,6 +74,8 @@ crew_class <- R6::R6Class( public = list( controller = NULL, terminate_controller = NULL, + seconds_scale = NULL, + throttle = NULL, initialize = function( pipeline = NULL, meta = NULL, @@ -80,10 +86,12 @@ crew_class <- R6::R6Class( seconds_meta_append = NULL, seconds_meta_upload = NULL, seconds_reporter = NULL, + seconds_scale = NULL, garbage_collection = NULL, envir = NULL, controller = NULL, - terminate_controller = NULL + terminate_controller = NULL, + throttle = NULL ) { super$initialize( pipeline = pipeline, @@ -100,6 +108,7 @@ crew_class <- R6::R6Class( ) self$controller <- controller self$terminate_controller <- terminate_controller + self$seconds_scale <- seconds_scale }, produce_exports = function(envir, path_store, is_globalenv = NULL) { map(names(envir), ~force(envir[[.x]])) # try to nix high-mem promises @@ -201,14 +210,16 @@ crew_class <- R6::R6Class( iterate = function() { self$sync_meta_time() queue <- self$scheduler$queue + throttle <- self$throttle + interval <- throttle$seconds_interval if_any( queue$should_dequeue(), self$process_target(queue$dequeue()), self$controller$wait( mode = "one", - seconds_interval = 0.5, - seconds_timeout = 0.5, - scale = TRUE + seconds_interval = interval, + seconds_timeout = interval, + scale = throttle$poll() ) ) self$conclude_worker_task() @@ -272,6 +283,12 @@ crew_class <- R6::R6Class( self$iterate() } }, + start = function() { + super$start() + self$throttle <- crew::crew_throttle( + seconds_interval = self$seconds_scale + ) + }, run = function() { self$start() on.exit(self$end()) @@ -289,6 +306,17 @@ crew_class <- R6::R6Class( tar_assert_lgl(self$terminate_controller) tar_assert_scalar(self$terminate_controller) tar_assert_none_na(self$terminate_controller) + tar_assert_scalar(self$seconds_scale) + tar_assert_dbl(self$seconds_scale) + tar_assert_none_na(self$seconds_scale) + tar_assert_ge(self$seconds_scale, 0) + if_any( + is.null(self$throttle), + NULL, { + tar_assert_inherits(self$throttle, "crew_class_throttle") + self$throttle$validate() + } + ) } ) ) diff --git a/R/tar_config_get.R b/R/tar_config_get.R index 783280b9..1e5711b4 100644 --- a/R/tar_config_get.R +++ b/R/tar_config_get.R @@ -95,6 +95,7 @@ tar_config_get_project <- function(name, yaml) { seconds_meta_append = yaml$seconds_meta_append %|||% 0, seconds_meta_upload = yaml$seconds_meta_upload %|||% 15, seconds_reporter = yaml$seconds_reporter %|||% 0, + seconds_scale = yaml$seconds_scale %|||% 0.5, seconds_interval = yaml$seconds_interval, shortcut = yaml$shortcut %|||% FALSE, store = yaml$store %|||% path_store_default(), @@ -116,6 +117,7 @@ tar_config_get_convert <- function(name, value) { seconds_meta_append = as.numeric(value), seconds_meta_upload = as.numeric(value), seconds_reporter = as.numeric(value), + seconds_scale = as.numeric(value), seconds_interval = if_any(is.null(value), NULL, as.numeric(value)), shortcut = as.logical(value), store = as.character(value), diff --git a/R/tar_config_set.R b/R/tar_config_set.R index d5a5a7ac..e71e2c7c 100644 --- a/R/tar_config_set.R +++ b/R/tar_config_set.R @@ -93,6 +93,9 @@ #' and [tar_make_future()]. Positive numeric of length 1 with the minimum #' number of seconds between times when the reporter prints progress #' messages to the R console. +#' @param seconds_scale Argument of [tar_make()]. +#' Positive numeric of length 1, with the time interval (in seconds) +#' for auto-scaling `crew` workers if a `crew` controller is supplied. #' @param shortcut logical of length 1, default `shortcut` argument #' to [tar_make()] and related functions. #' If the argument `NULL`, the setting is not modified. @@ -170,6 +173,7 @@ tar_config_set <- function( seconds_meta_append = NULL, seconds_meta_upload = NULL, seconds_reporter = NULL, + seconds_scale = NULL, seconds_interval = NULL, store = NULL, shortcut = NULL, @@ -194,6 +198,7 @@ tar_config_set <- function( tar_config_assert_seconds_meta_append(seconds_meta_append) tar_config_assert_seconds_meta_upload(seconds_meta_upload) tar_config_assert_seconds_reporter(seconds_reporter) + tar_config_assert_seconds_scale(seconds_scale) tar_config_assert_seconds_interval(seconds_interval) tar_config_assert_shortcut(shortcut) tar_config_assert_store(store) @@ -217,6 +222,8 @@ tar_config_set <- function( yaml[[project]]$seconds_meta_upload yaml[[project]]$seconds_reporter <- seconds_reporter %|||% yaml[[project]]$seconds_reporter + yaml[[project]]$seconds_scale <- seconds_scale %|||% + yaml[[project]]$seconds_scale yaml[[project]]$seconds_interval <- seconds_interval %|||% yaml[[project]]$seconds_interval yaml[[project]]$shortcut <- shortcut %|||% yaml[[project]]$shortcut @@ -336,6 +343,16 @@ tar_config_assert_seconds_reporter <- function(seconds_reporter) { tar_assert_ge(seconds_reporter, 0) } +tar_config_assert_seconds_scale <- function(seconds_scale) { + if (is.null(seconds_scale)) { + return() + } + tar_assert_dbl(seconds_scale) + tar_assert_scalar(seconds_scale) + tar_assert_none_na(seconds_scale) + tar_assert_ge(seconds_scale, 0) +} + tar_config_assert_shortcut <- function(shortcut) { if (is.null(shortcut)) { return() diff --git a/R/tar_make.R b/R/tar_make.R index 2a4200bf..afd72f85 100644 --- a/R/tar_make.R +++ b/R/tar_make.R @@ -72,6 +72,9 @@ #' @param seconds_reporter Positive numeric of length 1 with the minimum #' number of seconds between times when the reporter prints progress #' messages to the R console. +#' @param seconds_scale Positive numeric of length 1, with the time interval +#' (in seconds) for auto-scaling `crew` workers if a `crew` controller +#' is supplied via `tar_option_set()` in `_targets.R`. #' @param garbage_collection Logical of length 1. For a `crew`-integrated #' pipeline, whether to run garbage collection on the main process #' before sending a target @@ -128,6 +131,7 @@ tar_make <- function( seconds_meta_append = targets::tar_config_get("seconds_meta_append"), seconds_meta_upload = targets::tar_config_get("seconds_meta_upload"), seconds_reporter = targets::tar_config_get("seconds_reporter"), + seconds_scale = targets::tar_config_get("seconds_scale"), seconds_interval = targets::tar_config_get("seconds_interval"), callr_function = callr::r, callr_arguments = targets::tar_callr_args_default(callr_function, reporter), @@ -157,6 +161,9 @@ tar_make <- function( tar_assert_scalar(seconds_reporter) tar_assert_none_na(seconds_reporter) tar_assert_ge(seconds_reporter, 0) + tar_assert_scalar(seconds_scale) + tar_assert_none_na(seconds_scale) + tar_assert_ge(seconds_scale, 0) tar_deprecate_seconds_interval(seconds_interval) tar_assert_lgl(garbage_collection) tar_assert_scalar(garbage_collection) @@ -172,6 +179,7 @@ tar_make <- function( seconds_meta_append = seconds_meta_append, seconds_meta_upload = seconds_meta_upload, seconds_reporter = seconds_reporter, + seconds_scale = seconds_scale, garbage_collection = garbage_collection, use_crew = use_crew, terminate_controller = terminate_controller @@ -198,6 +206,7 @@ tar_make_inner <- function( seconds_meta_append, seconds_meta_upload, seconds_reporter, + seconds_scale, garbage_collection, use_crew, terminate_controller @@ -224,7 +233,7 @@ tar_make_inner <- function( envir = tar_option_get("envir") )$run() } else { - tar_assert_package("crew (>= 0.6.0)") + tar_assert_package("crew (>= 0.6.0.9004)") crew_init( pipeline = pipeline, meta = meta_init(path_store = path_store), @@ -235,6 +244,7 @@ tar_make_inner <- function( seconds_meta_append = seconds_meta_append, seconds_meta_upload = seconds_meta_upload, seconds_reporter = seconds_reporter, + seconds_scale = seconds_scale, garbage_collection = garbage_collection, envir = tar_option_get("envir"), controller = controller, diff --git a/man/tar_config_set.Rd b/man/tar_config_set.Rd index 4238758b..b4d58de8 100644 --- a/man/tar_config_set.Rd +++ b/man/tar_config_set.Rd @@ -15,6 +15,7 @@ tar_config_set( seconds_meta_append = NULL, seconds_meta_upload = NULL, seconds_reporter = NULL, + seconds_scale = NULL, seconds_interval = NULL, store = NULL, shortcut = NULL, @@ -94,6 +95,10 @@ and \code{\link[=tar_make_future]{tar_make_future()}}. Positive numeric of lengt number of seconds between times when the reporter prints progress messages to the R console.} +\item{seconds_scale}{Argument of \code{\link[=tar_make]{tar_make()}}. +Positive numeric of length 1, with the time interval (in seconds) +for auto-scaling \code{crew} workers if a \code{crew} controller is supplied.} + \item{seconds_interval}{Deprecated on 2023-08-24 (version 1.2.2.9001). Use \code{seconds_meta_append}, \code{seconds_meta_upload}, and \code{seconds_reporter} instead.} diff --git a/man/tar_make.Rd b/man/tar_make.Rd index bc6558cd..741a2073 100644 --- a/man/tar_make.Rd +++ b/man/tar_make.Rd @@ -11,6 +11,7 @@ tar_make( seconds_meta_append = targets::tar_config_get("seconds_meta_append"), seconds_meta_upload = targets::tar_config_get("seconds_meta_upload"), seconds_reporter = targets::tar_config_get("seconds_reporter"), + seconds_scale = targets::tar_config_get("seconds_scale"), seconds_interval = targets::tar_config_get("seconds_interval"), callr_function = callr::r, callr_arguments = targets::tar_callr_args_default(callr_function, reporter), @@ -87,6 +88,10 @@ regardless of \code{seconds_meta_upload}.} number of seconds between times when the reporter prints progress messages to the R console.} +\item{seconds_scale}{Positive numeric of length 1, with the time interval +(in seconds) for auto-scaling \code{crew} workers if a \code{crew} controller +is supplied via \code{tar_option_set()} in \verb{_targets.R}.} + \item{seconds_interval}{Deprecated on 2023-08-24 (version 1.2.2.9001). Use \code{seconds_meta_append}, \code{seconds_meta_upload}, and \code{seconds_reporter} instead.} diff --git a/tests/testthat/test-class_crew.R b/tests/testthat/test-class_crew.R index 7365366a..2928a851 100644 --- a/tests/testthat/test-class_crew.R +++ b/tests/testthat/test-class_crew.R @@ -1,5 +1,5 @@ tar_test("crew$validate()", { - skip_if_not_installed("crew", minimum_version = "0.6.0") + skip_if_not_installed("crew", minimum_version = "0.6.0.9004") controller <- crew::crew_controller_local( host = "127.0.0.1", seconds_interval = 0.5 @@ -15,7 +15,7 @@ tar_test("crew database subkey", { tar_test("workerless deployment works", { skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.6.0") + skip_if_not_installed("crew", minimum_version = "0.6.0.9004") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) @@ -98,7 +98,7 @@ tar_test("semi-workerless deployment works", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.6.0") + skip_if_not_installed("crew", minimum_version = "0.6.0.9004") skip_if_not_installed("R.utils") crew_test_sleep() tar_runtime$fun <- "tar_make" @@ -183,7 +183,7 @@ tar_test("some targets up to date, some not", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.6.0") + skip_if_not_installed("crew", minimum_version = "0.6.0.9004") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) @@ -238,7 +238,7 @@ tar_test("crew algo can skip targets", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.6.0") + skip_if_not_installed("crew", minimum_version = "0.6.0.9004") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) @@ -293,7 +293,7 @@ tar_test("nontrivial common data", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.6.0") + skip_if_not_installed("crew", minimum_version = "0.6.0.9004") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) diff --git a/tests/testthat/test-tar_config_set.R b/tests/testthat/test-tar_config_set.R index f7252fc0..33abb0e6 100644 --- a/tests/testthat/test-tar_config_set.R +++ b/tests/testthat/test-tar_config_set.R @@ -181,6 +181,21 @@ tar_test("tar_config_set() with seconds_reporter", { expect_equal(tar_config_get("seconds_reporter"), 0) }) +tar_test("tar_config_set() with seconds_scale", { + skip_cran() + expect_false(file.exists("_targets.yaml")) + expect_equal(tar_config_get("seconds_scale"), 0.5) + path <- tempfile() + tar_config_set(seconds_scale = 10) + expect_equal(tar_config_get("seconds_scale"), 10) + expect_true(file.exists("_targets.yaml")) + tar_config_set() + expect_equal(tar_config_get("seconds_scale"), 10) + expect_true(file.exists("_targets.yaml")) + unlink("_targets.yaml") + expect_equal(tar_config_get("seconds_scale"), 0.5) +}) + tar_test("tar_config_set() with seconds_interval", { skip_cran() expect_false(file.exists("_targets.yaml")) diff --git a/tests/testthat/test-tar_make.R b/tests/testthat/test-tar_make.R index f0ae40b5..63bd72e6 100644 --- a/tests/testthat/test-tar_make.R +++ b/tests/testthat/test-tar_make.R @@ -17,7 +17,7 @@ tar_test("tar_make() works", { tar_test("tar_make() works with crew", { skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.6.0") + skip_if_not_installed("crew", minimum_version = "0.6.0.9004") skip_if_not_installed("R.utils") should_skip <- identical(tolower(Sys.info()[["sysname"]]), "windows") && isTRUE(as.logical(Sys.getenv("CI")))