Skip to content

Commit

Permalink
replace memory with lookup in future
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Nov 10, 2024
1 parent d572a98 commit a968f8b
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 23 deletions.
2 changes: 1 addition & 1 deletion R/class_builder.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ target_update_depend.tar_builder <- function(target, pipeline, meta) {
lookup_set(
lookup = .subset2(meta, "depends"),
names = target_get_name(target),
value = .subset2(meta, "produce_depend")(target, pipeline)
object = .subset2(meta, "produce_depend")(target, pipeline)
)
}

Expand Down
2 changes: 1 addition & 1 deletion R/class_database.R
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ database_class <- R6::R6Class(
lookup_set(
.subset2(self, "lookup"),
name = as.character(.subset2(row, "name")),
value = as.list(row)
object = as.list(row)
)
},
del_rows = function(names) {
Expand Down
14 changes: 7 additions & 7 deletions R/class_future.R
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ future_class <- R6::R6Class(
envir = envir
)
self$workers <- workers
self$worker_list <- memory_init()
self$worker_list <- lookup_new()
},
run_worker = function(target) {
builder_marshal_subpipeline(target)
Expand Down Expand Up @@ -132,8 +132,8 @@ future_class <- R6::R6Class(
seed = FALSE
)
future <- do.call(what = future::future, args = args)
memory_set_object(
self$worker_list,
lookup_set(
lookup = self$worker_list,
name = target_get_name(target),
object = future
)
Expand Down Expand Up @@ -179,7 +179,7 @@ future_class <- R6::R6Class(
self$scheduler$backoff$reset()
},
can_submit = function() {
self$worker_list$count < self$workers &&
lookup_count(self$worker_list) < self$workers &&
self$scheduler$queue$is_nonempty()
},
try_submit = function(wait) {
Expand All @@ -193,16 +193,16 @@ future_class <- R6::R6Class(
tryCatch(future::value(worker, signal = FALSE), error = identity)
},
process_worker = function(name) {
worker <- memory_get_object(self$worker_list, name)
worker <- lookup_get(self$worker_list, name)
if (future::resolved(worker)) {
value <- self$future_value(worker)
self$conclude_worker_target(value, name)
memory_del_objects(self$worker_list, name)
lookup_remove(self$worker_list, name)
}
self$try_submit(wait = FALSE)
},
process_workers = function() {
names <- self$worker_list$names
names <- lookup_list(self$worker_list)
if (!length(names)) {
return()
}
Expand Down
8 changes: 6 additions & 2 deletions R/class_lookup.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ lookup_list <- function(lookup) {
as.character(names(lookup))
}

lookup_set <- function(lookup, names, value) {
lookup_count <- function(lookup) {
length(lookup_list(lookup))
}

lookup_set <- function(lookup, names, object) {
for (name in names) {
lookup[[name]] <- value
lookup[[name]] <- object
}
}

Expand Down
8 changes: 6 additions & 2 deletions R/class_meta.R
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ meta_class <- R6::R6Class(
tar_runtime$meta <- self
},
ensure_preprocessed = function(write = FALSE) {
if (length(lookup_list(self$database$lookup)) == 0L) {
if (lookup_count(self$database$lookup) < 1L) {
self$preprocess(write = write)
}
},
Expand All @@ -134,7 +134,11 @@ meta_class <- R6::R6Class(
hashes <- split(x = data$data, f = data$repository)
lookup_table <- lookup_new()
for (name in names(hashes)) {
lookup_set(lookup_table, names = name, value = .subset2(hashes, name))
lookup_set(
lookup = lookup_table,
names = name,
object = .subset2(hashes, name)
)
}
self$repository_cas_lookup_table <- lookup_table
},
Expand Down
2 changes: 1 addition & 1 deletion R/class_pattern.R
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ target_update_depend.tar_pattern <- function(target, pipeline, meta) {
lookup_set(
lookup = .subset2(meta, "depends"),
names = target_get_name(target),
value = hash_null
object = hash_null
)
}

Expand Down
10 changes: 5 additions & 5 deletions R/class_store_repository_cas.R
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ store_has_correct_hash.tar_repository_cas <- function(store, file) {
lookup <- tar_repository_cas_lookup(store)
key <- .subset2(file, "hash")
if (lookup_missing(lookup = lookup, name = key)) {
value <- store_repository_cas_call_method(
object <- store_repository_cas_call_method(
store = store,
text = store$methods_repository$exists,
args = list(key = key)
)
lookup_set(lookup = lookup, names = key, value = value)
lookup_set(lookup = lookup, names = key, object = object)
}
lookup_get(lookup = lookup, name = key)
}
Expand Down Expand Up @@ -122,9 +122,9 @@ tar_repository_cas_lookup <- function(store) {
args = list(keys = keys_meta)
)
lookup <- lookup_new()
lookup_set(lookup, names = as.character(keys_cas), value = TRUE)
lookup_set(lookup, names = setdiff(keys_meta, keys_cas), value = FALSE)
lookup_set(lookup_table, names = repository, value = lookup)
lookup_set(lookup, names = as.character(keys_cas), object = TRUE)
lookup_set(lookup, names = setdiff(keys_meta, keys_cas), object = FALSE)
lookup_set(lookup_table, names = repository, object = lookup)
lookup
}

Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test-class_future.R
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ tar_test("branching plan", {
pipeline <- pipeline_map()
out <- future_init(pipeline, workers = 2L)
out$run()
expect_equal(out$worker_list$count, 0L)
expect_equal(lookup_count(out$worker_list), 0L)
skipped <- names(out$scheduler$progress$skipped$envir)
expect_equal(skipped, character(0))
out2 <- future_init(pipeline_map(), workers = 2L)
Expand Down
8 changes: 5 additions & 3 deletions tests/testthat/test-class_lookup.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
tar_test("class lookup", {
lookup <- lookup_new()
lookup_set(lookup, names = letters, value = TRUE)
lookup_set(lookup, names = LETTERS, value = FALSE)
expect_equal(lookup_count(lookup), 0L)
lookup_set(lookup, names = letters, object = TRUE)
lookup_set(lookup, names = LETTERS, object = FALSE)
expect_equal(lookup_count(lookup), length(letters) + length(LETTERS))
expect_equal(sort(lookup_list(lookup)), sort(c(letters, LETTERS)))
for (x in letters) {
expect_true(lookup_exists(lookup, x))
Expand All @@ -26,7 +28,7 @@ tar_test("class lookup", {

tar_test("class lookup remove method", {
lookup <- lookup_new()
lookup_set(lookup, names = letters, value = TRUE)
lookup_set(lookup, names = letters, object = TRUE)
expect_equal(sort(lookup_list(lookup)), sort(letters))
lookup_remove(lookup, letters)
expect_equal(lookup_list(lookup), character(0L))
Expand Down

0 comments on commit a968f8b

Please sign in to comment.