Skip to content

Commit

Permalink
Sketch #1220
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Jan 30, 2024
1 parent de16fb7 commit f738b79
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 17 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Suggests:
arrow (>= 3.0.0),
bs4Dash (>= 2.0.0),
clustermq (>= 0.9.2),
crew (>= 0.8.0),
crew (>= 0.8.0.9003),
curl (>= 4.3),
DT (>= 0.14),
dplyr (>= 1.0.0),
Expand Down
29 changes: 21 additions & 8 deletions R/class_crew.R
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ crew_class <- R6::R6Class(
list(common = common, globals = globals)
},
run_worker = function(target) {
name <- target_get_name(target)
resources <- target$settings$resources$crew
name_controller <- resources$controller
# Covered in tests/hpc/test-crew_local.R
# nocov start
if (self$controller$saturated(controller = name_controller)) {
self$controller$push_backlog(name = name, controller = name_controller)
return()
}
# nocov end
if (self$garbage_collection) {
gc()
}
Expand All @@ -143,14 +153,12 @@ crew_class <- R6::R6Class(
data <- self$exports$common
data$target <- target
globals <- self$exports$globals
resources <- target$settings$resources$crew
name <- target_get_name(target)
target_prepare(
target = target,
pipeline = self$pipeline,
scheduler = self$scheduler,
meta = self$meta,
pending = self$controller$saturated(controller = resources$controller)
pending = FALSE
)
self$sync_meta_time()
self$controller$push(
Expand All @@ -159,7 +167,7 @@ crew_class <- R6::R6Class(
globals = globals,
substitute = FALSE,
name = name,
controller = resources$controller,
controller = name_controller,
scale = TRUE,
throttle = TRUE,
seconds_timeout = resources$seconds_timeout
Expand Down Expand Up @@ -202,17 +210,22 @@ crew_class <- R6::R6Class(
iterate = function() {
self$sync_meta_time()
queue <- self$scheduler$queue
if_any(
queue$should_dequeue(),
self$process_target(queue$dequeue()),
# Covered in tests/hpc/test-crew_local.R
# nocov start
if (queue$should_dequeue()) {
self$process_target(queue$dequeue())
} else if (length(backlog <- self$controller$pop_backlog())) {
map(backlog, ~self$process_target(.x))
} else {
self$controller$wait(
mode = "one",
seconds_interval = 0.5,
seconds_timeout = 0.5,
scale = TRUE,
throttle = TRUE
)
)
}
# nocov end
self$conclude_worker_task()
},
conclude_worker_task = function() {
Expand Down
2 changes: 1 addition & 1 deletion R/tar_make.R
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ tar_make_inner <- function(
envir = tar_option_get("envir")
)$run()
} else {
tar_assert_package("crew (>= 0.8.0)")
tar_assert_package("crew (>= 0.8.0.9003)")
crew_init(
pipeline = pipeline,
meta = meta_init(path_store = path_store),
Expand Down
12 changes: 6 additions & 6 deletions tests/testthat/test-class_crew.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
tar_test("crew$validate()", {
skip_if_not_installed("crew", minimum_version = "0.8.0")
skip_if_not_installed("crew", minimum_version = "0.8.0.9003")
controller <- crew::crew_controller_local(
host = "127.0.0.1",
seconds_interval = 0.5
Expand All @@ -15,7 +15,7 @@ tar_test("crew database subkey", {

tar_test("workerless deployment works", {
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.8.0")
skip_if_not_installed("crew", minimum_version = "0.8.0.9003")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down Expand Up @@ -98,7 +98,7 @@ tar_test("semi-workerless deployment works", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.8.0")
skip_if_not_installed("crew", minimum_version = "0.8.0.9003")
skip_if_not_installed("R.utils")
crew_test_sleep()
tar_runtime$fun <- "tar_make"
Expand Down Expand Up @@ -183,7 +183,7 @@ tar_test("some targets up to date, some not", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.8.0")
skip_if_not_installed("crew", minimum_version = "0.8.0.9003")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down Expand Up @@ -238,7 +238,7 @@ tar_test("crew algo can skip targets", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.8.0")
skip_if_not_installed("crew", minimum_version = "0.8.0.9003")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down Expand Up @@ -293,7 +293,7 @@ tar_test("nontrivial common data", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.8.0")
skip_if_not_installed("crew", minimum_version = "0.8.0.9003")
skip_if_not_installed("R.utils")
tar_runtime$fun <- "tar_make"
tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test-tar_make.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tar_test("tar_make() works", {
tar_test("tar_make() works with crew", {
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.8.0")
skip_if_not_installed("crew", minimum_version = "0.8.0.9003")
skip_if_not_installed("R.utils")
should_skip <- identical(tolower(Sys.info()[["sysname"]]), "windows") &&
isTRUE(as.logical(Sys.getenv("CI")))
Expand Down

0 comments on commit f738b79

Please sign in to comment.