Skip to content

Commit

Permalink
Throttled scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Dec 5, 2023
1 parent a12a718 commit c7fa173
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 13 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Suggests:
arrow (>= 3.0.0),
bs4Dash (>= 2.0.0),
clustermq (>= 0.9.1),
crew (>= 0.6.0),
crew (>= 0.6.0.9004),
curl (>= 4.3),
DT (>= 0.14),
dplyr (>= 1.0.0),
Expand Down
36 changes: 32 additions & 4 deletions R/class_crew.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ crew_init <- function(
seconds_meta_append = 0,
seconds_meta_upload = 15,
seconds_reporter = 0,
seconds_scale = 0.5,
garbage_collection = FALSE,
envir = tar_option_get("envir"),
controller = NULL,
Expand All @@ -23,6 +24,7 @@ crew_init <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
envir = envir,
controller = controller,
Expand All @@ -40,6 +42,7 @@ crew_new <- function(
seconds_meta_append = NULL,
seconds_meta_upload = NULL,
seconds_reporter = NULL,
seconds_scale = NULL,
garbage_collection = NULL,
envir = NULL,
controller = NULL,
Expand All @@ -55,6 +58,7 @@ crew_new <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
envir = envir,
controller = controller,
Expand All @@ -70,6 +74,8 @@ crew_class <- R6::R6Class(
public = list(
controller = NULL,
terminate_controller = NULL,
seconds_scale = NULL,
throttle = NULL,
initialize = function(
pipeline = NULL,
meta = NULL,
Expand All @@ -80,10 +86,12 @@ crew_class <- R6::R6Class(
seconds_meta_append = NULL,
seconds_meta_upload = NULL,
seconds_reporter = NULL,
seconds_scale = NULL,
garbage_collection = NULL,
envir = NULL,
controller = NULL,
terminate_controller = NULL
terminate_controller = NULL,
throttle = NULL
) {
super$initialize(
pipeline = pipeline,
Expand All @@ -100,6 +108,7 @@ crew_class <- R6::R6Class(
)
self$controller <- controller
self$terminate_controller <- terminate_controller
self$seconds_scale <- seconds_scale
},
produce_exports = function(envir, path_store, is_globalenv = NULL) {
map(names(envir), ~force(envir[[.x]])) # try to nix high-mem promises
Expand Down Expand Up @@ -201,14 +210,16 @@ crew_class <- R6::R6Class(
iterate = function() {
self$sync_meta_time()
queue <- self$scheduler$queue
throttle <- self$throttle
interval <- throttle$seconds_interval
if_any(
queue$should_dequeue(),
self$process_target(queue$dequeue()),
self$controller$wait(
mode = "one",
seconds_interval = 0.5,
seconds_timeout = 0.5,
scale = TRUE
seconds_interval = interval,
seconds_timeout = interval,
scale = throttle$poll()
)
)
self$conclude_worker_task()
Expand Down Expand Up @@ -272,6 +283,12 @@ crew_class <- R6::R6Class(
self$iterate()
}
},
start = function() {
super$start()
self$throttle <- crew::crew_throttle(
seconds_interval = self$seconds_scale
)
},
run = function() {
self$start()
on.exit(self$end())
Expand All @@ -289,6 +306,17 @@ crew_class <- R6::R6Class(
tar_assert_lgl(self$terminate_controller)
tar_assert_scalar(self$terminate_controller)
tar_assert_none_na(self$terminate_controller)
tar_assert_scalar(self$seconds_scale)
tar_assert_dbl(self$seconds_scale)
tar_assert_none_na(self$seconds_scale)
tar_assert_ge(self$seconds_scale, 0)
if_any(
is.null(self$throttle),
NULL, {
tar_assert_inherits(self$throttle, "crew_class_throttle")
self$throttle$validate()
}
)
}
)
)
Expand Down
2 changes: 2 additions & 0 deletions R/tar_config_get.R
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ tar_config_get_project <- function(name, yaml) {
seconds_meta_append = yaml$seconds_meta_append %|||% 0,
seconds_meta_upload = yaml$seconds_meta_upload %|||% 15,
seconds_reporter = yaml$seconds_reporter %|||% 0,
seconds_scale = yaml$seconds_scale %|||% 0.5,
seconds_interval = yaml$seconds_interval,
shortcut = yaml$shortcut %|||% FALSE,
store = yaml$store %|||% path_store_default(),
Expand All @@ -116,6 +117,7 @@ tar_config_get_convert <- function(name, value) {
seconds_meta_append = as.numeric(value),
seconds_meta_upload = as.numeric(value),
seconds_reporter = as.numeric(value),
seconds_scale = as.numeric(value),
seconds_interval = if_any(is.null(value), NULL, as.numeric(value)),
shortcut = as.logical(value),
store = as.character(value),
Expand Down
17 changes: 17 additions & 0 deletions R/tar_config_set.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@
#' and [tar_make_future()]. Positive numeric of length 1 with the minimum
#' number of seconds between times when the reporter prints progress
#' messages to the R console.
#' @param seconds_scale Argument of [tar_make()].
#' Positive numeric of length 1, with the time interval (in seconds)
#' for auto-scaling `crew` workers if a `crew` controller is supplied.
#' @param shortcut logical of length 1, default `shortcut` argument
#' to [tar_make()] and related functions.
#' If the argument `NULL`, the setting is not modified.
Expand Down Expand Up @@ -170,6 +173,7 @@ tar_config_set <- function(
seconds_meta_append = NULL,
seconds_meta_upload = NULL,
seconds_reporter = NULL,
seconds_scale = NULL,
seconds_interval = NULL,
store = NULL,
shortcut = NULL,
Expand All @@ -194,6 +198,7 @@ tar_config_set <- function(
tar_config_assert_seconds_meta_append(seconds_meta_append)
tar_config_assert_seconds_meta_upload(seconds_meta_upload)
tar_config_assert_seconds_reporter(seconds_reporter)
tar_config_assert_seconds_scale(seconds_scale)
tar_config_assert_seconds_interval(seconds_interval)
tar_config_assert_shortcut(shortcut)
tar_config_assert_store(store)
Expand All @@ -217,6 +222,8 @@ tar_config_set <- function(
yaml[[project]]$seconds_meta_upload
yaml[[project]]$seconds_reporter <- seconds_reporter %|||%
yaml[[project]]$seconds_reporter
yaml[[project]]$seconds_scale <- seconds_scale %|||%
yaml[[project]]$seconds_scale
yaml[[project]]$seconds_interval <- seconds_interval %|||%
yaml[[project]]$seconds_interval
yaml[[project]]$shortcut <- shortcut %|||% yaml[[project]]$shortcut
Expand Down Expand Up @@ -336,6 +343,16 @@ tar_config_assert_seconds_reporter <- function(seconds_reporter) {
tar_assert_ge(seconds_reporter, 0)
}

tar_config_assert_seconds_scale <- function(seconds_scale) {
if (is.null(seconds_scale)) {
return()
}
tar_assert_dbl(seconds_scale)
tar_assert_scalar(seconds_scale)
tar_assert_none_na(seconds_scale)
tar_assert_ge(seconds_scale, 0)
}

tar_config_assert_shortcut <- function(shortcut) {
if (is.null(shortcut)) {
return()
Expand Down
12 changes: 11 additions & 1 deletion R/tar_make.R
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
#' @param seconds_reporter Positive numeric of length 1 with the minimum
#' number of seconds between times when the reporter prints progress
#' messages to the R console.
#' @param seconds_scale Positive numeric of length 1, with the time interval
#' (in seconds) for auto-scaling `crew` workers if a `crew` controller
#' is supplied via `tar_option_set()` in `_targets.R`.
#' @param garbage_collection Logical of length 1. For a `crew`-integrated
#' pipeline, whether to run garbage collection on the main process
#' before sending a target
Expand Down Expand Up @@ -128,6 +131,7 @@ tar_make <- function(
seconds_meta_append = targets::tar_config_get("seconds_meta_append"),
seconds_meta_upload = targets::tar_config_get("seconds_meta_upload"),
seconds_reporter = targets::tar_config_get("seconds_reporter"),
seconds_scale = targets::tar_config_get("seconds_scale"),
seconds_interval = targets::tar_config_get("seconds_interval"),
callr_function = callr::r,
callr_arguments = targets::tar_callr_args_default(callr_function, reporter),
Expand Down Expand Up @@ -157,6 +161,9 @@ tar_make <- function(
tar_assert_scalar(seconds_reporter)
tar_assert_none_na(seconds_reporter)
tar_assert_ge(seconds_reporter, 0)
tar_assert_scalar(seconds_scale)
tar_assert_none_na(seconds_scale)
tar_assert_ge(seconds_scale, 0)
tar_deprecate_seconds_interval(seconds_interval)
tar_assert_lgl(garbage_collection)
tar_assert_scalar(garbage_collection)
Expand All @@ -172,6 +179,7 @@ tar_make <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
use_crew = use_crew,
terminate_controller = terminate_controller
Expand All @@ -198,6 +206,7 @@ tar_make_inner <- function(
seconds_meta_append,
seconds_meta_upload,
seconds_reporter,
seconds_scale,
garbage_collection,
use_crew,
terminate_controller
Expand All @@ -224,7 +233,7 @@ tar_make_inner <- function(
envir = tar_option_get("envir")
)$run()
} else {
tar_assert_package("crew (>= 0.6.0)")
tar_assert_package("crew (>= 0.6.0.9004)")
crew_init(
pipeline = pipeline,
meta = meta_init(path_store = path_store),
Expand All @@ -235,6 +244,7 @@ tar_make_inner <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
envir = tar_option_get("envir"),
controller = controller,
Expand Down
5 changes: 5 additions & 0 deletions man/tar_config_set.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions man/tar_make.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions tests/testthat/test-class_crew.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
tar_test("crew$validate()", {
skip_if_not_installed("crew", minimum_version = "0.6.0")
skip_if_not_installed("crew", minimum_version = "0.6.0.9004")
controller <- crew::crew_controller_local(
host = "127.0.0.1",
seconds_interval = 0.5
Expand All @@ -15,7 +15,7 @@ tar_test("crew database subkey", {

tar_test("workerless deployment works", {
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.6.0")
skip_if_not_installed("crew", minimum_version = "0.6.0.9004")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down Expand Up @@ -98,7 +98,7 @@ tar_test("semi-workerless deployment works", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.6.0")
skip_if_not_installed("crew", minimum_version = "0.6.0.9004")
skip_if_not_installed("R.utils")
crew_test_sleep()
tar_runtime$fun <- "tar_make"
Expand Down Expand Up @@ -183,7 +183,7 @@ tar_test("some targets up to date, some not", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.6.0")
skip_if_not_installed("crew", minimum_version = "0.6.0.9004")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down Expand Up @@ -238,7 +238,7 @@ tar_test("crew algo can skip targets", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.6.0")
skip_if_not_installed("crew", minimum_version = "0.6.0.9004")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down Expand Up @@ -293,7 +293,7 @@ tar_test("nontrivial common data", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.6.0")
skip_if_not_installed("crew", minimum_version = "0.6.0.9004")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down
15 changes: 15 additions & 0 deletions tests/testthat/test-tar_config_set.R
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ tar_test("tar_config_set() with seconds_reporter", {
expect_equal(tar_config_get("seconds_reporter"), 0)
})

tar_test("tar_config_set() with seconds_scale", {
skip_cran()
expect_false(file.exists("_targets.yaml"))
expect_equal(tar_config_get("seconds_scale"), 0.5)
path <- tempfile()
tar_config_set(seconds_scale = 10)
expect_equal(tar_config_get("seconds_scale"), 10)
expect_true(file.exists("_targets.yaml"))
tar_config_set()
expect_equal(tar_config_get("seconds_scale"), 10)
expect_true(file.exists("_targets.yaml"))
unlink("_targets.yaml")
expect_equal(tar_config_get("seconds_scale"), 0.5)
})

tar_test("tar_config_set() with seconds_interval", {
skip_cran()
expect_false(file.exists("_targets.yaml"))
Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test-tar_make.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tar_test("tar_make() works", {
tar_test("tar_make() works with crew", {
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.6.0")
skip_if_not_installed("crew", minimum_version = "0.6.0.9004")
skip_if_not_installed("R.utils")
should_skip <- identical(tolower(Sys.info()[["sysname"]]), "windows") &&
isTRUE(as.logical(Sys.getenv("CI")))
Expand Down

0 comments on commit c7fa173

Please sign in to comment.