Skip to content

Commit

Permalink
eliminate junction_transpose()
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Nov 8, 2024
1 parent 32e9d0d commit 19f4a7d
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 61 deletions.
40 changes: 26 additions & 14 deletions R/class_junction.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,52 @@ junction_init <- function(
splits = character(0),
deps = list()
) {
splits <- make.unique(splits, sep = "_")
index <- seq_along(splits)
names(index) <- make.unique(splits, sep = "_")
names(deps) <- names(deps) %|||% seq_along(deps)
names(index) <- splits
deps <- as_data_frame(deps)
junction_new(nexus, index, deps)
has_deps <- nrow(deps) > 0L
junction_new(nexus, index, deps, has_deps)
}

junction_new <- function(nexus = NULL, index = NULL, deps = NULL) {
junction_new <- function(
nexus = NULL,
index = NULL,
deps = NULL,
has_deps = NULL
) {
out <- new.env(parent = emptyenv(), hash = FALSE)
out$nexus <- nexus
out$index <- index
out$deps <- deps
out$has_deps <- has_deps
out
}

junction_upstream_edges <- function(junction) {
from <- utils::stack(junction$deps)$values
from <- unlist(junction$deps, use.names = FALSE)
to <- rep(junction_splits(junction), times = ncol(junction$deps))
data_frame(from = from, to = to)
}

junction_length <- function(junction) {
length(.subset2(junction, "index"))
}

junction_splits <- function(junction) {
names(junction$index)
names(.subset2(junction, "index"))
}

junction_transpose <- function(junction) {
splits <- junction_splits(junction)
deps <- junction$deps
out <- map_rows(deps, ~list(deps = unname(.x))) %||%
replicate(length(splits), list(deps = character(0)), simplify = FALSE)
for (index in seq_along(splits)) {
out[[index]]$split <- splits[index]
junction_extract_index <- function(junction, name) {
as.integer(.subset2(.subset2(junction, "index"), name))
}

junction_extract_deps <- function(junction, index) {
if (.subset2(junction, "has_deps")) {
as.character(vctrs::vec_slice(x = .subset2(junction, "deps"), i = index))
} else {
character(0L)
}
out
}

junction_invalidate <- function(junction) {
Expand Down
39 changes: 19 additions & 20 deletions R/class_pattern.R
Original file line number Diff line number Diff line change
Expand Up @@ -247,27 +247,26 @@ pattern_prepend_branches <- function(target, scheduler) {
scheduler$queue$prepend(children, ranks)
}

pattern_produce_branch <- function(target, name) {
junction <- .subset2(target, "junction")
index <- junction_extract_index(junction, name)
branch_init(
name = name,
command = .subset2(target, "command"),
deps_parent = .subset2(target, "deps"),
deps_child = junction_extract_deps(junction, index),
settings = .subset2(target, "settings"),
cue = .subset2(target, "cue"),
store = .subset2(target, "store"),
index = index
)
}

pattern_set_branches <- function(target, pipeline) {
command <- target$command
deps_parent <- target$deps
settings <- target$settings
cue <- target$cue
store <- target$store
specs <- junction_transpose(target$junction)
for (index in seq_along(specs)) {
spec <- .subset2(specs, index)
branch <- branch_init(
name = .subset2(spec, "split"),
command = command,
deps_parent = deps_parent,
deps_child = .subset2(spec, "deps"),
settings = settings,
cue = cue,
store = store,
index = index
)
pipeline_set_target(pipeline, branch)
}
map(
junction_splits(target$junction),
~pipeline_set_target(pipeline, pattern_produce_branch(target, .x))
)
}

pattern_insert_branches <- function(target, pipeline, scheduler) {
Expand Down
51 changes: 24 additions & 27 deletions tests/testthat/test-class_junction.R
Original file line number Diff line number Diff line change
@@ -1,15 +1,38 @@
tar_test("junction deps", {
tar_test("junction with deps", {
x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters)))
out <- x$deps
exp <- data_frame(a = LETTERS, b = rev(letters))
expect_equal(out, exp)
expect_true(x$has_deps)
})

tar_test("junction without deps", {
skip_cran()
x <- junction_init("x", letters, list())
expect_equal(x$deps, data.frame())
expect_false(x$has_deps)
})

tar_test("junction_length()", {
x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters)))
expect_equal(junction_length(x), length(letters))
})

tar_test("junction_splits()", {
x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters)))
expect_equal(junction_splits(x), letters)
})

tar_test("junction_extract_index()", {
x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters)))
expect_equal(junction_extract_index(x, "j"), 10L)
})

tar_test("junction_extract_deps()", {
x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters)))
expect_equal(junction_extract_deps(x, 10L), c("J", "q"))
})

tar_test("junction_invalidate()", {
x <- junction_init("x", letters, list(a = LETTERS, b = rev(letters)))
junction_invalidate(x)
Expand All @@ -26,32 +49,6 @@ tar_test("junction_upstream_edges()", {
expect_equal(out, exp)
})

tar_test("junction_transpose() without deps", {
names <- paste0("child_", seq_len(3))
junction <- junction_init("parent", names)
out <- junction_transpose(junction)
exp <- list(deps = character(0), split = "child_1")
expect_equal(out[[1]], exp)
exp <- list(deps = character(0), split = "child_2")
expect_equal(out[[2]], exp)
exp <- list(deps = character(0), split = "child_3")
expect_equal(out[[3]], exp)
})

tar_test("junction transpose() with deps", {
names <- paste0("child_", seq_len(3))
x <- paste0("x_", seq_len(3))
y <- paste0("y_", seq_len(3))
junction <- junction_init("parent", names, list(x, y))
out <- junction_transpose(junction)
exp <- list(deps = sort(c("x_1", "y_1")), split = "child_1")
expect_equal(out[[1]], exp)
exp <- list(deps = sort(c("x_2", "y_2")), split = "child_2")
expect_equal(out[[2]], exp)
exp <- list(deps = sort(c("x_3", "y_3")), split = "child_3")
expect_equal(out[[3]], exp)
})

tar_test("junction_validate()", {
x <- junction_init("x", letters, list(LETTERS, rev = rev(letters)))
expect_silent(junction_validate(x))
Expand Down

0 comments on commit 19f4a7d

Please sign in to comment.