
Use {duckdb} to speed up aggregation of large tar_map_rep() patterns #201

Closed · 3 tasks done
kkmann opened this issue Nov 2, 2024 · 5 comments

kkmann commented Nov 2, 2024

Prework

  • I understand and agree to the help guide.
  • I understand and agree to the contributing guide.
  • New features take time and effort to create, and they take even more effort to maintain. So if the purpose of the feature is to resolve a struggle you are encountering personally, please consider first posting a "trouble" or "other" issue so we can discuss your use case and search for existing solutions first.

Proposal

tar_map_rep() is very handy for large-scale statistical simulations. However, with hundreds of scenarios and 100k+ reps per scenario, the resulting data can become quite big, and combining the individual batches becomes slow.

An alternative would be to use {duckdb} to combine the individual targets (via parquet?). This can be much faster, and DuckDB can automatically buffer to hard disk if memory is running out.

library(targets)
library(tarchetypes)
library(crew)

tar_option_set(
  controller = crew.cluster::crew_controller_slurm(
    workers = 300,
    slurm_cpus_per_task = 1,
    slurm_memory_gigabytes_per_cpu = 4
  )
)

get_scenarios <- function() {
  tidyr::expand_grid(
    n = c("c(20, 40)", "c(30, 30)", "c(50, 50)"), # avoid issues with non-atomic data types
    delta = c(0.2, 0.4, 0.6)
  )
}

generate_data <- function(n, delta) {
  tibble::tibble(
    id = 1:sum(n),
    group = rep(seq_along(n), n),
    y = rnorm(sum(n), mean = dplyr::if_else(group == 1, 0, delta))
  )
}

analyze_data <- function(data, design_parameter1, design_parameter2) {
  t.test(y ~ group, data = data) |>
    broom::tidy()
}

list(

  tar_target(
      name = tbl_scenarios,
      command = get_scenarios()
    ),

  tar_map_rep(
      name = tbl_results,
      command = {
        generate_data(eval(parse(text = n)), delta) |>
          analyze_data()
      },
      values = get_scenarios(),
      batches = 100, reps = 10000,
      format = "parquet", memory = "transient", garbage_collection = 100,
      combine = FALSE # skip default combination of individual batches
    )

)

Removing combine = FALSE results in about 20 s for the aggregation and 14 GB of memory usage.
With {duckdb}, only 3 GB of memory are used and the aggregation is almost instantaneous. (To be fair, the results are not written to disk in that comparison, but using {duckdb} instead of {duckplyr}, this should not be an issue: there is no need to load the full data into memory at any point.)

# targets::tar_make(reporter = "summary", as_job = TRUE)

targets::tar_delete(tbl_results_batch) # the batch index target would otherwise match the 'tbl_results_*' glob below

combine_and_save <- function() {
  con <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE)) # close the connection even on error
  DBI::dbExecute(con, "COPY (SELECT * FROM read_parquet('_targets/objects/tbl_results_*')) TO '_targets/objects/tbl_results' (FORMAT 'parquet')")
}

microbenchmark::microbenchmark(combine_and_save(), times = 5, unit = "seconds")
Unit: seconds
               expr      min       lq     mean   median       uq      max neval
 combine_and_save() 6.993747 6.997318 7.242503 7.234357 7.431151 7.555944     5

Memory usage is very low, ~350 MB.

You could even use {duckdb} for downstream processing and aggregation, although {dplyr} still seems to have an edge here (2 vCPUs).

# unfortunately {duckplyr} does not translate the mean() of a boolean...
calculate_power_duckdb <- function() {
  con <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE)) # close the connection even on error
  DBI::dbExecute(con, "CREATE TABLE tbl_results AS SELECT * FROM read_parquet('_targets/objects/tbl_results')")
  DBI::dbGetQuery(con, "
    SELECT
      n,
      delta,
      AVG(CASE WHEN \"p.value\" <= 0.05 THEN 1 ELSE 0 END) AS power
    FROM
      tbl_results
    GROUP BY
      n,
      delta;
  ")
}

calculate_power_dplyr <- function() {
  tbl_results <- arrow::read_parquet("_targets/objects/tbl_results")
  dplyr::summarize(tbl_results, power = mean(p.value < 0.05), .by = c(n, delta))
}

microbenchmark::microbenchmark(
  calculate_power_duckdb(),
  calculate_power_dplyr(),
  times = 5, unit = "seconds"
)
Unit: seconds
                     expr      min       lq     mean   median       uq      max neval cld
 calculate_power_duckdb() 2.974546 3.039805 3.241417 3.040516 3.175640 3.976579     5   a
  calculate_power_dplyr() 1.555537 1.769337 2.292620 1.910074 2.281365 3.946787     5   a

I would argue that the main advantage of using {duckdb} is the more robust combination of results (it doesn't crash and is faster). One could even consider using a native .duckdb file as the storage format instead of going via parquet, along the lines sketched below.
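For instance, a minimal sketch of such a custom format via targets::tar_format() (untested; format_duckdb is an assumed name, and it assumes every target value is a plain data frame stored as a single table called "data"):

format_duckdb <- targets::tar_format(
  read = function(path) {
    # Open the per-target .duckdb file read-only and pull out the table.
    con <- DBI::dbConnect(duckdb::duckdb(), dbdir = path, read_only = TRUE)
    on.exit(DBI::dbDisconnect(con, shutdown = TRUE))
    DBI::dbReadTable(con, "data")
  },
  write = function(object, path) {
    # Write the data frame as the single table of a fresh .duckdb file.
    con <- DBI::dbConnect(duckdb::duckdb(), dbdir = path)
    on.exit(DBI::dbDisconnect(con, shutdown = TRUE))
    DBI::dbWriteTable(con, "data", object, overwrite = TRUE)
  }
)

This could then be passed as format = format_duckdb in tar_map_rep(), and the combination step could ATTACH the per-branch .duckdb files instead of scanning parquet.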

wlandau (Member) commented Nov 4, 2024

Cool idea! As of 52657ea, you can aggregate with DuckDB in a downstream target. Sketch:

library(targets)
library(tarchetypes)

scenarios <- tidyr::expand_grid(
  tibble::tibble(
    n1 = c(20, 30, 50),
    n2 = c(40, 30, 50)
  ),
  delta = c(0.2, 0.4, 0.6)
)

generate_data <- function(n1, n2, delta) {
  tibble::tibble(
    id = seq_len(n1 + n2),
    group = rep(seq_len(2), c(n1, n2)),
    y = rnorm(sum(n1 + n2), mean = dplyr::if_else(group == 1, 0, delta))
  )
}

analyze_data <- function(data, design_parameter1, design_parameter2) {
  t.test(y ~ group, data = data) |>
    broom::tidy()
}

aggregate_upstream_branches <- function() {
  # Get the names of all upstream branches
  # while making sure not to load detritus left over in _targets/objects/:
  upstream <- targets::tar_definition()$subpipeline$targets
  classes <- purrr::map_chr(as.list(upstream), ~class(.x)[1L])
  branches <- names(upstream)[classes != "tar_pattern"]
  # Robustly get the destination file:
  objects <- file.path(targets::tar_path_store(), "objects")
  destination <- file.path(objects, targets::tar_name())
  # Build the query. This query can be quite long since it does not
  # use a grep pattern, and I hope this does not cut down on efficiency,
  # but it is important because we want to make sure we select
  # only the correct branches.
  base <- "COPY (SELECT * FROM read_parquet([%s])) TO '%s' (FORMAT 'parquet')"
  paths <- sprintf("'%s'", file.path(objects, branches))
  query <- sprintf(base, paste(paths, collapse = ","), destination)
  # Run the query and make sure the connection closes on exit
  # (even in case of errors).
  connection <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
  on.exit(DBI::dbDisconnect(connection))
  DBI::dbExecute(connection, query)
}

results <- tar_map_rep(
  name = results,
  command = generate_data(n1, n2, delta) |>
    analyze_data(),
  values = scenarios,
  names = everything(),
  batches = 10,
  reps = 10,
  format = tar_format_nanoparquet(),
  memory = "transient",
  garbage_collection = 100,
  combine = FALSE
)

list(
  results,
  tar_target_raw( # Allow the `deps` argument.
    name = "combined",
    command = quote(aggregate_upstream_branches()),
    # List all dependencies manually.
    deps = c(names(results$static_branches), "aggregate_upstream_branches"),
    # The tidyverse team really likes nanoparquet,
    # although it does not support lists.
    format = tar_format_nanoparquet(),
    # Let aggregate_upstream_branches() retrieve the upstream dependency data:
    retrieval = "none",
    # Let aggregate_upstream_branches() store the output data:
    storage = "none"
)

This is all a bit low-level, so target factories for aggregation will be powerful. Target factories will help people grab efficient aggregation tools off the shelf instead of reinventing the wheel using advanced knowledge of targets. (A hypothetical factory is sketched after the next paragraph.)

I supplied all 900 branch names literally in the query instead of using a regex. This may make the query less efficient (hopefully not by much), but it is important because detritus from previous runs of the pipeline can be left over in _targets/objects/, and we don't want to load that.
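For illustration, such a factory might just bundle the pattern above (tar_aggregate_duckdb() is a hypothetical name, not an existing tarchetypes function):

tar_aggregate_duckdb <- function(name, map_rep) {
  list(
    map_rep,
    targets::tar_target_raw(
      name = name,
      command = quote(aggregate_upstream_branches()),
      # List all dependencies manually, as above.
      deps = c(names(map_rep$static_branches), "aggregate_upstream_branches"),
      format = tarchetypes::tar_format_nanoparquet(),
      retrieval = "none",
      storage = "none"
    )
  )
}

# Usage in _targets.R, replacing the hand-written list() above:
# list(tar_aggregate_duckdb("combined", results))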

With the new targets::tar_repository_cas() function, it should even be possible to define a DuckDB database as a storage repository.
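A rough, untested sketch of what that could look like, assuming parquet-format targets staged through local files and a single store.duckdb file with only one writer at a time (repository_duckdb and store.duckdb are assumed names):

repository_duckdb <- targets::tar_repository_cas(
  upload = function(key, path) {
    con <- DBI::dbConnect(duckdb::duckdb(), dbdir = "store.duckdb")
    on.exit(DBI::dbDisconnect(con, shutdown = TRUE))
    # Content-addressed keys make overwriting idempotent:
    DBI::dbExecute(con, sprintf(
      "CREATE OR REPLACE TABLE \"%s\" AS SELECT * FROM read_parquet('%s')",
      key, path
    ))
  },
  download = function(key, path) {
    con <- DBI::dbConnect(duckdb::duckdb(), dbdir = "store.duckdb")
    on.exit(DBI::dbDisconnect(con, shutdown = TRUE))
    DBI::dbExecute(con, sprintf(
      "COPY (SELECT * FROM \"%s\") TO '%s' (FORMAT 'parquet')",
      key, path
    ))
  },
  exists = function(key) {
    con <- DBI::dbConnect(duckdb::duckdb(), dbdir = "store.duckdb")
    on.exit(DBI::dbDisconnect(con, shutdown = TRUE))
    key %in% DBI::dbListTables(con)
  }
)

One table per key would also make cross-branch aggregation a single SQL query, although the single database file would still be a bottleneck for many concurrent writers.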

kkmann (Author) commented Nov 5, 2024

Hey Will,

Thanks for the pointers - a welcome reason for me to dig deeper into {targets} :)

One issue with storing the individual batches in a database right away would be concurrent writing from many processes, which is not really what {duckdb} seems to be optimized for. Maybe that requires a different database. Maybe that's the even better approach: modifying tar_map_rep() such that it writes directly to a database backend that supports parallel writing? That sounds a lot more complex, though.

I will explore a bit more how your approach scales. A couple hundred to low four-digit numbers of scenarios should be covered for it to be practical. Beyond that, one probably needs a database as storage.

wlandau (Member) commented Nov 5, 2024

I would like to see how far we can take tar_repository_cas() to integrate with databases and aggregate dynamic branches efficiently. My hope is that the ultimate solution will look like tar_map_rep(repository = tar_repository_cas(...)) while allowing tar_map_rep() to stay fully general.

wlandau (Member) commented Nov 11, 2024

Edit: due to recent changes in development targets (cf. ropensci/targets#1370), the above technique needs branches <- names(upstream)[classes != "tar_pattern"] instead of branches <- names(upstream)[classes == "tar_branch"]. I updated #201 (comment) to reflect this.

Ultimately, I think it will be nicer on many levels to aggregate at the level of each scenario instead of across scenarios.

wlandau (Member) commented Nov 21, 2024

Converting to a discussion for tarchetypes. I think I'll want to first try handling this at the level of dynamic branch aggregation in targets.

ropensci locked and limited conversation to collaborators Nov 21, 2024
wlandau converted this issue into discussion #204 Nov 21, 2024
