Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Super fast checking of cloud targets #1181

Merged
merged 50 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
de94707
docs and fixes
wlandau-lilly Nov 7, 2023
51e34a2
Add utility aws_s3_list_etags()
wlandau-lilly Nov 9, 2023
f89d25b
Test aws_list_etags()
wlandau-lilly Nov 9, 2023
38d6e3c
Add gcp_gcs_list_md5s()
wlandau-lilly Nov 9, 2023
de970fe
Switch to ETags for AWS S3 targets
wlandau-lilly Nov 10, 2023
ab200be
news
wlandau-lilly Nov 10, 2023
e889619
Use MD5s for gcp targets
wlandau-lilly Nov 10, 2023
63cd359
More cloud resource fields
wlandau-lilly Nov 10, 2023
f5d378f
merge
wlandau-lilly Nov 10, 2023
8bc4851
news
wlandau-lilly Nov 10, 2023
cdda2a8
store_aws_version_use()
wlandau-lilly Nov 10, 2023
f18a297
store_gcp_version_use()
wlandau-lilly Nov 10, 2023
c20b4b5
Align on cloud versioning policy
wlandau-lilly Nov 10, 2023
d32d927
inventory class
wlandau-lilly Nov 10, 2023
cfee2e3
cache more nuanced inventory
wlandau-lilly Nov 10, 2023
fce1c78
reorder methods
wlandau-lilly Nov 10, 2023
10b8f29
rename a method
wlandau-lilly Nov 10, 2023
d5fe537
Update abstract inventory class test
wlandau-lilly Nov 10, 2023
d6d532a
Sketch AWS inventory class
wlandau-lilly Nov 10, 2023
0749a82
add aws inventory class (sketch)
wlandau-lilly Nov 10, 2023
f95eb3f
Only get version once
wlandau-lilly Nov 10, 2023
f3d0e91
test aws inventory class
wlandau-lilly Nov 10, 2023
0e0ad09
coverage
wlandau-lilly Nov 10, 2023
6bc7bf1
Fix tests
wlandau-lilly Nov 10, 2023
f1a2acd
revert version setting
wlandau-lilly Nov 13, 2023
1ba46b1
revert some tests
wlandau-lilly Nov 13, 2023
297a95e
migrate tests
wlandau-lilly Nov 13, 2023
8f0691b
redesign abstract inventory
wlandau-lilly Nov 13, 2023
7923ea8
redesign aws inventory
wlandau-lilly Nov 13, 2023
84266b8
Rewrite and test AWS inventory
wlandau-lilly Nov 13, 2023
b05392b
gcp inventory
wlandau-lilly Nov 13, 2023
a358a68
gcp inventory
wlandau-lilly Nov 13, 2023
85f84f4
Bring back tar_unversion()
wlandau-lilly Nov 13, 2023
bc1d444
merge
wlandau-lilly Nov 13, 2023
8882518
news and docs
wlandau-lilly Nov 13, 2023
e28a40f
test inventories list in tar_runtime
wlandau-lilly Nov 13, 2023
12e6f95
Clean up callr utils and populate inventory list
wlandau-lilly Nov 13, 2023
6a655f1
Un-export hidden functions
wlandau-lilly Nov 13, 2023
8b977a3
paws.common
wlandau-lilly Nov 13, 2023
5c84cbb
docs
wlandau-lilly Nov 13, 2023
5253243
Avoid local file hashes in cloud targets
wlandau-lilly Nov 13, 2023
8827ba7
Avoid duplicate hashes for cloud files
wlandau-lilly Nov 13, 2023
b32fd8a
paws.common
wlandau-lilly Nov 13, 2023
87fd352
Download conditional on prefix
wlandau-lilly Nov 13, 2023
3622688
Same with gcp inventories
wlandau-lilly Nov 13, 2023
370aca6
Use inventories in AWS targets
wlandau-lilly Nov 13, 2023
9db9e10
Update test
wlandau-lilly Nov 13, 2023
5604ac8
fix tests
wlandau-lilly Nov 13, 2023
21f5730
Fix #1172
wlandau-lilly Nov 13, 2023
eea7a33
lint
wlandau-lilly Nov 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally
The methodology in this package
borrows from GNU 'Make' (2015, ISBN:978-9881443519)
and 'drake' (2018, <doi:10.21105/joss.00550>).
Version: 1.3.2.9002
Version: 1.3.2.9003
License: MIT + file LICENSE
URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets
BugReports: https://github.com/ropensci/targets/issues
Expand Down Expand Up @@ -92,6 +92,7 @@ Suggests:
nanonext (>= 0.9.0),
rmarkdown (>= 2.4),
parallelly (>= 1.35.0),
paws.common (>= 0.5.4),
paws.storage (>= 0.2.0),
pingr (>= 2.0.1),
pkgload (>= 1.1.0),
Expand Down
13 changes: 5 additions & 8 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ S3method(hash_object,character)
S3method(hash_object,default)
S3method(imports_init,default)
S3method(imports_init,tar_imports)
S3method(pipeline_from_list,default)
S3method(pipeline_from_list,tar_pipeline)
S3method(print,tar_cue)
S3method(print,tar_pattern)
S3method(print,tar_pipeline)
Expand Down Expand Up @@ -61,6 +63,7 @@ S3method(store_assert_repository_setting,aws)
S3method(store_assert_repository_setting,default)
S3method(store_assert_repository_setting,gcp)
S3method(store_assert_repository_setting,local)
S3method(store_cache_path,default)
S3method(store_cache_path,tar_cloud)
S3method(store_cache_path,tar_external)
S3method(store_class_format,feather)
Expand Down Expand Up @@ -95,9 +98,7 @@ S3method(store_delete_objects,default)
S3method(store_delete_objects,tar_aws)
S3method(store_delete_objects,tar_gcp)
S3method(store_ensure_correct_hash,default)
S3method(store_ensure_correct_hash,tar_aws)
S3method(store_ensure_correct_hash,tar_cloud)
S3method(store_ensure_correct_hash,tar_gcp)
S3method(store_ensure_correct_hash,tar_store_file)
S3method(store_ensure_correct_hash,tar_url)
S3method(store_exist_object,default)
Expand Down Expand Up @@ -125,7 +126,9 @@ S3method(store_hash_early,tar_gcp_file)
S3method(store_hash_early,tar_store_file)
S3method(store_hash_early,tar_url)
S3method(store_hash_late,default)
S3method(store_hash_late,tar_aws_file)
S3method(store_hash_late,tar_cloud)
S3method(store_hash_late,tar_gcp_file)
S3method(store_hash_late,tar_store_file)
S3method(store_hash_late,tar_url)
S3method(store_marshal_object,default)
Expand Down Expand Up @@ -208,8 +211,6 @@ S3method(store_write_path,tar_store_custom)
S3method(store_write_path,tar_store_file)
S3method(store_write_path,tar_torch)
S3method(store_write_path,tar_url)
S3method(tar_as_pipeline,default)
S3method(tar_as_pipeline,tar_pipeline)
S3method(tar_make_interactive_load_target,tar_bud)
S3method(tar_make_interactive_load_target,tar_target)
S3method(target_bootstrap,tar_builder)
Expand Down Expand Up @@ -316,7 +317,6 @@ export(rstudio_addin_tar_target)
export(rstudio_addin_tar_visnetwork)
export(starts_with)
export(tar_active)
export(tar_as_pipeline)
export(tar_assert_chr)
export(tar_assert_dbl)
export(tar_assert_df)
Expand Down Expand Up @@ -437,16 +437,13 @@ export(tar_option_reset)
export(tar_option_set)
export(tar_outdated)
export(tar_path)
export(tar_path_objects_dir)
export(tar_path_objects_dir_cloud)
export(tar_path_script)
export(tar_path_script_support)
export(tar_path_store)
export(tar_path_target)
export(tar_pattern)
export(tar_pid)
export(tar_pipeline)
export(tar_pipeline_validate_lite)
export(tar_poll)
export(tar_print)
export(tar_process)
Expand Down
18 changes: 14 additions & 4 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
# targets 1.3.2.9002 (development)
# targets 1.3.2.9003 (development)

## Invalidating changes

Because of the changes below, upgrading to this version of `targets` will unavoidably invalidate previously built targets in existing pipelines. Your pipeline code should still work, but any targets you ran before will most likely need to rerun after the upgrade.

* Use SHA512 during the creation of target-specific pseudo-random number generator seeds (#1139). This change decreases the risk of overlapping/correlated random number generator streams. See the "RNG overlap" section of the `tar_seed_create()` help file for details and justification. Unfortunately, this change will invalidate all currently built targets because the seeds will be different. To avoid rerunning your whole pipeline, set `cue = tar_cue(seed = FALSE)` in `tar_target()`.
* For cloud storage: instead of the hash of the local file, use the ETag for AWS S3 targets and the MD5 hash for GCP GCS targets (#1172). Sanitize with `targets:::digest_chr64()` in both cases before storing the result in the metadata.
* For a cloud target to be truly up to date, the hash in the metadata now needs to match the *current* object in the bucket, not the version recorded in the metadata (#1172). In other words, `targets` now tries to ensure that the up-to-date data objects in the cloud are in their newest versions. So if you roll back the metadata to an older version, you will still be able to access historical data versions with e.g. `tar_read()`, but the pipeline will no longer be up to date.

## Other changes
## Other changes to seeds

* Add a new exported function `tar_seed_create()` which creates target-specific pseudo-random number generator seeds.
* Add an "RNG overlap" section in the `tar_seed_create()` help file to justify and defend how `targets` and `tarchetypes` approach pseudo-random numbers.
* Add function `tar_seed_set()` which sets a seed and sets all the RNG algorithms to their defaults in the R installation of the user. Each target now uses `tar_seed_set()` function to set its seed before running its R command (#1139).
* Deprecate `tar_seed()` in favor of the new `tar_seed_get()` function.
* Migrate to the changes in `clustermq` 0.9.0 (@mschubert).
* Add a new `tar_unversion()` function to remove version IDs from the metadata of cloud targets. This makes it easier to remove all versions of target data using functions `tar_destroy()` and `tar_delete()`.

## Other cloud storage improvements

* For all cloud targets, check hashes in batched LIST requests instead of individual HEAD requests (#1172). Dramatically speeds up the process of checking if cloud targets are up to date.
* For AWS S3 targets, `tar_delete()`, `tar_destroy()`, and `tar_prune()` now use efficient batched calls to `delete_objects()` instead of costly individual calls to `delete_object()` (#1171).
* Add a new `verbose` argument to `tar_delete()`, `tar_destroy()`, and `tar_prune()`.
* Add a new `batch_size` argument to `tar_delete()`, `tar_destroy()`, and `tar_prune()`.
* Add new arguments `page_size` and `verbose` to `tar_resources_aws()` (#1172).
* Add a new `tar_unversion()` function to remove version IDs from the metadata of cloud targets. This makes it easier to interact with just the current version of each target, as opposed to the version ID recorded in the local metadata.

## Other changes

* Migrate to the changes in `clustermq` 0.9.0 (@mschubert).

# targets 1.3.2

Expand Down
31 changes: 9 additions & 22 deletions R/class_aws.R
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ store_delete_objects.tar_aws <- function(store, meta, batch_size, verbose) {
endpoint <- store_aws_endpoint(example_path)
objects <- map(
subset$path,
~list(Key = store_aws_key(.x), VersionId = store_aws_version(.x))
~list(
Key = store_aws_key(.x),
VersionId = store_aws_version(.x)
)
)
message <- paste(
"could not delete one or more objects from AWS bucket %s.",
Expand Down Expand Up @@ -224,7 +227,6 @@ store_upload_object_aws <- function(store) {
bucket = bucket,
region = store_aws_region(store$file$path),
endpoint = store_aws_endpoint(store$file$path),
metadata = list("targets-hash" = store$file$hash),
part_size = aws$part_size,
args = aws$args,
max_tries = aws$max_tries,
Expand All @@ -247,39 +249,24 @@ store_upload_object_aws <- function(store) {
invert = TRUE
)
store$file$path <- c(path, paste0("version=", head$VersionId))
store$file$hash <- digest_chr64(head$ETag)
invisible()
}

#' @export
store_ensure_correct_hash.tar_aws <- function(store, storage, deployment) {
  # Deliberate no-op: AWS targets no longer block on verifying a locally
  # computed hash after upload. NOTE(review): hash agreement appears to be
  # checked lazily through store_has_correct_hash() against the cloud
  # inventory instead — confirm against the dispatching caller.
}

#' @export
store_has_correct_hash.tar_aws <- function(store) {
  # A target is current only when a hash for the object can be retrieved
  # from the bucket inventory AND it matches the hash in local metadata.
  remote <- store_aws_hash(store)
  if (is.null(remote)) {
    return(FALSE)
  }
  identical(remote, store$file$hash)
}

store_aws_hash <- function(store) {
path <- store$file$path
aws <- store$resources$aws
head <- aws_s3_head(
key = store_aws_key(path),
bucket = store_aws_bucket(path),
region = store_aws_region(path),
endpoint = store_aws_endpoint(path),
version = store_aws_version(path),
args = aws$args,
max_tries = aws$max_tries,
seconds_timeout = aws$seconds_timeout,
close_connection = aws$close_connection,
s3_force_path_style = aws$s3_force_path_style
)
head$Metadata[["targets-hash"]]
tar_runtime$inventories$aws <- tar_runtime$inventories$aws %|||%
inventory_aws_init()
tar_runtime$inventories$aws$get_cache(store = store)
}
# nocov end

#' @export
store_get_packages.tar_aws <- function(store) {
c("paws.storage", NextMethod())
c("paws.common", "paws.storage", NextMethod())
}
6 changes: 5 additions & 1 deletion R/class_aws_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ store_hash_early.tar_aws_file <- function(store) { # nolint
store$file$path <- store_aws_file_stage(store$file$path)
on.exit(store$file$path <- old)
tar_assert_path(store$file$path)
file_update_hash(store$file)
file_update_info(store$file)
}

#' @export
store_hash_late.tar_aws_file <- function(store) { # nolint
  # Intentional no-op: file metadata for AWS file targets is recorded in
  # store_hash_early.tar_aws_file() via file_update_info(), so there is
  # nothing left to record at the late stage.
}

#' @export
Expand Down
4 changes: 1 addition & 3 deletions R/class_cloud.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ store_cache_path.tar_cloud <- function(store, path) {
store_hash_late.tar_cloud <- function(store) {
tar_assert_path(store$file$stage)
file <- file_init(path = store$file$stage)
file_update_hash(file)
store$file$hash <- file$hash
file_update_info(file)
store$file$bytes <- file$bytes
store$file$time <- file$time
}

#' @export
store_ensure_correct_hash.tar_cloud <- function(store, storage, deployment) {
store_wait_correct_hash(store)
}

#' @export
Expand Down
9 changes: 9 additions & 0 deletions R/class_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ file_exists_stage <- function(file) {
all(file.exists(file$stage))
}

file_update_info <- function(file) {
  # Refresh the time, bytes, and size fields of a file object in place
  # from the files currently on disk. Unlike file_update_hash(), this
  # does not compute a hash of the file contents.
  info <- file_info(file_list_files(file$path))
  file$time <- file_time(info)
  file$bytes <- file_bytes(info)
  file$size <- file_size(file$bytes)
  invisible()
}

file_update_hash <- function(file) {
files <- file_list_files(file$path)
info <- file_info(files)
Expand Down
18 changes: 4 additions & 14 deletions R/class_gcp.R
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ store_upload_object_gcp <- function(store) {
file = store$file$stage,
key = key,
bucket = bucket,
metadata = list("targets-hash" = store$file$hash),
predefined_acl = store$resources$gcp$predefined_acl %|||% "private",
verbose = store$resources$gcp$verbose %|||% FALSE,
max_tries = store$resources$gcp$max_tries %|||% 5L
Expand All @@ -203,29 +202,20 @@ store_upload_object_gcp <- function(store) {
invert = TRUE
)
store$file$path <- c(path, paste0("version=", head$generation))
store$file$hash <- digest_chr64(head$md5)
invisible()
}

#' @export
store_ensure_correct_hash.tar_gcp <- function(store, storage, deployment) {
  # Deliberate no-op: GCP targets no longer block on verifying a locally
  # computed hash after upload. NOTE(review): hash agreement appears to be
  # checked lazily through store_has_correct_hash() against the cloud
  # inventory instead — confirm against the dispatching caller.
}

#' @export
store_has_correct_hash.tar_gcp <- function(store) {
  # A target is current only when a hash for the object can be retrieved
  # from the bucket inventory AND it matches the hash in local metadata.
  remote <- store_gcp_hash(store = store)
  if (is.null(remote)) {
    return(FALSE)
  }
  identical(remote, store$file$hash)
}

store_gcp_hash <- function(store) {
path <- store$file$path
head <- gcp_gcs_head(
key = store_gcp_key(path),
bucket = store_gcp_bucket(path),
version = store_gcp_version(path),
verbose = store$resources$gcp$verbose %|||% FALSE,
max_tries = store$resources$gcp$max_tries %|||% 5L
)
head$metadata[["targets-hash"]]
tar_runtime$inventories$gcp <- tar_runtime$inventories$gcp %|||%
inventory_gcp_init()
tar_runtime$inventories$gcp$get_cache(store = store)
}
# nocov end

Expand Down
6 changes: 5 additions & 1 deletion R/class_gcp_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ store_hash_early.tar_gcp_file <- function(store) { # nolint
store$file$path <- store_gcp_file_stage(store$file$path)
on.exit(store$file$path <- old)
tar_assert_path(store$file$path)
file_update_hash(store$file)
file_update_info(store$file)
}

#' @export
store_hash_late.tar_gcp_file <- function(store) { # nolint
  # Intentional no-op: file metadata for GCP file targets is recorded in
  # store_hash_early.tar_gcp_file() via file_update_info(), so there is
  # nothing left to record at the late stage.
}

#' @export
Expand Down
69 changes: 69 additions & 0 deletions R/class_inventory.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# This is an abstract inventory. The definitions of
# get_key(), get_bucket(), and set_cache() below
# are abstract and for testing only. Only the subclasses
# have serious versions of these methods.
inventory_init <- function() {
  # Construct an inventory and put its cache, prefix counter, and
  # bookkeeping counts into a clean initial state before first use.
  inventory <- inventory_new()
  inventory$reset()
  inventory
}

inventory_new <- function() {
  # Bare constructor: fields stay NULL until $reset() allocates them,
  # so most callers should use inventory_init() instead.
  inventory_class$new()
}

inventory_class <- R6::R6Class(
  classname = "tar_inventory",
  class = FALSE,
  portable = FALSE,
  cloneable = FALSE,
  public = list(
    # Environment mapping "bucket|key" names to cached hash values.
    cache = NULL,
    # Counter of key prefixes whose objects were already listed.
    prefixes = NULL,
    # Count of get_cache() calls that found no cached entry.
    misses = NULL,
    # Count of get_cache() calls that triggered a listing download.
    downloads = NULL,
    # Abstract stub for testing only: subclasses return the real key.
    get_key = function(store) {
      "example_key"
    },
    # Abstract stub for testing only: subclasses return the real bucket.
    get_bucket = function(store) {
      "example_bucket"
    },
    # Canonical cache entry name for a bucket/key pair.
    get_name = function(key, bucket) {
      paste(bucket, key, sep = "|")
    },
    # Look up the cached hash for a store, downloading the listing for
    # the store's key prefix the first time that prefix is requested.
    get_cache = function(store) {
      key <- self$get_key(store)
      bucket <- self$get_bucket(store)
      entry <- self$get_name(key = key, bucket = bucket)
      prefix <- dirname(key)
      # Record the miss before set_cache() can populate the entry.
      is_miss <- !exists(x = entry, envir = self$cache)
      needs_download <- !counter_exists_name(
        counter = self$prefixes,
        name = prefix
      )
      if (needs_download) {
        counter_set_name(counter = self$prefixes, name = prefix)
        self$set_cache(store)
      }
      self$misses <- self$misses + as.integer(is_miss)
      self$downloads <- self$downloads + as.integer(needs_download)
      self$cache[[entry]]
    },
    # Names of all entries currently in the cache.
    list_cache = function() {
      names(self$cache)
    },
    # Abstract stub for testing only: subclasses fetch real hashes.
    set_cache = function(store) {
      entry <- self$get_name(
        key = self$get_key(store),
        bucket = self$get_bucket(store)
      )
      self$cache[[entry]] <- "example_hash"
    },
    # Restore the pristine state: empty cache, empty prefix counter,
    # and zeroed miss/download tallies.
    reset = function() {
      self$cache <- new.env(parent = emptyenv())
      self$prefixes <- counter_init()
      self$misses <- 0L
      self$downloads <- 0L
    },
    validate = function() {
      tar_assert_envir(self$cache)
      invisible()
    }
  )
)
Loading
Loading