Use {duckdb} to speed up aggregation of large tar_map_rep() patterns #201
Comments
Cool idea! As of 52657ea, you can aggregate with DuckDB in a downstream target. Sketch:

```r
library(targets)
library(tarchetypes)

scenarios <- tidyr::expand_grid(
  tibble::tibble(
    n1 = c(20, 30, 50),
    n2 = c(40, 30, 50)
  ),
  delta = c(0.2, 0.4, 0.6)
)

generate_data <- function(n1, n2, delta) {
  tibble::tibble(
    id = seq_len(n1 + n2),
    group = rep(seq_len(2), c(n1, n2)),
    y = rnorm(n1 + n2, mean = dplyr::if_else(group == 1, 0, delta))
  )
}

analyze_data <- function(data, design_parameter1, design_parameter2) {
  t.test(y ~ group, data = data) |>
    broom::tidy()
}

aggregate_upstream_branches <- function() {
  # Get the names of all upstream branches
  # while making sure not to load detritus leftover in _targets/objects/:
  upstream <- targets::tar_definition()$subpipeline$targets
  classes <- purrr::map_chr(as.list(upstream), ~class(.x)[1L])
  branches <- names(upstream)[classes != "tar_pattern"]
  # Robustly get the destination file:
  objects <- file.path(targets::tar_path_store(), "objects")
  destination <- file.path(objects, targets::tar_name())
  # Build the query. This query can be quite long since it does not
  # use a grep pattern, and I hope this does not cut down on efficiency,
  # but it is important because we want to make sure we select
  # only the correct branches.
  base <- "COPY (SELECT * FROM read_parquet([%s])) TO '%s' (FORMAT 'parquet')"
  paths <- sprintf("'%s'", file.path(objects, branches))
  query <- sprintf(base, paste(paths, collapse = ","), destination)
  # Run the query and make sure the connection closes on exit
  # (even in case of errors).
  connection <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
  on.exit(DBI::dbDisconnect(connection))
  DBI::dbExecute(connection, query)
}

results <- tar_map_rep(
  name = results,
  command = generate_data(n1, n2, delta) |>
    analyze_data(),
  values = scenarios,
  names = everything(),
  batches = 10,
  reps = 10,
  format = tar_format_nanoparquet(),
  memory = "transient",
  garbage_collection = 100,
  combine = FALSE
)

list(
  results,
  tar_target_raw( # Allow the `deps` argument.
    name = "combined",
    command = quote(aggregate_upstream_branches()),
    # List all dependencies manually.
    deps = c(names(results$static_branches), "aggregate_upstream_branches"),
    # The tidyverse team really likes nanoparquet,
    # although it does not support lists.
    format = tar_format_nanoparquet(),
    # Let aggregate_upstream_branches() retrieve the upstream dependency data:
    retrieval = "none",
    # Let aggregate_upstream_branches() store the output data:
    storage = "none"
  )
)
```

This is all a bit low-level, so target factories for aggregation will be powerful. Target factories will help people grab efficient aggregation tools off the shelf instead of reinventing the wheel with advanced knowledge of {targets} internals. Note that I supplied all 900 branch names literally in the query instead of using a grep pattern. This may make the query less efficient (hopefully not by much), but it is important because detritus from previous runs of the pipeline can be left over in `_targets/objects/`.
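For reference, a minimal way to exercise the sketch above, assuming it is saved as `_targets.R` (the target names come from the sketch itself):

```r
library(targets)

# Run the pipeline: all branches first, then the DuckDB aggregation target.
tar_make()

# Read the aggregated results back in through the nanoparquet format.
combined <- tar_read(combined)
head(combined)
```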
Hey Will, thanks for the pointers - a welcome reason for me to dig deeper into {targets} :) One issue with storing the individual batches in a database right away would be concurrent writing from many processes, which is not really what {duckdb} seems to be optimized for. Maybe that requires a different database, and maybe that would be the even better approach. I will explore a bit more how your approach scales. A couple of hundred to low four-digit scenarios should be covered for it to be practical; beyond that, one probably needs a database as storage.
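To sidestep the concurrent-writer issue, one common pattern (a sketch, not specific to {targets}; `batch` and `worker_id` are hypothetical) is to let each worker write its own file and reserve the database for a single downstream writer:

```r
library(DBI)

# Each parallel worker writes its own parquet file (no write contention).
nanoparquet::write_parquet(batch, sprintf("staging/batch_%03d.parquet", worker_id))

# Later, one process consolidates the staged files into a single DuckDB file.
con <- dbConnect(duckdb::duckdb(), dbdir = "results.duckdb")
dbExecute(con, "CREATE TABLE results AS SELECT * FROM read_parquet('staging/*.parquet')")
dbDisconnect(con, shutdown = TRUE)
```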
I would like to see how far we can take this approach.
Edit: due to recent changes in the development version, I think it will ultimately be nicer on many levels to aggregate at the level of each scenario instead of across scenarios.
Converting to a discussion.
Proposal
`tar_map_rep()` is very handy for large-scale statistical simulations. However, with 100s of scenarios and 100k+ reps per scenario, the resulting data can become quite big, and the combination of the individual batches becomes slow. An alternative would be to use {duckdb} to combine the individual targets (using parquet?); this can be much faster and can automatically buffer to hard disk if memory is running out.
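A minimal sketch of the idea, assuming the branch targets land as parquet files under `_targets/objects/` and share a common name prefix (the glob pattern below is hypothetical; real branch names depend on the pipeline):

```r
library(DBI)
library(duckdb)

# In-memory DuckDB connection; DuckDB can spill to disk when memory runs out.
con <- dbConnect(duckdb::duckdb(), dbdir = ":memory:")

# Combine all branch files with read_parquet() and a glob.
combined <- dbGetQuery(
  con,
  "SELECT * FROM read_parquet('_targets/objects/results_*')"
)

dbDisconnect(con, shutdown = TRUE)
```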
Removing `combine = FALSE` results in about 20 s for aggregation and 14 GB of memory usage. With {duckdb}, only 3 GB of memory are used and the aggregation is almost instantaneous (to be fair, the results are not written to disk, but using {duckdb} instead of {duckplyr} this should not be an issue, and there is no need to load the full data into memory at any point).
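In fact, writing to disk need not bring the data into R at all: DuckDB can stream the combination straight to a single parquet file with `COPY ... TO` (a sketch; the glob path and output file name are assumptions):

```r
library(DBI)

con <- dbConnect(duckdb::duckdb(), dbdir = ":memory:")

# Stream every branch file into one parquet file on disk;
# the full data never has to fit in R memory.
dbExecute(con, "
  COPY (SELECT * FROM read_parquet('_targets/objects/results_*'))
  TO 'combined.parquet' (FORMAT 'parquet')
")

dbDisconnect(con, shutdown = TRUE)
```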
Memory usage is very low (~350 MB).
You could even use {duckdb} for downstream processing and aggregation, although {dplyr} still seems to have an edge here (2 vCPUs).
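For illustration, a hypothetical per-scenario summary computed inside DuckDB; the column names (`n1`, `n2`, `delta` from the design grid, `estimate` and `p.value` from `broom::tidy()`) and the glob path are assumptions:

```r
library(DBI)

con <- dbConnect(duckdb::duckdb())

# Mean effect estimate and rejection rate per simulation scenario.
per_scenario <- dbGetQuery(con, "
  SELECT
    n1, n2, delta,
    AVG(estimate) AS mean_estimate,
    AVG(CASE WHEN \"p.value\" < 0.05 THEN 1.0 ELSE 0.0 END) AS rejection_rate
  FROM read_parquet('_targets/objects/results_*')
  GROUP BY n1, n2, delta
")

dbDisconnect(con, shutdown = TRUE)
```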
I would argue that the main advantage of using {duckdb} is the more robust combination of results (it doesn't crash and is faster). One could even consider using a native .duckdb file as the format instead of going via parquet.
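A native .duckdb file as storage could look roughly like this (a sketch; the file name, table name, and `batch_of_results` data frame are made up). Note that a single database file would reintroduce the concurrent-writer limitation mentioned above if branches run in parallel:

```r
library(DBI)

# Hypothetical: append each batch of results to a table in a persistent
# DuckDB database file instead of writing one parquet file per branch.
con <- dbConnect(duckdb::duckdb(), dbdir = "results.duckdb")
dbWriteTable(con, "results", batch_of_results, append = TRUE)
dbDisconnect(con, shutdown = TRUE)
```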