diff --git a/R/class_junction.R b/R/class_junction.R index 1d8f5f4f..f7fcffef 100644 --- a/R/class_junction.R +++ b/R/class_junction.R @@ -3,40 +3,52 @@ junction_init <- function( splits = character(0), deps = list() ) { + splits <- make.unique(splits, sep = "_") index <- seq_along(splits) - names(index) <- make.unique(splits, sep = "_") - names(deps) <- names(deps) %|||% seq_along(deps) + names(index) <- splits deps <- as_data_frame(deps) - junction_new(nexus, index, deps) + has_deps <- nrow(deps) > 0L + junction_new(nexus, index, deps, has_deps) } -junction_new <- function(nexus = NULL, index = NULL, deps = NULL) { +junction_new <- function( + nexus = NULL, + index = NULL, + deps = NULL, + has_deps = NULL +) { out <- new.env(parent = emptyenv(), hash = FALSE) out$nexus <- nexus out$index <- index out$deps <- deps + out$has_deps <- has_deps out } junction_upstream_edges <- function(junction) { - from <- utils::stack(junction$deps)$values + from <- unlist(junction$deps, use.names = FALSE) to <- rep(junction_splits(junction), times = ncol(junction$deps)) data_frame(from = from, to = to) } +junction_length <- function(junction) { + length(.subset2(junction, "index")) +} + junction_splits <- function(junction) { - names(junction$index) + names(.subset2(junction, "index")) } -junction_transpose <- function(junction) { - splits <- junction_splits(junction) - deps <- junction$deps - out <- map_rows(deps, ~list(deps = unname(.x))) %||% - replicate(length(splits), list(deps = character(0)), simplify = FALSE) - for (index in seq_along(splits)) { - out[[index]]$split <- splits[index] +junction_extract_index <- function(junction, name) { + as.integer(.subset2(.subset2(junction, "index"), name)) +} + +junction_extract_deps <- function(junction, index) { + if (.subset2(junction, "has_deps")) { + as.character(vctrs::vec_slice(x = .subset2(junction, "deps"), i = index)) + } else { + character(0L) } - out } junction_invalidate <- function(junction) { diff --git a/R/class_pattern.R b/R/class_pattern.R index 5099e816..8b8db30b 100644 --- a/R/class_pattern.R +++ b/R/class_pattern.R @@ -247,27 +247,26 @@ pattern_prepend_branches <- function(target, scheduler) { scheduler$queue$prepend(children, ranks) } +pattern_produce_branch <- function(target, name) { + junction <- .subset2(target, "junction") + index <- junction_extract_index(junction, name) + branch_init( + name = name, + command = .subset2(target, "command"), + deps_parent = .subset2(target, "deps"), + deps_child = junction_extract_deps(junction, index), + settings = .subset2(target, "settings"), + cue = .subset2(target, "cue"), + store = .subset2(target, "store"), + index = index + ) +} + pattern_set_branches <- function(target, pipeline) { - command <- target$command - deps_parent <- target$deps - settings <- target$settings - cue <- target$cue - store <- target$store - specs <- junction_transpose(target$junction) - for (index in seq_along(specs)) { - spec <- .subset2(specs, index) - branch <- branch_init( - name = .subset2(spec, "split"), - command = command, - deps_parent = deps_parent, - deps_child = .subset2(spec, "deps"), - settings = settings, - cue = cue, - store = store, - index = index - ) - pipeline_set_target(pipeline, branch) - } + map( + junction_splits(target$junction), + ~pipeline_set_target(pipeline, pattern_produce_branch(target, .x)) + ) } pattern_insert_branches <- function(target, pipeline, scheduler) { diff --git a/tests/testthat/test-class_junction.R b/tests/testthat/test-class_junction.R index 2ffbf40b..f9c6ea66 100644 --- a/tests/testthat/test-class_junction.R +++ b/tests/testthat/test-class_junction.R @@ -1,8 +1,21 @@ -tar_test("junction deps", { +tar_test("junction with deps", { x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) out <- x$deps exp <- data_frame(a = LETTERS, b = rev(letters)) expect_equal(out, exp) + expect_true(x$has_deps) +}) + +tar_test("junction without deps", { + skip_cran() + x <- junction_init("x", letters, list()) + expect_equal(x$deps, data.frame()) + expect_false(x$has_deps) +}) + +tar_test("junction_length()", { + x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) + expect_equal(junction_length(x), length(letters)) }) tar_test("junction_splits()", { @@ -10,6 +23,16 @@ tar_test("junction_splits()", { expect_equal(junction_splits(x), letters) }) +tar_test("junction_extract_index()", { + x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) + expect_equal(junction_extract_index(x, "j"), 10L) +}) + +tar_test("junction_extract_deps()", { + x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) + expect_equal(junction_extract_deps(x, 10L), c("J", "q")) +}) + tar_test("junction_invalidate()", { x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters))) junction_invalidate(x) @@ -26,32 +49,6 @@ tar_test("junction_upstream_edges()", { expect_equal(out, exp) }) -tar_test("junction_transpose() without deps", { - names <- paste0("child_", seq_len(3)) - junction <- junction_init("parent", names) - out <- junction_transpose(junction) - exp <- list(deps = character(0), split = "child_1") - expect_equal(out[[1]], exp) - exp <- list(deps = character(0), split = "child_2") - expect_equal(out[[2]], exp) - exp <- list(deps = character(0), split = "child_3") - expect_equal(out[[3]], exp) -}) - -tar_test("junction transpose() with deps", { - names <- paste0("child_", seq_len(3)) - x <- paste0("x_", seq_len(3)) - y <- paste0("y_", seq_len(3)) - junction <- junction_init("parent", names, list(x, y)) - out <- junction_transpose(junction) - exp <- list(deps = sort(c("x_1", "y_1")), split = "child_1") - expect_equal(out[[1]], exp) - exp <- list(deps = sort(c("x_2", "y_2")), split = "child_2") - expect_equal(out[[2]], exp) - exp <- list(deps = sort(c("x_3", "y_3")), split = "child_3") - expect_equal(out[[3]], exp) -}) - tar_test("junction_validate()", { x <- junction_init("x", letters, list(LETTERS, rev = rev(letters))) expect_silent(junction_validate(x))