Skip to content

Commit

Permalink
Migrate to new clustermq 0.9.0 interface
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Sep 25, 2023
1 parent f5d01e6 commit 3b4f0a3
Showing 1 changed file with 22 additions and 49 deletions.
71 changes: 22 additions & 49 deletions R/class_clustermq.R
Original file line number Diff line number Diff line change
Expand Up @@ -107,33 +107,19 @@ clustermq_class <- R6::R6Class(
# Cannot use multicore clustermq backend
# due to https://github.com/ropensci/targets/discussions/780
# nocov start
set_common_data = function() {
self$worker_list$set_common_data(
fun = identity,
const = list(),
export = self$produce_exports(
envir = self$envir,
path_store = self$meta$store
),
rettype = list(),
pkgs = "targets",
common_seed = 0L,
token = "set_common_data_token"
)
},
create_worker_list = function() {
worker_list <- clustermq::workers(
start_workers = function() {
self$worker_list <- clustermq::workers(
n_jobs = self$workers,
template = tar_option_get("resources")$clustermq$template %|||%
tar_option_get("resources") %|||%
list(),
log_worker = self$log_worker
)
self$worker_list <- worker_list
},
start_worker_list = function() {
self$create_worker_list()
self$set_common_data()
exports <- self$produce_exports(
envir = self$envir,
path_store = self$meta$store
)
do.call(what = self$worker_list$env, args = exports)
},
any_upcoming_jobs = function() {
need_workers <- fltr(
Expand All @@ -146,20 +132,17 @@ clustermq_class <- R6::R6Class(
if (self$garbage_collection) {
gc()
}
args <- list(
expr = quote(
targets::target_run_worker(
target = target,
envir = .tar_envir_5048826d,
path_store = .tar_path_store_5048826d,
fun = .tar_fun_5048826d,
options = .tar_options_5048826d,
envvars = .tar_envvars_5048826d
)
self$worker_list$send(
cmd = targets::target_run_worker(
target = target,
envir = .tar_envir_5048826d,
path_store = .tar_path_store_5048826d,
fun = .tar_fun_5048826d,
options = .tar_options_5048826d,
envvars = .tar_envvars_5048826d
),
env = list(target = target)
target = target
)
do.call(what = self$worker_list$send_call, args = args)
},
run_main = function(target) {
self$wait_or_shutdown()
Expand Down Expand Up @@ -199,7 +182,7 @@ clustermq_class <- R6::R6Class(
},
shut_down_worker = function() {
if (self$workers > 0L) {
self$worker_list$send_shutdown_worker()
self$worker_list$send_shutdown()
self$workers <- self$workers - 1L
self$scheduler$backoff$reset()
}
Expand Down Expand Up @@ -247,16 +230,9 @@ clustermq_class <- R6::R6Class(
},
iterate = function() {
self$sync_meta_time()
message <- if_any(
self$workers > 0L,
self$worker_list$receive_data(),
list()
)
self$conclude_worker_target(message$result)
token <- message$token
if (self$workers > 0L && !identical(token, "set_common_data_token")) {
self$worker_list$send_common_data()
} else if (self$scheduler$queue$is_nonempty()) {
target <- self$worker_list$recv()
self$conclude_worker_target(target)
if (self$scheduler$queue$is_nonempty()) {
self$next_target()
} else {
self$shut_down_worker()
Expand All @@ -278,14 +254,11 @@ clustermq_class <- R6::R6Class(
)
},
run_clustermq = function() {
on.exit(try(self$worker_list$finalize()))
self$start_worker_list()
on.exit(try(self$worker_list$cleanup()))
self$start_workers()
while (self$scheduler$progress$any_remaining()) {
self$iterate()
}
if (identical(try(self$worker_list$cleanup()), TRUE)) {
on.exit()
}
},
run = function() {
self$start()
Expand Down

0 comments on commit 3b4f0a3

Please sign in to comment.