Skip to content

Commit

Permalink
Merge pull request #1181 from ropensci/1172
Browse files Browse the repository at this point in the history
Super fast checking of cloud targets
  • Loading branch information
wlandau authored Nov 13, 2023
2 parents 13470ef + eea7a33 commit 4bc6db4
Show file tree
Hide file tree
Showing 67 changed files with 1,105 additions and 358 deletions.
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally
The methodology in this package
borrows from GNU 'Make' (2015, ISBN:978-9881443519)
and 'drake' (2018, <doi:10.21105/joss.00550>).
Version: 1.3.2.9002
Version: 1.3.2.9003
License: MIT + file LICENSE
URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets
BugReports: https://github.com/ropensci/targets/issues
Expand Down Expand Up @@ -92,6 +92,7 @@ Suggests:
nanonext (>= 0.9.0),
rmarkdown (>= 2.4),
parallelly (>= 1.35.0),
paws.common (>= 0.5.4),
paws.storage (>= 0.2.0),
pingr (>= 2.0.1),
pkgload (>= 1.1.0),
Expand Down
13 changes: 5 additions & 8 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ S3method(hash_object,character)
S3method(hash_object,default)
S3method(imports_init,default)
S3method(imports_init,tar_imports)
S3method(pipeline_from_list,default)
S3method(pipeline_from_list,tar_pipeline)
S3method(print,tar_cue)
S3method(print,tar_pattern)
S3method(print,tar_pipeline)
Expand Down Expand Up @@ -61,6 +63,7 @@ S3method(store_assert_repository_setting,aws)
S3method(store_assert_repository_setting,default)
S3method(store_assert_repository_setting,gcp)
S3method(store_assert_repository_setting,local)
S3method(store_cache_path,default)
S3method(store_cache_path,tar_cloud)
S3method(store_cache_path,tar_external)
S3method(store_class_format,feather)
Expand Down Expand Up @@ -95,9 +98,7 @@ S3method(store_delete_objects,default)
S3method(store_delete_objects,tar_aws)
S3method(store_delete_objects,tar_gcp)
S3method(store_ensure_correct_hash,default)
S3method(store_ensure_correct_hash,tar_aws)
S3method(store_ensure_correct_hash,tar_cloud)
S3method(store_ensure_correct_hash,tar_gcp)
S3method(store_ensure_correct_hash,tar_store_file)
S3method(store_ensure_correct_hash,tar_url)
S3method(store_exist_object,default)
Expand Down Expand Up @@ -125,7 +126,9 @@ S3method(store_hash_early,tar_gcp_file)
S3method(store_hash_early,tar_store_file)
S3method(store_hash_early,tar_url)
S3method(store_hash_late,default)
S3method(store_hash_late,tar_aws_file)
S3method(store_hash_late,tar_cloud)
S3method(store_hash_late,tar_gcp_file)
S3method(store_hash_late,tar_store_file)
S3method(store_hash_late,tar_url)
S3method(store_marshal_object,default)
Expand Down Expand Up @@ -208,8 +211,6 @@ S3method(store_write_path,tar_store_custom)
S3method(store_write_path,tar_store_file)
S3method(store_write_path,tar_torch)
S3method(store_write_path,tar_url)
S3method(tar_as_pipeline,default)
S3method(tar_as_pipeline,tar_pipeline)
S3method(tar_make_interactive_load_target,tar_bud)
S3method(tar_make_interactive_load_target,tar_target)
S3method(target_bootstrap,tar_builder)
Expand Down Expand Up @@ -316,7 +317,6 @@ export(rstudio_addin_tar_target)
export(rstudio_addin_tar_visnetwork)
export(starts_with)
export(tar_active)
export(tar_as_pipeline)
export(tar_assert_chr)
export(tar_assert_dbl)
export(tar_assert_df)
Expand Down Expand Up @@ -437,16 +437,13 @@ export(tar_option_reset)
export(tar_option_set)
export(tar_outdated)
export(tar_path)
export(tar_path_objects_dir)
export(tar_path_objects_dir_cloud)
export(tar_path_script)
export(tar_path_script_support)
export(tar_path_store)
export(tar_path_target)
export(tar_pattern)
export(tar_pid)
export(tar_pipeline)
export(tar_pipeline_validate_lite)
export(tar_poll)
export(tar_print)
export(tar_process)
Expand Down
18 changes: 14 additions & 4 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
# targets 1.3.2.9002 (development)
# targets 1.3.2.9003 (development)

## Invalidating changes

Because of the changes below, upgrading to this version of `targets` will unavoidably invalidate previously built targets in existing pipelines. Your pipeline code should still work, but any targets you ran before will most likely need to rerun after the upgrade.

* Use SHA512 during the creation of target-specific pseudo-random number generator seeds (#1139). This change decreases the risk of overlapping/correlated random number generator streams. See the "RNG overlap" section of the `tar_seed_create()` help file for details and justification. Unfortunately, this change will invalidate all currently built targets because the seeds will be different. To avoid rerunning your whole pipeline, set `cue = tar_cue(seed = FALSE)` in `tar_target()`.
* For cloud storage: instead of the hash of the local file, use the ETag for AWS S3 targets and the MD5 hash for GCP GCS targets (#1172). Sanitize with `targets:::digest_chr64()` in both cases before storing the result in the metadata.
* For a cloud target to be truly up to date, the hash in the metadata now needs to match the *current* object in the bucket, not the version recorded in the metadata (#1172). In other words, `targets` now tries to ensure that the up-to-date data objects in the cloud are in their newest versions. So if you roll back the metadata to an older version, you will still be able to access historical data versions with e.g. `tar_read()`, but the pipeline will no longer be up to date.

## Other changes
## Other changes to seeds

* Add a new exported function `tar_seed_create()` which creates target-specific pseudo-random number generator seeds.
* Add an "RNG overlap" section in the `tar_seed_create()` help file to justify and defend how `targets` and `tarchetypes` approach pseudo-random numbers.
* Add function `tar_seed_set()` which sets a seed and sets all the RNG algorithms to their defaults in the R installation of the user. Each target now uses `tar_seed_set()` function to set its seed before running its R command (#1139).
* Deprecate `tar_seed()` in favor of the new `tar_seed_get()` function.
* Migrate to the changes in `clustermq` 0.9.0 (@mschubert).
* Add a new `tar_unversion()` function to remove version IDs from the metadata of cloud targets. This makes it easier to remove all versions of target data using functions `tar_destroy()` and `tar_delete()`.

## Other cloud storage improvements

* For all cloud targets, check hashes in batched LIST requests instead of individual HEAD requests (#1172). Dramatically speeds up the process of checking if cloud targets are up to date.
* For AWS S3 targets, `tar_delete()`, `tar_destroy()`, and `tar_prune()` now use efficient batched calls to `delete_objects()` instead of costly individual calls to `delete_object()` (#1171).
* Add a new `verbose` argument to `tar_delete()`, `tar_destroy()`, and `tar_prune()`.
* Add a new `batch_size` argument to `tar_delete()`, `tar_destroy()`, and `tar_prune()`.
* Add new arguments `page_size` and `verbose` to `tar_resources_aws()` (#1172).
* Add a new `tar_unversion()` function to remove version IDs from the metadata of cloud targets. This makes it easier to interact with just the current version of each target, as opposed to the version ID recorded in the local metadata.

## Other changes

* Migrate to the changes in `clustermq` 0.9.0 (@mschubert).

# targets 1.3.2

Expand Down
31 changes: 9 additions & 22 deletions R/class_aws.R
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ store_delete_objects.tar_aws <- function(store, meta, batch_size, verbose) {
endpoint <- store_aws_endpoint(example_path)
objects <- map(
subset$path,
~list(Key = store_aws_key(.x), VersionId = store_aws_version(.x))
~list(
Key = store_aws_key(.x),
VersionId = store_aws_version(.x)
)
)
message <- paste(
"could not delete one or more objects from AWS bucket %s.",
Expand Down Expand Up @@ -224,7 +227,6 @@ store_upload_object_aws <- function(store) {
bucket = bucket,
region = store_aws_region(store$file$path),
endpoint = store_aws_endpoint(store$file$path),
metadata = list("targets-hash" = store$file$hash),
part_size = aws$part_size,
args = aws$args,
max_tries = aws$max_tries,
Expand All @@ -247,39 +249,24 @@ store_upload_object_aws <- function(store) {
invert = TRUE
)
store$file$path <- c(path, paste0("version=", head$VersionId))
store$file$hash <- digest_chr64(head$ETag)
invisible()
}

#' @export
store_ensure_correct_hash.tar_aws <- function(store, storage, deployment) {
}

#' @export
store_has_correct_hash.tar_aws <- function(store) {
hash <- store_aws_hash(store)
!is.null(hash) && identical(hash, store$file$hash)
}

store_aws_hash <- function(store) {
path <- store$file$path
aws <- store$resources$aws
head <- aws_s3_head(
key = store_aws_key(path),
bucket = store_aws_bucket(path),
region = store_aws_region(path),
endpoint = store_aws_endpoint(path),
version = store_aws_version(path),
args = aws$args,
max_tries = aws$max_tries,
seconds_timeout = aws$seconds_timeout,
close_connection = aws$close_connection,
s3_force_path_style = aws$s3_force_path_style
)
head$Metadata[["targets-hash"]]
tar_runtime$inventories$aws <- tar_runtime$inventories$aws %|||%
inventory_aws_init()
tar_runtime$inventories$aws$get_cache(store = store)
}
# nocov end

#' @export
store_get_packages.tar_aws <- function(store) {
c("paws.storage", NextMethod())
c("paws.common", "paws.storage", NextMethod())
}
6 changes: 5 additions & 1 deletion R/class_aws_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ store_hash_early.tar_aws_file <- function(store) { # nolint
store$file$path <- store_aws_file_stage(store$file$path)
on.exit(store$file$path <- old)
tar_assert_path(store$file$path)
file_update_hash(store$file)
file_update_info(store$file)
}

#' @export
store_hash_late.tar_aws_file <- function(store) { # nolint
}

#' @export
Expand Down
4 changes: 1 addition & 3 deletions R/class_cloud.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ store_cache_path.tar_cloud <- function(store, path) {
store_hash_late.tar_cloud <- function(store) {
tar_assert_path(store$file$stage)
file <- file_init(path = store$file$stage)
file_update_hash(file)
store$file$hash <- file$hash
file_update_info(file)
store$file$bytes <- file$bytes
store$file$time <- file$time
}

#' @export
store_ensure_correct_hash.tar_cloud <- function(store, storage, deployment) {
store_wait_correct_hash(store)
}

#' @export
Expand Down
9 changes: 9 additions & 0 deletions R/class_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ file_exists_stage <- function(file) {
all(file.exists(file$stage))
}

file_update_info <- function(file) {
files <- file_list_files(file$path)
info <- file_info(files)
file$time <- file_time(info)
file$bytes <- file_bytes(info)
file$size <- file_size(file$bytes)
invisible()
}

file_update_hash <- function(file) {
files <- file_list_files(file$path)
info <- file_info(files)
Expand Down
18 changes: 4 additions & 14 deletions R/class_gcp.R
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ store_upload_object_gcp <- function(store) {
file = store$file$stage,
key = key,
bucket = bucket,
metadata = list("targets-hash" = store$file$hash),
predefined_acl = store$resources$gcp$predefined_acl %|||% "private",
verbose = store$resources$gcp$verbose %|||% FALSE,
max_tries = store$resources$gcp$max_tries %|||% 5L
Expand All @@ -203,29 +202,20 @@ store_upload_object_gcp <- function(store) {
invert = TRUE
)
store$file$path <- c(path, paste0("version=", head$generation))
store$file$hash <- digest_chr64(head$md5)
invisible()
}

#' @export
store_ensure_correct_hash.tar_gcp <- function(store, storage, deployment) {
}

#' @export
store_has_correct_hash.tar_gcp <- function(store) {
hash <- store_gcp_hash(store = store)
!is.null(hash) && identical(hash, store$file$hash)
}

store_gcp_hash <- function(store) {
path <- store$file$path
head <- gcp_gcs_head(
key = store_gcp_key(path),
bucket = store_gcp_bucket(path),
version = store_gcp_version(path),
verbose = store$resources$gcp$verbose %|||% FALSE,
max_tries = store$resources$gcp$max_tries %|||% 5L
)
head$metadata[["targets-hash"]]
tar_runtime$inventories$gcp <- tar_runtime$inventories$gcp %|||%
inventory_gcp_init()
tar_runtime$inventories$gcp$get_cache(store = store)
}
# nocov end

Expand Down
6 changes: 5 additions & 1 deletion R/class_gcp_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ store_hash_early.tar_gcp_file <- function(store) { # nolint
store$file$path <- store_gcp_file_stage(store$file$path)
on.exit(store$file$path <- old)
tar_assert_path(store$file$path)
file_update_hash(store$file)
file_update_info(store$file)
}

#' @export
store_hash_late.tar_gcp_file <- function(store) { # nolint
}

#' @export
Expand Down
69 changes: 69 additions & 0 deletions R/class_inventory.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# This is an abstract inventory. The definitions of
# get_key(), get_bucket(), and set_cache() below
# are abstract and for testing only. Only the subclasses
# have serious versions of these methods.
inventory_init <- function() {
out <- inventory_new()
out$reset()
out
}

inventory_new <- function() {
inventory_class$new()
}

inventory_class <- R6::R6Class(
classname = "tar_inventory",
class = FALSE,
portable = FALSE,
cloneable = FALSE,
public = list(
cache = NULL,
prefixes = NULL,
misses = NULL,
downloads = NULL,
get_key = function(store) {
"example_key"
},
get_bucket = function(store) {
"example_bucket"
},
get_name = function(key, bucket) {
paste(bucket, key, sep = "|")
},
get_cache = function(store) {
key <- self$get_key(store)
prefix <- dirname(key)
bucket <- self$get_bucket(store)
name <- self$get_name(key = key, bucket = bucket)
miss <- !exists(x = name, envir = self$cache)
download <- !counter_exists_name(counter = self$prefixes, name = prefix)
if (download) {
counter_set_name(counter = self$prefixes, name = prefix)
self$set_cache(store)
}
self$misses <- self$misses + as.integer(miss)
self$downloads <- self$downloads + as.integer(download)
self$cache[[name]]
},
list_cache = function() {
names(self$cache)
},
set_cache = function(store) {
key <- self$get_key(store)
bucket <- self$get_bucket(store)
name <- self$get_name(key = key, bucket = bucket)
self$cache[[name]] <- "example_hash"
},
reset = function() {
self$cache <- new.env(parent = emptyenv())
self$prefixes <- counter_init()
self$misses <- 0L
self$downloads <- 0L
},
validate = function() {
tar_assert_envir(self$cache)
invisible()
}
)
)
Loading

0 comments on commit 4bc6db4

Please sign in to comment.