diff --git a/DESCRIPTION b/DESCRIPTION index 75eb19ad..ecd5f072 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally The methodology in this package borrows from GNU 'Make' (2015, ISBN:978-9881443519) and 'drake' (2018, ). -Version: 1.8.0.9009 +Version: 1.8.0.9010 License: MIT + file LICENSE URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets BugReports: https://github.com/ropensci/targets/issues diff --git a/NAMESPACE b/NAMESPACE index a10f4f71..c4d37f86 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -271,11 +271,16 @@ S3method(target_patternview_meta,default) S3method(target_patternview_meta,tar_branch) S3method(target_prepare,default) S3method(target_prepare,tar_builder) +S3method(target_produce_child,tar_pattern) +S3method(target_produce_child,tar_stem) S3method(target_produce_junction,tar_pattern) S3method(target_produce_junction,tar_stem) S3method(target_produce_record,tar_branch) S3method(target_produce_record,tar_pattern) S3method(target_produce_record,tar_stem) +S3method(target_produce_reference,default) +S3method(target_produce_reference,tar_branch) +S3method(target_produce_reference,tar_bud) S3method(target_read_value,tar_bud) S3method(target_read_value,tar_builder) S3method(target_read_value,tar_pattern) diff --git a/NEWS.md b/NEWS.md index 4c4ae7ed..f01d9288 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# targets 1.8.0.9009 (development) +# targets 1.8.0.9010 (development) * Un-break workflows that use `format = "file_fast"` (#1339, @koefoeden). * Fix deadlock in `error = "trim"` (#1340, @koefoeden). @@ -12,6 +12,7 @@ * Avoid `store_assert_format()` and `store_convert_object()` is `storage` is `"none"`. * Add a `list()` method to `tar_repository_cas()` to make it easier and more efficient to specify custom CAS repositories (#1366). * Improve speed and reduce memory consumption by avoiding deep copies of inner environments of target definition objects (#1368). +* Reduce memory consumption by storing buds and branches as lightweight references when `memory` is `"transient"` (#1364). # targets 1.8.0 diff --git a/R/class_branch.R b/R/class_branch.R index a857211b..c4efdae0 100644 --- a/R/class_branch.R +++ b/R/class_branch.R @@ -143,3 +143,14 @@ target_patternview_errored.tar_branch <- function( parent <- pipeline_get_target(pipeline, target_get_parent(target)) patternview_register_errored(parent$patternview, parent, scheduler) } + +#' @export +target_produce_reference.tar_branch <- function(target) { + file <- .subset2(target, "file") + reference_init( + parent = target_get_parent(target), + path = .subset2(file, "path"), + stage = .subset2(file, "stage"), + hash = .subset2(file, "hash") + ) +} diff --git a/R/class_bud.R b/R/class_bud.R index c90a3667..9cc79bdb 100644 --- a/R/class_bud.R +++ b/R/class_bud.R @@ -21,6 +21,11 @@ target_read_value.tar_bud <- function(target, pipeline) { value_init(object, parent$settings$iteration) } +#' @export +target_produce_reference.tar_bud <- function(target) { + reference_init(parent = target_get_parent(target)) +} + #' @export target_validate.tar_bud <- function(target) { tar_assert_correct_fields(target, bud_new, optional = "value") diff --git a/R/class_builder.R b/R/class_builder.R index 3b186373..ff47ea70 100644 --- a/R/class_builder.R +++ b/R/class_builder.R @@ -42,6 +42,7 @@ target_bootstrap.tar_builder <- function( record <- target_bootstrap_record(target, meta) target$store <- record_bootstrap_store(record) target$file <- record_bootstrap_file(record) + pipeline_set_target(pipeline, target) invisible() } @@ -191,6 +192,7 @@ target_skip.tar_builder <- function( ) { target_update_queue(target, scheduler) file_repopulate(target$file, meta$get_record(target_get_name(target))) + pipeline_set_target(pipeline, target) if (active) { builder_ensure_workspace( target = target, @@ -240,6 +242,7 @@ builder_completed <- function(target, pipeline, scheduler, meta) { target_ensure_buds(target, pipeline, scheduler) meta$insert_record(target_produce_record(target, pipeline, meta)) target_patternview_meta(target, pipeline, meta) + pipeline_set_target(pipeline, target) pipeline_register_loaded(pipeline, target_get_name(target)) scheduler$progress$register_completed(target) scheduler$reporter$report_completed(target, scheduler$progress) @@ -306,26 +309,13 @@ builder_ensure_deps <- function(target, pipeline, retrieval) { if (!identical(target$settings$retrieval, retrieval)) { return() } - tryCatch( - target_ensure_deps(target, pipeline), - error = function(error) { - message <- paste0( - "could not load dependencies of target ", - target_get_name(target), - ". ", - conditionMessage(error) - ) - expr <- as.expression(as.call(list(quote(stop), message))) - target$command$expr <- expr - target$settings$deployment <- "main" - } - ) + target_ensure_deps(target, pipeline) } builder_update_subpipeline <- function(target, pipeline) { target$subpipeline <- pipeline_produce_subpipeline( pipeline, - target_get_name(target) + target ) } @@ -344,7 +334,7 @@ builder_unmarshal_subpipeline <- function(target) { pipeline_unmarshal_values(target$subpipeline) } patterns <- fltr( - names(subpipeline$targets), + pipeline_get_names(subpipeline), ~inherits(pipeline_get_target(subpipeline, .x), "tar_pattern") ) map( @@ -390,6 +380,7 @@ builder_error_null <- function(target, pipeline, scheduler, meta) { record$data <- "error" meta$insert_record(record) target_patternview_meta(target, pipeline, meta) + pipeline_set_target(pipeline, target) pipeline_register_loaded(pipeline, target_get_name(target)) scheduler$progress$register_errored(target) } diff --git a/R/class_clustermq.R b/R/class_clustermq.R index 0386a403..ae173485 100644 --- a/R/class_clustermq.R +++ b/R/class_clustermq.R @@ -209,7 +209,6 @@ clustermq_class <- R6::R6Class( if (is.null(target)) { return() } - pipeline_set_target(self$pipeline, target) self$unmarshal_target(target) target_conclude( target, diff --git a/R/class_crew.R b/R/class_crew.R index 39589b9b..4c4db331 100644 --- a/R/class_crew.R +++ b/R/class_crew.R @@ -236,7 +236,6 @@ crew_class <- R6::R6Class( msg = paste("target", result$name, "error:", result$error) ) target <- result$result[[1]] - pipeline_set_target(self$pipeline, target) self$unmarshal_target(target) target_conclude( target, diff --git a/R/class_future.R b/R/class_future.R index 46a2a179..3f60401e 100644 --- a/R/class_future.R +++ b/R/class_future.R @@ -169,7 +169,6 @@ future_class <- R6::R6Class( }, conclude_worker_target = function(value, name) { target <- future_value_target(value, name, self$pipeline) - pipeline_set_target(self$pipeline, target) self$unmarshal_target(target) target_conclude( target, diff --git a/R/class_junction.R b/R/class_junction.R index 34867e68..f7fcffef 100644 --- a/R/class_junction.R +++ b/R/class_junction.R @@ -4,42 +4,55 @@ junction_init <- function( deps = list() ) { splits <- make.unique(splits, sep = "_") - names(deps) <- names(deps) %|||% seq_along(deps) + index <- seq_along(splits) + names(index) <- splits deps <- as_data_frame(deps) - junction_new(nexus, splits, deps) + has_deps <- nrow(deps) > 0L + junction_new(nexus, index, deps, has_deps) } -junction_new <- function(nexus = NULL, splits = NULL, deps = NULL) { +junction_new <- function( + nexus = NULL, + index = NULL, + deps = NULL, + has_deps = NULL +) { out <- new.env(parent = emptyenv(), hash = FALSE) out$nexus <- nexus - out$splits <- splits + out$index <- index out$deps <- deps + out$has_deps <- has_deps out } junction_upstream_edges <- function(junction) { - from <- utils::stack(junction$deps)$values - to <- rep(junction$splits, times = ncol(junction$deps)) + from <- unlist(junction$deps, use.names = FALSE) + to <- rep(junction_splits(junction), times = ncol(junction$deps)) data_frame(from = from, to = to) } -junction_get_splits <- function(junction) { - as.character(junction$splits) +junction_length <- function(junction) { + length(.subset2(junction, "index")) +} + +junction_splits <- function(junction) { + names(.subset2(junction, "index")) } -junction_transpose <- function(junction) { - splits <- junction$splits - deps <- junction$deps - out <- map_rows(deps, ~list(deps = unname(.x))) %||% - replicate(length(splits), list(deps = character(0)), simplify = FALSE) - for (index in seq_along(splits)) { - out[[index]]$split <- splits[index] +junction_extract_index <- function(junction, name) { + as.integer(.subset2(.subset2(junction, "index"), name)) +} + +junction_extract_deps <- function(junction, index) { + if (.subset2(junction, "has_deps")) { + as.character(vctrs::vec_slice(x = .subset2(junction, "deps"), i = index)) + } else { + character(0L) } - out } junction_invalidate <- function(junction) { - junction$splits <- rep(NA_character_, length(junction$splits)) + names(junction$index) <- rep(NA_character_, length(junction$index)) } junction_validate_deps <- function(deps) { @@ -52,6 +65,7 @@ junction_validate <- function(junction) { tar_assert_correct_fields(junction, junction_new) tar_assert_scalar(junction$nexus) tar_assert_chr(junction$nexus) - tar_assert_chr(junction$splits) + tar_assert_int(junction$index) + tar_assert_chr(junction_splits(junction)) junction_validate_deps(junction$deps) } diff --git a/R/class_pattern.R b/R/class_pattern.R index bd3e4ad8..8a0938c1 100644 --- a/R/class_pattern.R +++ b/R/class_pattern.R @@ -44,7 +44,7 @@ pattern_s3_class <- c("tar_pattern", "tar_target") #' @export target_get_children.tar_pattern <- function(target) { - junction_get_splits(target$junction) + junction_splits(target$junction) } #' @export @@ -96,13 +96,13 @@ target_conclude.tar_pattern <- function(target, pipeline, scheduler, meta) { #' @export target_read_value.tar_pattern <- function(target, pipeline) { branches <- target_get_children(target) - map( - branches, - ~target_ensure_value(pipeline_get_target(pipeline, .x), pipeline) - ) objects <- map( branches, - ~pipeline_get_target(pipeline, .x)$value$object + ~ { + target <- pipeline_get_target(pipeline, .x) + target_ensure_value(target, pipeline) + target$value$object + } ) names(objects) <- branches value <- value_init(iteration = target$settings$iteration) @@ -204,6 +204,11 @@ target_unmarshal_value.tar_pattern <- function(target) { target$value <- NULL } +#' @export +target_produce_child.tar_pattern <- function(target, name) { + pattern_produce_branch(target, name) +} + #' @export print.tar_pattern <- function(x, ...) { cat( @@ -247,27 +252,27 @@ pattern_prepend_branches <- function(target, scheduler) { scheduler$queue$prepend(children, ranks) } +pattern_produce_branch <- function(target, name) { + junction <- .subset2(target, "junction") + index <- junction_extract_index(junction, name) + branch_init( + name = name, + command = .subset2(target, "command"), + deps_parent = .subset2(target, "deps"), + deps_child = junction_extract_deps(junction, index), + settings = .subset2(target, "settings"), + cue = .subset2(target, "cue"), + store = .subset2(target, "store"), + index = index + ) +} + pattern_set_branches <- function(target, pipeline) { - command <- target$command - deps_parent <- target$deps - settings <- target$settings - cue <- target$cue - store <- target$store - specs <- junction_transpose(target$junction) - for (index in seq_along(specs)) { - spec <- .subset2(specs, index) - branch <- branch_init( - name = .subset2(spec, "split"), - command = command, - deps_parent = deps_parent, - deps_child = .subset2(spec, "deps"), - settings = settings, - cue = cue, - store = store, - index = index - ) - pipeline_set_target(pipeline, branch) - } + pipeline_initialize_references_children( + pipeline = pipeline, + name_parent = target_get_name(target), + names_children = junction_splits(target$junction) + ) } pattern_insert_branches <- function(target, pipeline, scheduler) { diff --git a/R/class_pipeline.R b/R/class_pipeline.R index ea680bbe..3abdba01 100644 --- a/R/class_pipeline.R +++ b/R/class_pipeline.R @@ -41,11 +41,41 @@ pipeline_targets_init <- function(targets, clone_targets) { } pipeline_get_target <- function(pipeline, name) { - .subset2(.subset2(pipeline, "targets"), name) + out <- .subset2(.subset2(pipeline, "targets"), name) + if (is_reference(out)) { + out <- reference_produce_target(out, pipeline, name) + } + out +} + +pipeline_set_target <- function(pipeline, target) { + envir <- .subset2(pipeline, "targets") + name <- target_get_name(target) + envir[[name]] <- target + NULL +} + +pipeline_set_reference <- function(pipeline, target) { + envir <- .subset2(pipeline, "targets") + name <- target_get_name(target) + envir[[name]] <- target_produce_reference(target) + NULL +} + +pipeline_initialize_references_children <- function( + pipeline, + name_parent, + names_children +) { + envir <- .subset2(pipeline, "targets") + for (name in names_children) { + envir[[name]] <- reference_init(parent = name_parent) + } + NULL } pipeline_get_names <- function(pipeline) { - names(pipeline$targets) + names(.subset2(pipeline, "targets")) } pipeline_get_priorities <- function(pipeline) { @@ -78,16 +108,12 @@ pipeline_reset_deployment <- function(pipeline, name) { target$settings$deployment <- "main" } -pipeline_set_target <- function(pipeline, target) { - envir <- .subset2(pipeline, "targets") - name <- target_get_name(target) - envir[[name]] <- target - NULL -} - pipeline_exists_target <- function(pipeline, name) { - envir <- pipeline$targets %|||% tar_empty_envir - exists(x = name, envir = envir, inherits = FALSE) + envir <- .subset2(pipeline, "targets") + if (is.null(envir)) { + envir <- tar_empty_envir + } + !is.null(.subset2(envir, name)) } pipeline_exists_import <- function(pipeline, name) { @@ -104,7 +130,10 @@ pipeline_targets_only_edges <- function(edges) { } pipeline_upstream_edges <- function(pipeline, targets_only = TRUE) { - edge_list <- map(pipeline$targets, ~target_upstream_edges(.x)) + edge_list <- map( + pipeline_get_names(pipeline), + ~target_upstream_edges(pipeline_get_target(pipeline, .x)) + ) from <- map(edge_list, ~.x$from) to <- map(edge_list, ~.x$to) from <- unlist(from, recursive = FALSE, use.names = FALSE) @@ -123,7 +152,7 @@ pipeline_produce_igraph <- function(pipeline, targets_only = TRUE) { igraph::simplify(igraph::graph_from_data_frame(edges)) } -pipeline_register_loaded_target <- function(pipeline, name) { # nolint +pipeline_register_loaded <- function(pipeline, name) { # nolint counter_set_name(pipeline$loaded, name) target <- pipeline_get_target(pipeline, name) if (identical(target$settings$memory, "transient")) { @@ -131,13 +160,12 @@ pipeline_register_loaded_target <- function(pipeline, name) { # nolint } } -pipeline_register_loaded <- function(pipeline, names) { - lapply(names, pipeline_register_loaded_target, pipeline = pipeline) -} - pipeline_unload_target <- function(pipeline, name) { - target <- pipeline_get_target(pipeline, name) - store_unload(target$store, target) + target <- .subset2(.subset2(pipeline, "targets"), name) + if (!is_reference(target)) { + store_unload(target$store, target) + pipeline_set_reference(pipeline, target) + } counter_del_name(pipeline$loaded, name) counter_del_name(pipeline$transient, name) } @@ -158,8 +186,11 @@ pipeline_unload_transient <- function(pipeline) { } } -pipeline_produce_subpipeline <- function(pipeline, name, keep_value = NULL) { - target <- pipeline_get_target(pipeline, name) +pipeline_produce_subpipeline <- function( + pipeline, + target, + keep_value = NULL +) { deps <- target_deps_deep(target, pipeline) targets <- new.env(parent = emptyenv()) keep_value <- keep_value %|||% identical(target$settings$retrieval, "main") @@ -180,7 +211,7 @@ pipeline_produce_subpipeline <- function(pipeline, name, keep_value = NULL) { pipeline_assign_target_copy <- function(pipeline, name, envir, keep_value) { target <- pipeline_get_target(pipeline, name) copy <- target_subpipeline_copy(target, keep_value) - assign(name, copy, envir = envir) + envir[[name]] <- copy } pipeline_marshal_values <- function(pipeline) { @@ -208,7 +239,11 @@ pipeline_prune_targets <- function(pipeline, names) { graph <- pipeline_produce_igraph(pipeline, targets_only = TRUE) keep <- upstream_vertices(graph = graph, from = names) discard <- setdiff(pipeline_get_names(pipeline), keep) - remove(list = discard, envir = pipeline$targets, inherits = FALSE) + remove( + list = discard, + envir = .subset2(pipeline, "targets"), + inherits = FALSE + ) } pipeline_prune_shortcut <- function(pipeline, names, shortcut) { @@ -273,7 +308,10 @@ pipeline_validate_dag <- function(igraph) { } pipeline_validate_conflicts <- function(pipeline) { - conflicts <- intersect(names(pipeline$imports), names(pipeline$targets)) + conflicts <- intersect( + names(.subset2(pipeline, "imports")), + pipeline_get_names(pipeline) + ) msg <- paste0( "Targets and globals must have unique names. ", "Ignoring global objects that conflict with target names: ", diff --git a/R/class_reference.R b/R/class_reference.R new file mode 100644 index 00000000..0e91de1c --- /dev/null +++ b/R/class_reference.R @@ -0,0 +1,47 @@ +reference_init <- function( + parent = NA_character_, + path = NA_character_, + stage = NA_character_, + hash = NA_character_ +) { + reference_new(parent = parent, path = path, stage = stage, hash = hash) +} + +reference_new <- function( + parent = NULL, + path = NULL, + stage = NULL, + hash = NULL +) { + c(parent = parent, path = path, stage = stage, hash = hash) +} + +reference_parent <- function(reference) { + as.character(.subset(reference, 1L)) +} + +reference_path <- function(reference) { + as.character(.subset(reference, 2L)) +} + +reference_stage <- function(reference) { + as.character(.subset(reference, 3L)) +} + +reference_hash <- function(reference) { + as.character(.subset(reference, 4L)) +} + +reference_produce_target <- function(reference, pipeline, name) { + parent <- pipeline_get_target(pipeline, reference_parent(reference)) + child <- target_produce_child(parent, name) + file <- .subset2(child, "file") + if (!is.null(file)) { + file$path <- reference_path(reference) + file$stage <- reference_stage(reference) + file$hash <- reference_hash(reference) + } + child +} + +is_reference <- is.character diff --git a/R/class_stem.R b/R/class_stem.R index f28023af..d9aa3623 100644 --- a/R/class_stem.R +++ b/R/class_stem.R @@ -47,7 +47,7 @@ target_get_children.tar_stem <- function(target) { if_any( is.null(target$junction), character(0), - junction_get_splits(target$junction) + junction_splits(target$junction) ) } @@ -166,14 +166,18 @@ stem_tar_assert_nonempty <- function(target) { } } -stem_produce_buds <- function(target) { - settings <- target$settings - names <- target_get_children(target) - map(seq_along(names), ~bud_new(names[.x], settings, .x)) +stem_produce_bud <- function(target, name) { + junction <- .subset2(target, "junction") + index <- junction_extract_index(junction, name) + bud_new(name = name, settings = .subset2(target, "settings"), index = index) } stem_insert_buds <- function(target, pipeline) { - map(stem_produce_buds(target), pipeline_set_target, pipeline = pipeline) + pipeline_initialize_references_children( + pipeline = pipeline, + name_parent = target_get_name(target), + names_children = junction_splits(target$junction) + ) } stem_ensure_buds <- function(target, pipeline, scheduler) { @@ -214,6 +218,11 @@ stem_restore_junction <- function(target, pipeline, meta) { target$junction <- junction } +#' @export +target_produce_child.tar_stem <- function(target, name) { + stem_produce_bud(target, name) +} + #' @export print.tar_stem <- function(x, ...) { cat( diff --git a/R/class_target.R b/R/class_target.R index 9b346a0a..114ce5b3 100644 --- a/R/class_target.R +++ b/R/class_target.R @@ -92,7 +92,22 @@ target_get_name <- function(target) { } target_ensure_dep <- function(target, dep, pipeline) { - target_ensure_value(dep, pipeline) + tryCatch( + target_ensure_value(dep, pipeline), + error = function(error) { + message <- paste0( + "could not load dependency ", + target_get_name(dep), + " of target ", + target_get_name(target), + ". ", + conditionMessage(error) + ) + expr <- as.expression(as.call(list(quote(stop), message))) + target$command$expr <- expr + target$settings$deployment <- "main" + } + ) } target_ensure_deps <- function(target, pipeline) { @@ -104,6 +119,7 @@ target_ensure_deps <- function(target, pipeline) { target_load_value <- function(target, pipeline) { target$value <- target_read_value(target, pipeline) + pipeline_set_target(pipeline, target) pipeline_register_loaded(pipeline, target_get_name(target)) } @@ -477,6 +493,19 @@ target_validate <- function(target) { UseMethod("target_validate") } +target_produce_child <- function(target, name) { + UseMethod("target_produce_child") +} + +target_produce_reference <- function(target) { + UseMethod("target_produce_reference") +} + +#' @export +target_produce_reference.default <- function(target) { + target +} + #' @export target_validate.tar_target <- function(target) { tar_assert_chr(target$name) diff --git a/R/class_workspace.R b/R/class_workspace.R index 537790ed..dca77326 100644 --- a/R/class_workspace.R +++ b/R/class_workspace.R @@ -2,7 +2,7 @@ workspace_init <- function(target, pipeline) { target <- target_workspace_copy(target) subpipeline <- pipeline_produce_subpipeline( pipeline, - target_get_name(target), + target, keep_value = FALSE ) workspace_new(target = target, subpipeline = subpipeline) diff --git a/R/tar_described_as.R b/R/tar_described_as.R index ef913913..c2b0b4c2 100644 --- a/R/tar_described_as.R +++ b/R/tar_described_as.R @@ -79,7 +79,13 @@ tar_described_as_inner <- function( described_as_quosure, tidyselect ) { - descriptions <- unlist(map(pipeline$targets, ~.x$settings$description)) + names <- pipeline_get_names(pipeline) + descriptions <- map( + names, + ~pipeline_get_target(pipeline, .x)$settings$description + ) + names(descriptions) <- names + descriptions <- unlist(descriptions, use.names = TRUE) chosen <- tar_tidyselect_eval(described_as_quosure, unique(descriptions)) sort(unique(names(descriptions[descriptions %in% chosen]))) } diff --git a/tests/testthat/test-class_branch.R b/tests/testthat/test-class_branch.R index 0d37f227..c9d816e7 100644 --- a/tests/testthat/test-class_branch.R +++ b/tests/testthat/test-class_branch.R @@ -140,6 +140,8 @@ tar_test("branch$produce_record() of a successful branch", { local$run() meta <- local$meta target <- pipeline_get_target(pipeline, target_get_children(map)[2L]) + target$file$hash <- hash_object(123L) + target$file$bytes <- 16 record <- target_produce_record(target, pipeline, meta) expect_silent(record_validate(record)) expect_true(grepl("^y_", record$name)) diff --git a/tests/testthat/test-class_junction.R b/tests/testthat/test-class_junction.R index 8f35e46b..f9c6ea66 100644 --- a/tests/testthat/test-class_junction.R +++ b/tests/testthat/test-class_junction.R @@ -1,19 +1,42 @@ -tar_test("junction deps", { +tar_test("junction with deps", { x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) out <- x$deps exp <- data_frame(a = LETTERS, b = rev(letters)) expect_equal(out, exp) + expect_true(x$has_deps) }) -tar_test("junction_get_splits()", { +tar_test("junction without deps", { + skip_cran() + x <- junction_init("x", letters, list()) + expect_equal(x$deps, data.frame()) + expect_false(x$has_deps) +}) + +tar_test("junction_length()", { + x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) + expect_equal(junction_length(x), length(letters)) +}) + +tar_test("junction_splits()", { x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) - expect_equal(junction_get_splits(x), letters) + expect_equal(junction_splits(x), letters) +}) + +tar_test("junction_extract_index()", { + x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) + expect_equal(junction_extract_index(x, "j"), 10L) +}) + +tar_test("junction_extract_deps()", { + x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) + expect_equal(junction_extract_deps(x, 10L), c("J", "q")) }) tar_test("junction_invalidate()", { x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) junction_invalidate(x) - expect_equal(junction_get_splits(x), rep(NA_character_, length(x$splits))) + expect_equal(junction_splits(x), rep(NA_character_, length(x$index))) }) tar_test("junction_upstream_edges()", { @@ -26,32 +49,6 @@ tar_test("junction_upstream_edges()", { expect_equal(out, exp) }) -tar_test("junction_transpose() without deps", { - names <- paste0("child_", seq_len(3)) - junction <- junction_init("parent", names) - out <- junction_transpose(junction) - exp <- list(deps = character(0), split = "child_1") - expect_equal(out[[1]], exp) - exp <- list(deps = character(0), split = "child_2") - expect_equal(out[[2]], exp) - exp <- list(deps = character(0), split = "child_3") - expect_equal(out[[3]], exp) -}) - -tar_test("junction transpose() with deps", { - names <- paste0("child_", seq_len(3)) - x <- paste0("x_", seq_len(3)) - y <- paste0("y_", seq_len(3)) - junction <- junction_init("parent", names, list(x, y)) - out <- junction_transpose(junction) - exp <- list(deps = sort(c("x_1", "y_1")), split = "child_1") - expect_equal(out[[1]], exp) - exp <- list(deps = sort(c("x_2", "y_2")), split = "child_2") - expect_equal(out[[2]], exp) - exp <- list(deps = sort(c("x_3", "y_3")), split = "child_3") - expect_equal(out[[3]], exp) -}) - tar_test("junction_validate()", { x <- junction_init("x", letters, list(LETTERS, rev = rev(letters))) expect_silent(junction_validate(x)) diff --git a/tests/testthat/test-class_pipeline.R b/tests/testthat/test-class_pipeline.R index fde8c0ed..4ed46c25 100644 --- a/tests/testthat/test-class_pipeline.R +++ b/tests/testthat/test-class_pipeline.R @@ -80,7 +80,7 @@ tar_test("pipeline_upstream_edges(targets_only = FALSE)", { expect_true(all(edges$to %in% names)) }) -tar_test("pipeline_register_loaded(pipeline, )", { +tar_test("pipeline_register_loaded()", { x <- target_init("x", quote(1), memory = "persistent") y <- target_init("y", quote(1), memory = "transient") pipeline <- pipeline_init(list(x, y)) @@ -154,7 +154,11 @@ tar_test("pipeline_produce_subpipeline()", { ) local <- local_init(pipeline) local$run() - subpipeline <- pipeline_produce_subpipeline(pipeline, "summary") + target <- target_init( + name = "summary", + expr = quote(c(map, data0)) + ) + subpipeline <- pipeline_produce_subpipeline(pipeline, target) out <- sort(pipeline_get_names(subpipeline)) branches <- target_get_children(pipeline_get_target(pipeline, "map")) exp <- sort(c("data0", "map", branches)) @@ -277,3 +281,59 @@ tar_test("automatically ignore non-target objects", { expect_equal(nrow(out), 1L) expect_equal(out$name, "x") }) + +tar_test("managing lightweight references to targets in pipelines", { + skip_cran() + pipeline <- pipeline_init( + list( + target_init( + name = "data", + expr = quote(seq_len(3L)) + ), + target_init( + name = "map", + expr = quote(data), + pattern = quote(map(data)) + ) + ) + ) + local <- local_init(pipeline) + local$run() + data <- pipeline_get_target(local$pipeline, "data") + map <- pipeline_get_target(local$pipeline, "map") + for (index in seq_len(2L)) { + bud_name <- junction_splits(data$junction)[index] + branch_name <- junction_splits(map$junction)[index] + bud <- pipeline_get_target(local$pipeline, bud_name) + branch <- pipeline_get_target(local$pipeline, branch_name) + reference <- pipeline$targets[[bud_name]] + expect_equal(reference_parent(reference), "data") + expect_equal(reference_path(reference), NA_character_) + expect_equal(reference_stage(reference), NA_character_) + expect_equal(reference_hash(reference), NA_character_) + reference <- pipeline$targets[[branch_name]] + expect_equal(reference_parent(reference), "map") + expect_equal(reference_path(reference), branch$file$path) + expect_equal(reference_stage(reference), branch$file$stage) + expect_equal(reference_hash(reference), branch$file$hash) + expect_s3_class(bud, "tar_bud") + expect_s3_class(branch, "tar_branch") + target_load_value(bud, local$pipeline) + expect_equal(bud$value$object, index) + target_load_value(branch, local$pipeline) + expect_equal(branch$value$object, index) + expect_s3_class(local$pipeline$targets[[bud_name]], "tar_bud") + expect_s3_class(local$pipeline$targets[[branch_name]], "tar_branch") + pipeline_unload_loaded(local$pipeline) + reference <- pipeline$targets[[bud_name]] + expect_equal(reference_parent(reference), "data") + expect_equal(reference_path(reference), NA_character_) + expect_equal(reference_stage(reference), NA_character_) + expect_equal(reference_hash(reference), NA_character_) + reference <- pipeline$targets[[branch_name]] + expect_equal(reference_parent(reference), "map") + expect_equal(reference_path(reference), branch$file$path) + expect_equal(reference_stage(reference), branch$file$stage) + expect_equal(reference_hash(reference), branch$file$hash) + } +}) diff --git a/tests/testthat/test-class_reference.R b/tests/testthat/test-class_reference.R new file mode 100644 index 00000000..82167052 --- /dev/null +++ b/tests/testthat/test-class_reference.R @@ -0,0 +1,97 @@ +tar_test("reference with only parent", { + out <- reference_init(parent = "my_parent") + expect_equal(reference_parent(out), "my_parent") + expect_equal(reference_path(out), NA_character_) + expect_equal(reference_stage(out), NA_character_) + expect_equal(reference_hash(out), NA_character_) +}) + +tar_test("reference with parent and path but no other fields", { + out <- reference_init(parent = "my_parent", path = "my_path") + expect_equal(reference_parent(out), "my_parent") + expect_equal(reference_path(out), "my_path") + expect_equal(reference_stage(out), NA_character_) + expect_equal(reference_hash(out), NA_character_) +}) + +tar_test("reference with parent and hash but no other fields", { + out <- reference_init(parent = "my_parent", hash = "my_hash") + expect_equal(reference_parent(out), "my_parent") + expect_equal(reference_path(out), NA_character_) + expect_equal(reference_stage(out), NA_character_) + expect_equal(reference_hash(out), "my_hash") +}) + +tar_test("reference with all fields", { + out <- reference_init( + parent = "my_parent", + path = "my_path", + stage = "my_stage", + hash = "my_hash" + ) + expect_equal(reference_parent(out), "my_parent") + expect_equal(reference_path(out), "my_path") + expect_equal(reference_stage(out), "my_stage") + expect_equal(reference_hash(out), "my_hash") +}) + +tar_test("reference_produce_target() and its inverse", { + skip_cran() + pipeline <- pipeline_init( + list( + target_init(name = "data", expr = quote(seq_len(3L)) + ), + target_init( + name = "map", + expr = quote(data), + pattern = quote(map(data)) + ) + ) + ) + local <- local_init(pipeline) + local$run() + data <- pipeline_get_target(local$pipeline, "data") + map <- pipeline_get_target(local$pipeline, "map") + for (index in seq_len(3L)) { + bud_name <- junction_splits(data$junction)[index] + branch_name <- junction_splits(map$junction)[index] + bud <- pipeline_get_target(local$pipeline, bud_name) + branch <- pipeline_get_target(local$pipeline, branch_name) + expect_equal(target_produce_reference(data), data) + expect_equal(target_produce_reference(map), map) + bud_reference <- target_produce_reference(bud) + branch_reference <- target_produce_reference(branch) + expect_equal(reference_parent(bud_reference), "data") + expect_equal(reference_path(bud_reference), NA_character_) + expect_equal(reference_stage(bud_reference), NA_character_) + expect_equal(reference_hash(bud_reference), NA_character_) + expect_equal(reference_parent(branch_reference), "map") + expect_equal(reference_path(branch_reference), branch$file$path) + expect_equal(reference_stage(branch_reference), branch$file$stage) + expect_equal(reference_hash(branch_reference), branch$file$hash) + expect_equal(basename(dirname(branch$file$path)), "objects") + expect_equal(basename(dirname(branch$file$stage)), "scratch") + expect_false(anyNA(branch$file$hash)) + expect_equal(nchar(branch$file$hash), 16L) + new_bud <- reference_produce_target(bud_reference, local$pipeline, bud_name) + suppressWarnings(rm(list = "value", envir = bud)) + expect_equal(new_bud, bud) + new_branch <- reference_produce_target( + branch_reference, + local$pipeline, + branch_name + ) + suppressWarnings( + rm(list = c("value", "metrics", "subpipeline"), envir = branch) + ) + branch$file$size <- NA_character_ + branch$file$time <- NA_character_ + branch$file$bytes <- 0 + expect_equal(new_branch, branch) + pipeline_unload_loaded(pipeline) + target_load_value(bud, local$pipeline) + expect_equal(bud$value$object, index) + target_load_value(branch, local$pipeline) + expect_equal(branch$value$object, index) + } +}) diff --git a/tests/testthat/test-class_stem.R b/tests/testthat/test-class_stem.R index 5d2afbe8..cdce95d1 100644 --- a/tests/testthat/test-class_stem.R +++ b/tests/testthat/test-class_stem.R @@ -24,18 +24,18 @@ tar_test("stem$update_junction() on a good stem", { pipeline <- pipeline_init(list(x)) stem_update_junction(x, pipeline) expect_silent(junction_validate(x$junction)) - out <- x$junction$splits + out <- junction_splits(x$junction) expect_length(out, 10L) expect_true(all(grepl("abc_", out))) }) -tar_test("stem_produce_buds()", { +tar_test("stem produce buds", { x <- target_init(name = "abc", expr = quote(letters)) tar_option_set(envir = baseenv()) target_run(x, tar_option_get("envir"), path_store_default()) pipeline <- pipeline_init(list(x)) stem_update_junction(x, pipeline) - children <- stem_produce_buds(x) + children <- map(target_get_children(x), ~stem_produce_bud(x, .x)) expect_true(is.list(children)) expect_length(children, length(letters)) for (index in seq_along(letters)) { diff --git a/tests/testthat/test-tar_repository_cas_local.R b/tests/testthat/test-tar_repository_cas_local.R index d94d63b9..fb922e2a 100644 --- a/tests/testthat/test-tar_repository_cas_local.R +++ b/tests/testthat/test-tar_repository_cas_local.R @@ -98,3 +98,55 @@ tar_test("local CAS repository works on custom directory", { expect_equal(tar_outdated(), "z") tar_destroy() }) + +tar_test("local CAS repository with some invalidated branches", { + skip_cran() + tar_script({ + tar_option_set(repository = tar_repository_cas_local(path = "cas")) + tar_option_set(memory = "transient") + list( + tar_target(x, seq_len(3)), + tar_target(y, x, pattern = map(x)), + tar_target(z, y, pattern = map(y)), + tar_target(w, sum(y)) + ) + }) + tar_make(callr_function = NULL) + tar_script({ + tar_option_set(repository = tar_repository_cas_local(path = "cas")) + tar_option_set(memory = "transient") + list( + tar_target(x, c(1L, 5L, 3L)), + tar_target(y, x, pattern = map(x)), + tar_target(z, y, pattern = map(y)), + tar_target(w, sum(y)) + ) + }) + tar_make(callr_function = NULL) + expect_equal(tar_read(w), 9L) +}) + +tar_test("local CAS repository while depending on all branches", { + skip_cran() + tar_script({ + tar_option_set(repository = tar_repository_cas_local(path = "cas")) + tar_option_set(memory = "transient") + list( + tar_target(x, seq_len(3)), + tar_target(y, x, pattern = map(x)), + tar_target(z, y) + ) + }) + tar_make(callr_function = NULL) + tar_script({ + tar_option_set(repository = tar_repository_cas_local(path = "cas")) + tar_option_set(memory = "transient") + list( + tar_target(x, c(1L, 5L, 3L)), + tar_target(y, x, pattern = map(x)), + tar_target(z, y) + ) + }) + tar_make(callr_function = NULL) + expect_equal(as.integer(tar_read(z)), c(1L, 5L, 3L)) +})