Skip to content

Commit

Permalink
Use native throttling in crew
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Dec 6, 2023
1 parent 2d1a5e3 commit 9ca2db9
Show file tree
Hide file tree
Showing 8 changed files with 7 additions and 87 deletions.
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Because of the changes below, upgrading to this version of `targets` will unavoi
* Deprecate `tar_built()` in favor of `tar_completed()` (#1192).
* Console messages from reporters say "dispatched" and "completed" instead of "started" and "built" (#1192).
* The `crew` scheduling algorithm no longer waits on saturated controllers, and targets that are ready are greedily dispatched to `crew` even if all workers are busy (#1182, #1192). To appropriately set expectations for users, reporters print "dispatched (pending)" instead of "dispatched" if the task load is backlogged at the moment.
* In the `crew` scheduling algorithm, waiting for tasks is now a truly event-driven process and consumes 5-10x less CPU resources (#1183). Only the auto-scaling of workers uses polling (with an inexpensive default polling interval of 0.5 seconds, configurable with `seconds_scale`).
* In the `crew` scheduling algorithm, waiting for tasks is now a truly event-driven process and consumes 5-10x less CPU resources (#1183). Only the auto-scaling of workers uses polling (with an inexpensive default polling interval of 0.5 seconds, configurable through `seconds_interval` in the controller).

# targets 1.3.2

Expand Down
38 changes: 6 additions & 32 deletions R/class_crew.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ crew_init <- function(
seconds_meta_append = 0,
seconds_meta_upload = 15,
seconds_reporter = 0,
seconds_scale = 0.5,
garbage_collection = FALSE,
envir = tar_option_get("envir"),
controller = NULL,
Expand All @@ -24,7 +23,6 @@ crew_init <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
envir = envir,
controller = controller,
Expand All @@ -42,7 +40,6 @@ crew_new <- function(
seconds_meta_append = NULL,
seconds_meta_upload = NULL,
seconds_reporter = NULL,
seconds_scale = NULL,
garbage_collection = NULL,
envir = NULL,
controller = NULL,
Expand All @@ -58,7 +55,6 @@ crew_new <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
envir = envir,
controller = controller,
Expand All @@ -74,8 +70,6 @@ crew_class <- R6::R6Class(
public = list(
controller = NULL,
terminate_controller = NULL,
seconds_scale = NULL,
throttle = NULL,
initialize = function(
pipeline = NULL,
meta = NULL,
Expand All @@ -86,12 +80,10 @@ crew_class <- R6::R6Class(
seconds_meta_append = NULL,
seconds_meta_upload = NULL,
seconds_reporter = NULL,
seconds_scale = NULL,
garbage_collection = NULL,
envir = NULL,
controller = NULL,
terminate_controller = NULL,
throttle = NULL
terminate_controller = NULL
) {
super$initialize(
pipeline = pipeline,
Expand All @@ -108,7 +100,6 @@ crew_class <- R6::R6Class(
)
self$controller <- controller
self$terminate_controller <- terminate_controller
self$seconds_scale <- seconds_scale
},
produce_exports = function(envir, path_store, is_globalenv = NULL) {
map(names(envir), ~force(envir[[.x]])) # try to nix high-mem promises
Expand Down Expand Up @@ -170,6 +161,7 @@ crew_class <- R6::R6Class(
name = name,
controller = resources$controller,
scale = TRUE,
throttle = TRUE,
seconds_timeout = resources$seconds_timeout
)
},
Expand Down Expand Up @@ -210,22 +202,22 @@ crew_class <- R6::R6Class(
iterate = function() {
self$sync_meta_time()
queue <- self$scheduler$queue
throttle <- self$throttle
interval <- throttle$seconds_interval
interval <- self$controller$launcher$seconds_interval
if_any(
queue$should_dequeue(),
self$process_target(queue$dequeue()),
self$controller$wait(
mode = "one",
seconds_interval = interval,
seconds_timeout = interval,
scale = throttle$poll()
scale = TRUE,
throttle = TRUE
)
)
self$conclude_worker_task()
},
conclude_worker_task = function() {
result <- self$controller$pop(scale = TRUE)
result <- self$controller$pop(scale = TRUE, throttle = TRUE)
if (is.null(result)) {
return()
}
Expand Down Expand Up @@ -283,12 +275,6 @@ crew_class <- R6::R6Class(
self$iterate()
}
},
start = function() {
super$start()
self$throttle <- crew::crew_throttle(
seconds_interval = self$seconds_scale
)
},
run = function() {
self$start()
on.exit(self$end())
Expand All @@ -306,11 +292,6 @@ crew_class <- R6::R6Class(
tar_assert_lgl(self$terminate_controller)
tar_assert_scalar(self$terminate_controller)
tar_assert_none_na(self$terminate_controller)
tar_assert_scalar(self$seconds_scale)
tar_assert_dbl(self$seconds_scale)
tar_assert_none_na(self$seconds_scale)
tar_assert_ge(self$seconds_scale, 0)
validate_crew_throttle(self$throttle)
}
)
)
Expand Down Expand Up @@ -338,10 +319,3 @@ validate_crew_controller <- function(controller) {
tar_assert_function(controller$validate, msg = "invalid crew controller")
controller$validate()
}

validate_crew_throttle <- function(throttle) {
if (!is.null(throttle)) {
tar_assert_inherits(throttle, "crew_class_throttle")
throttle$validate()
}
}
2 changes: 0 additions & 2 deletions R/tar_config_get.R
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ tar_config_get_project <- function(name, yaml) {
seconds_meta_append = yaml$seconds_meta_append %|||% 0,
seconds_meta_upload = yaml$seconds_meta_upload %|||% 15,
seconds_reporter = yaml$seconds_reporter %|||% 0,
seconds_scale = yaml$seconds_scale %|||% 0.5,
seconds_interval = yaml$seconds_interval,
shortcut = yaml$shortcut %|||% FALSE,
store = yaml$store %|||% path_store_default(),
Expand All @@ -117,7 +116,6 @@ tar_config_get_convert <- function(name, value) {
seconds_meta_append = as.numeric(value),
seconds_meta_upload = as.numeric(value),
seconds_reporter = as.numeric(value),
seconds_scale = as.numeric(value),
seconds_interval = if_any(is.null(value), NULL, as.numeric(value)),
shortcut = as.logical(value),
store = as.character(value),
Expand Down
17 changes: 0 additions & 17 deletions R/tar_config_set.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@
#' and [tar_make_future()]. Positive numeric of length 1 with the minimum
#' number of seconds between times when the reporter prints progress
#' messages to the R console.
#' @param seconds_scale Argument of [tar_make()].
#' Positive numeric of length 1, with the time interval (in seconds)
#' for auto-scaling `crew` workers if a `crew` controller is supplied.
#' @param shortcut logical of length 1, default `shortcut` argument
#' to [tar_make()] and related functions.
#' If the argument `NULL`, the setting is not modified.
Expand Down Expand Up @@ -173,7 +170,6 @@ tar_config_set <- function(
seconds_meta_append = NULL,
seconds_meta_upload = NULL,
seconds_reporter = NULL,
seconds_scale = NULL,
seconds_interval = NULL,
store = NULL,
shortcut = NULL,
Expand All @@ -198,7 +194,6 @@ tar_config_set <- function(
tar_config_assert_seconds_meta_append(seconds_meta_append)
tar_config_assert_seconds_meta_upload(seconds_meta_upload)
tar_config_assert_seconds_reporter(seconds_reporter)
tar_config_assert_seconds_scale(seconds_scale)
tar_config_assert_seconds_interval(seconds_interval)
tar_config_assert_shortcut(shortcut)
tar_config_assert_store(store)
Expand All @@ -222,8 +217,6 @@ tar_config_set <- function(
yaml[[project]]$seconds_meta_upload
yaml[[project]]$seconds_reporter <- seconds_reporter %|||%
yaml[[project]]$seconds_reporter
yaml[[project]]$seconds_scale <- seconds_scale %|||%
yaml[[project]]$seconds_scale
yaml[[project]]$seconds_interval <- seconds_interval %|||%
yaml[[project]]$seconds_interval
yaml[[project]]$shortcut <- shortcut %|||% yaml[[project]]$shortcut
Expand Down Expand Up @@ -343,16 +336,6 @@ tar_config_assert_seconds_reporter <- function(seconds_reporter) {
tar_assert_ge(seconds_reporter, 0)
}

tar_config_assert_seconds_scale <- function(seconds_scale) {
if (is.null(seconds_scale)) {
return()
}
tar_assert_dbl(seconds_scale)
tar_assert_scalar(seconds_scale)
tar_assert_none_na(seconds_scale)
tar_assert_ge(seconds_scale, 0)
}

tar_config_assert_shortcut <- function(shortcut) {
if (is.null(shortcut)) {
return()
Expand Down
10 changes: 0 additions & 10 deletions R/tar_make.R
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@
#' @param seconds_reporter Positive numeric of length 1 with the minimum
#' number of seconds between times when the reporter prints progress
#' messages to the R console.
#' @param seconds_scale Positive numeric of length 1, with the time interval
#' (in seconds) for auto-scaling `crew` workers if a `crew` controller
#' is supplied via `tar_option_set()` in `_targets.R`.
#' @param garbage_collection Logical of length 1. For a `crew`-integrated
#' pipeline, whether to run garbage collection on the main process
#' before sending a target
Expand Down Expand Up @@ -131,7 +128,6 @@ tar_make <- function(
seconds_meta_append = targets::tar_config_get("seconds_meta_append"),
seconds_meta_upload = targets::tar_config_get("seconds_meta_upload"),
seconds_reporter = targets::tar_config_get("seconds_reporter"),
seconds_scale = targets::tar_config_get("seconds_scale"),
seconds_interval = targets::tar_config_get("seconds_interval"),
callr_function = callr::r,
callr_arguments = targets::tar_callr_args_default(callr_function, reporter),
Expand Down Expand Up @@ -161,9 +157,6 @@ tar_make <- function(
tar_assert_scalar(seconds_reporter)
tar_assert_none_na(seconds_reporter)
tar_assert_ge(seconds_reporter, 0)
tar_assert_scalar(seconds_scale)
tar_assert_none_na(seconds_scale)
tar_assert_ge(seconds_scale, 0)
tar_deprecate_seconds_interval(seconds_interval)
tar_assert_lgl(garbage_collection)
tar_assert_scalar(garbage_collection)
Expand All @@ -179,7 +172,6 @@ tar_make <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
use_crew = use_crew,
terminate_controller = terminate_controller
Expand All @@ -206,7 +198,6 @@ tar_make_inner <- function(
seconds_meta_append,
seconds_meta_upload,
seconds_reporter,
seconds_scale,
garbage_collection,
use_crew,
terminate_controller
Expand Down Expand Up @@ -244,7 +235,6 @@ tar_make_inner <- function(
seconds_meta_append = seconds_meta_append,
seconds_meta_upload = seconds_meta_upload,
seconds_reporter = seconds_reporter,
seconds_scale = seconds_scale,
garbage_collection = garbage_collection,
envir = tar_option_get("envir"),
controller = controller,
Expand Down
5 changes: 0 additions & 5 deletions man/tar_config_set.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions man/tar_make.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 0 additions & 15 deletions tests/testthat/test-tar_config_set.R
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,6 @@ tar_test("tar_config_set() with seconds_reporter", {
expect_equal(tar_config_get("seconds_reporter"), 0)
})

tar_test("tar_config_set() with seconds_scale", {
skip_cran()
expect_false(file.exists("_targets.yaml"))
expect_equal(tar_config_get("seconds_scale"), 0.5)
path <- tempfile()
tar_config_set(seconds_scale = 10)
expect_equal(tar_config_get("seconds_scale"), 10)
expect_true(file.exists("_targets.yaml"))
tar_config_set()
expect_equal(tar_config_get("seconds_scale"), 10)
expect_true(file.exists("_targets.yaml"))
unlink("_targets.yaml")
expect_equal(tar_config_get("seconds_scale"), 0.5)
})

tar_test("tar_config_set() with seconds_interval", {
skip_cran()
expect_false(file.exists("_targets.yaml"))
Expand Down

0 comments on commit 9ca2db9

Please sign in to comment.