diff --git a/R/class_clustermq.R b/R/class_clustermq.R index 72f5f5b0d..d1cef9396 100644 --- a/R/class_clustermq.R +++ b/R/class_clustermq.R @@ -107,33 +107,19 @@ clustermq_class <- R6::R6Class( # Cannot use multicore clustermq backend # due to https://github.com/ropensci/targets/discussions/780 # nocov start - set_common_data = function() { - self$worker_list$set_common_data( - fun = identity, - const = list(), - export = self$produce_exports( - envir = self$envir, - path_store = self$meta$store - ), - rettype = list(), - pkgs = "targets", - common_seed = 0L, - token = "set_common_data_token" - ) - }, - create_worker_list = function() { - worker_list <- clustermq::workers( + start_workers = function() { + self$worker_list <- clustermq::workers( n_jobs = self$workers, template = tar_option_get("resources")$clustermq$template %|||% tar_option_get("resources") %|||% list(), log_worker = self$log_worker ) - self$worker_list <- worker_list - }, - start_worker_list = function() { - self$create_worker_list() - self$set_common_data() + exports <- self$produce_exports( + envir = self$envir, + path_store = self$meta$store + ) + do.call(what = self$worker_list$env, args = exports) }, any_upcoming_jobs = function() { need_workers <- fltr( @@ -146,20 +132,17 @@ clustermq_class <- R6::R6Class( if (self$garbage_collection) { gc() } - args <- list( - expr = quote( - targets::target_run_worker( - target = target, - envir = .tar_envir_5048826d, - path_store = .tar_path_store_5048826d, - fun = .tar_fun_5048826d, - options = .tar_options_5048826d, - envvars = .tar_envvars_5048826d - ) + self$worker_list$send( + cmd = targets::target_run_worker( + target = target, + envir = .tar_envir_5048826d, + path_store = .tar_path_store_5048826d, + fun = .tar_fun_5048826d, + options = .tar_options_5048826d, + envvars = .tar_envvars_5048826d ), - env = list(target = target) + target = target ) - do.call(what = self$worker_list$send_call, args = args) }, run_main = function(target) { self$wait_or_shutdown() @@ -199,7 +182,7 @@ clustermq_class <- R6::R6Class( }, shut_down_worker = function() { if (self$workers > 0L) { - self$worker_list$send_shutdown_worker() + self$worker_list$send_shutdown() self$workers <- self$workers - 1L self$scheduler$backoff$reset() } @@ -247,16 +230,9 @@ clustermq_class <- R6::R6Class( }, iterate = function() { self$sync_meta_time() - message <- if_any( - self$workers > 0L, - self$worker_list$receive_data(), - list() - ) - self$conclude_worker_target(message$result) - token <- message$token - if (self$workers > 0L && !identical(token, "set_common_data_token")) { - self$worker_list$send_common_data() - } else if (self$scheduler$queue$is_nonempty()) { + target <- self$worker_list$recv() + self$conclude_worker_target(target) + if (self$scheduler$queue$is_nonempty()) { self$next_target() } else { self$shut_down_worker() @@ -278,14 +254,11 @@ clustermq_class <- R6::R6Class( ) }, run_clustermq = function() { - on.exit(try(self$worker_list$finalize())) - self$start_worker_list() + on.exit(try(self$worker_list$cleanup())) + self$start_workers() while (self$scheduler$progress$any_remaining()) { self$iterate() } - if (identical(try(self$worker_list$cleanup()), TRUE)) { - on.exit() - } }, run = function() { self$start()