Replies: 6 comments
-
Cool idea! As of 52657ea, you can aggregate with DuckDB in a downstream target. Sketch:

```r
library(targets)
library(tarchetypes)
scenarios <- tidyr::expand_grid(
  tibble::tibble(
    n1 = c(20, 30, 50),
    n2 = c(40, 30, 50)
  ),
  delta = c(0.2, 0.4, 0.6)
)
generate_data <- function(n1, n2, delta) {
  tibble::tibble(
    id = seq_len(n1 + n2),
    group = rep(seq_len(2), c(n1, n2)),
    y = rnorm(n1 + n2, mean = dplyr::if_else(group == 1, 0, delta))
  )
}
analyze_data <- function(data, design_parameter1, design_parameter2) {
  t.test(y ~ group, data = data) |>
    broom::tidy()
}
aggregate_upstream_branches <- function() {
  # Get the names of all upstream branches
  # while making sure not to load detritus left over in _targets/objects/:
  upstream <- targets::tar_definition()$subpipeline$targets
  classes <- purrr::map_chr(as.list(upstream), ~ class(.x)[1L])
  branches <- names(upstream)[classes != "tar_pattern"]
  # Robustly get the destination file:
  objects <- file.path(targets::tar_path_store(), "objects")
  destination <- file.path(objects, targets::tar_name())
  # Build the query. Listing every branch file explicitly (instead of
  # using a glob pattern) makes the query quite long, which hopefully
  # does not hurt efficiency much, but it guarantees we select only
  # the correct branches.
  base <- "COPY (SELECT * FROM read_parquet([%s])) TO '%s' (FORMAT 'parquet')"
  paths <- sprintf("'%s'", file.path(objects, branches))
  query <- sprintf(base, paste(paths, collapse = ","), destination)
  # Run the query and make sure the connection closes on exit
  # (even in case of errors).
  connection <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
  on.exit(DBI::dbDisconnect(connection))
  DBI::dbExecute(connection, query)
}
results <- tar_map_rep(
  name = results,
  command = generate_data(n1, n2, delta) |>
    analyze_data(),
  values = scenarios,
  names = everything(),
  batches = 10,
  reps = 10,
  format = tar_format_nanoparquet(),
  memory = "transient",
  garbage_collection = 100,
  combine = FALSE
)
list(
  results,
  tar_target_raw( # Allow the `deps` argument.
    name = "combined",
    command = quote(aggregate_upstream_branches()),
    # List all dependencies manually.
    deps = c(names(results$static_branches), "aggregate_upstream_branches"),
    # The tidyverse team really likes nanoparquet,
    # although it does not support list columns.
    format = tar_format_nanoparquet(),
    # Let aggregate_upstream_branches() retrieve the upstream dependency data:
    retrieval = "none",
    # Let aggregate_upstream_branches() store the output data:
    storage = "none"
  )
)
```

This is all a bit low-level, so target factories for aggregation will be powerful. Target factories will help people grab efficient aggregation tools off the shelf instead of reinventing the wheel with advanced knowledge of {targets} internals. Note that I supplied all 900 branch names literally in the query instead of using a glob pattern. This may make the query less efficient (hopefully not by much), but it is important because detritus from previous runs of the pipeline can be left over in `_targets/objects/`.
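For what it's worth, the `tar_target_raw()` boilerplate above could be bundled behind a single function call. A rough sketch; the wrapper name and interface here are hypothetical and not part of {tarchetypes}:

```r
# Hypothetical factory: wrap the manual-dependency aggregation target
# from the sketch above into a reusable function.
tar_aggregate_parquet <- function(name, upstream) {
  targets::tar_target_raw(
    name = name,
    command = quote(aggregate_upstream_branches()),
    # Depend on every static branch of the upstream tar_map_rep() output:
    deps = c(names(upstream$static_branches), "aggregate_upstream_branches"),
    format = tarchetypes::tar_format_nanoparquet(),
    retrieval = "none",
    storage = "none"
  )
}

# Usage in _targets.R:
list(
  results,
  tar_aggregate_parquet("combined", results)
)
```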
-
Hey Will, thanks for the pointers - a welcome reason for me to dig deeper into {targets} :) One issue with storing the individual batches in a database right away would be concurrent writing from many processes, which is not really what {duckdb} seems to be optimized for. Maybe it requires a different database, and maybe that's the even better approach. I will explore a bit more how your approach scales. A couple of hundred to low-four-digit scenarios should be covered for it to be practical; beyond that, one probably needs a database as storage.
-
I would like to see how far we can take this.
-
Edit: due to recent changes in development, the sketch above may need updating. Ultimately, I think it will be nicer on many levels to aggregate at the level of each scenario instead of across scenarios.
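A rough sketch of what per-scenario aggregation could look like, assuming the `scenarios` grid and helper functions from the earlier sketch (`tar_map()` substitutes `n1`, `n2`, and `delta` into each scenario's targets and renames dependencies among them accordingly):

```r
library(targets)
library(tarchetypes)
tar_map(
  values = scenarios,
  # Batched reps, one set per scenario:
  tar_rep(
    name = results,
    command = analyze_data(generate_data(n1, n2, delta)),
    batches = 10,
    reps = 10
  ),
  # One aggregation target per scenario, so no single downstream target
  # ever has to combine the results of all scenarios at once:
  tar_target(
    name = summary,
    command = dplyr::summarise(results, mean_estimate = mean(estimate))
  )
)
```

The `estimate` column comes from `broom::tidy()` on the t-test, so `mean_estimate` is just an example summary.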
-
Converting to a discussion.
-
Prework
Proposal
`tar_map_rep()` is very handy for large-scale statistical simulations. However, with 100s of scenarios and 100k+ reps per scenario, the resulting data can become quite big, and the combination of the individual batches becomes slow. An alternative would be to use {duckdb} to combine the individual targets (using parquet?); this can be much faster and can automatically buffer to hard disk if memory is running out.
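For illustration, a minimal sketch of the idea outside of a pipeline, assuming each batch was written to a parquet file under a hypothetical `batches/` directory:

```r
# DuckDB streams the parquet files through the COPY statement, so the
# combined data never has to fit in R's memory, and DuckDB can spill
# to disk if its own memory limit is reached.
library(DBI)
con <- DBI::dbConnect(duckdb::duckdb())
DBI::dbExecute(con, "
  COPY (SELECT * FROM read_parquet('batches/*.parquet'))
  TO 'combined.parquet' (FORMAT 'parquet')
")
DBI::dbDisconnect(con, shutdown = TRUE)
```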
Removing `combine = FALSE` results in about 20s for aggregation and 14 GB of memory usage. With {duckdb}, only 3 GB of memory are used and the aggregation is almost instantaneous (to be fair, the results are not written to disk, but using {duckdb} instead of {duckplyr} this should not be an issue, and there is no need to load the full data into memory at any point).
Memory usage is very low (~350 MB).
You could even use {duckdb} for downstream processing and aggregation, although {dplyr} still seems to have an edge here (2 vCPUs).
I would argue that the main advantage of using {duckdb} is the more robust combination of results (it doesn't crash and is faster). One could even consider using a native .duckdb file as the format instead of going via parquet.
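A sketch of that variant, with hypothetical file and table names:

```r
# Write the combined results into a persistent .duckdb database file
# instead of a combined parquet file:
library(DBI)
con <- DBI::dbConnect(duckdb::duckdb(), dbdir = "results.duckdb")
DBI::dbExecute(con, "
  CREATE OR REPLACE TABLE results AS
  SELECT * FROM read_parquet('batches/*.parquet')
")
DBI::dbDisconnect(con, shutdown = TRUE)
```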