From 12c4f7c7510d68dd2b70451befa6ff91eed0ea3c Mon Sep 17 00:00:00 2001 From: wlandau Date: Thu, 7 Nov 2024 14:41:28 -0500 Subject: [PATCH] sketch avoiding cloning the whole store --- NAMESPACE | 1 + R/class_aws.R | 46 +++++++------- R/class_aws_file.R | 22 +++---- R/class_branch.R | 6 +- R/class_builder.R | 42 ++++++++---- R/class_cloud.R | 23 ++++--- R/class_cue.R | 8 +-- R/class_external.R | 4 +- R/class_gcp.R | 46 +++++++------- R/class_gcp_file.R | 22 +++---- R/class_inventory.R | 18 +++--- R/class_inventory_aws.R | 16 ++--- R/class_inventory_gcp.R | 16 ++--- R/class_pattern.R | 6 +- R/class_record.R | 10 ++- R/class_stem.R | 2 +- R/class_store.R | 99 ++++++++++++++++------------- R/class_store_file.R | 25 +++++--- R/class_store_repository_cas.R | 34 +++++----- R/class_store_repository_cas_file.R | 22 +++---- R/class_target.R | 4 +- R/class_url.R | 17 ++--- R/class_verbose.R | 2 +- R/tar_exist_objects.R | 3 +- R/tar_read_raw.R | 4 +- 25 files changed, 277 insertions(+), 221 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index 053eaa4e1..a10f4f714 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -207,6 +207,7 @@ S3method(store_update_stage_early,default) S3method(store_update_stage_early,tar_store_file) S3method(store_update_stage_late,default) S3method(store_update_stage_late,tar_store_file) +S3method(store_upload_object,default) S3method(store_upload_object,tar_aws) S3method(store_upload_object,tar_aws_file) S3method(store_upload_object,tar_gcp) diff --git a/R/class_aws.R b/R/class_aws.R index 1439523b5..daf606cda 100644 --- a/R/class_aws.R +++ b/R/class_aws.R @@ -124,8 +124,8 @@ store_aws_split_colon <- function(path) { # external contributors from the open source community. # nocov start #' @export -store_read_object.tar_aws <- function(store) { - path <- store$file$path +store_read_object.tar_aws <- function(store, file) { + path <- file$path key <- store_aws_key(path) bucket <- store_aws_bucket(path) scratch <- path_scratch_temp_network(pattern = basename(store_aws_key(path))) @@ -149,8 +149,8 @@ store_read_object.tar_aws <- function(store) { } #' @export -store_exist_object.tar_aws <- function(store, name = NULL) { - path <- store$file$path +store_exist_object.tar_aws <- function(store, file, name = NULL) { + path <- file$path aws <- store$resources$aws head <- aws_s3_exists( key = store_aws_key(path), @@ -219,23 +219,23 @@ store_delete_objects.tar_aws <- function(store, meta, batch_size, verbose) { } #' @export -store_upload_object.tar_aws <- function(store) { - on.exit(unlink(store$file$stage, recursive = TRUE, force = TRUE)) - store_upload_object_aws(store) +store_upload_object.tar_aws <- function(store, file) { + on.exit(unlink(file$stage, recursive = TRUE, force = TRUE)) + store_upload_object_aws(store, file) } -store_upload_object_aws <- function(store) { - key <- store_aws_key(store$file$path) - bucket <- store_aws_bucket(store$file$path) +store_upload_object_aws <- function(store, file) { + key <- store_aws_key(file$path) + bucket <- store_aws_bucket(file$path) aws <- store$resources$aws head <- if_any( - file_exists_stage(store$file), + file_exists_stage(file), aws_s3_upload( - file = store$file$stage, + file = file$stage, key = key, bucket = bucket, - region = store_aws_region(store$file$path), - endpoint = store_aws_endpoint(store$file$path), + region = store_aws_region(file$path), + endpoint = store_aws_endpoint(file$path), part_size = aws$part_size, args = aws$args, max_tries = aws$max_tries, @@ -245,7 +245,7 @@ store_upload_object_aws <- function(store) { ), tar_throw_file( "Cannot upload non-existent AWS staging file ", - store$file$stage, + file$stage, " to key ", key, ". The target probably encountered an error." @@ -253,25 +253,25 @@ store_upload_object_aws <- function(store) { ) path <- grep( pattern = "^version=", - x = store$file$path, + x = file$path, value = TRUE, invert = TRUE ) - store$file$path <- c(path, paste0("version=", head$VersionId)) - store$file$hash <- hash_object(head$ETag) + file$path <- c(path, paste0("version=", head$VersionId)) + file$hash <- hash_object(head$ETag) invisible() } #' @export -store_has_correct_hash.tar_aws <- function(store) { - hash <- store_aws_hash(store) - !is.null(hash) && identical(hash, store$file$hash) +store_has_correct_hash.tar_aws <- function(store, file) { + hash <- store_aws_hash(store, file) + !is.null(hash) && identical(hash, file$hash) } -store_aws_hash <- function(store) { +store_aws_hash <- function(store, file) { tar_runtime$inventories$aws <- tar_runtime$inventories$aws %|||% inventory_aws_init() - tar_runtime$inventories$aws$get_cache(store = store) + tar_runtime$inventories$aws$get_cache(store = store, file = file) } # nocov end diff --git a/R/class_aws_file.R b/R/class_aws_file.R index 07a540127..9cd85cc22 100644 --- a/R/class_aws_file.R +++ b/R/class_aws_file.R @@ -39,26 +39,26 @@ store_assert_format_setting.aws_file <- function(format) { } #' @export -store_upload_object.tar_aws_file <- function(store) { - store_upload_object_aws(store) +store_upload_object.tar_aws_file <- function(store, file) { + store_upload_object_aws(store, file) } #' @export -store_hash_early.tar_aws_file <- function(store) { # nolint - old <- store$file$path - store$file$path <- store_aws_file_stage(store$file$path) - on.exit(store$file$path <- old) - tar_assert_path(store$file$path) - file_update_info(store$file) +store_hash_early.tar_aws_file <- function(store, file) { # nolint + old <- file$path + file$path <- store_aws_file_stage(file$path) + on.exit(file$path <- old) + tar_assert_path(file$path) + file_update_info(file) } #' @export -store_hash_late.tar_aws_file <- function(store) { # nolint +store_hash_late.tar_aws_file <- function(store, file) { # nolint } #' @export -store_read_object.tar_aws_file <- function(store) { - path <- store$file$path +store_read_object.tar_aws_file <- function(store, file) { + path <- file$path key <- store_aws_key(path) bucket <- store_aws_bucket(path) scratch <- path_scratch_temp_network(pattern = basename(store_aws_key(path))) diff --git a/R/class_branch.R b/R/class_branch.R index f6911ff30..1b9c3a3a2 100644 --- a/R/class_branch.R +++ b/R/class_branch.R @@ -4,10 +4,10 @@ branch_init <- function( deps_parent = character(0L), deps_child = character(0L), settings = NULL, - cue = NULL + cue = NULL, + store = NULL ) { deps <- setdiff(unique(c(deps_parent, deps_child)), settings$dimensions) - store <- settings_produce_store(settings) branch_new( name = name, command = command, @@ -51,7 +51,7 @@ target_get_type.tar_branch <- function(target) { #' @export target_produce_record.tar_branch <- function(target, pipeline, meta) { - file <- target$store$file + file <- target$file record_init( name = target_get_name(target), parent = target_get_parent(target), diff --git a/R/class_builder.R b/R/class_builder.R index 5e6adface..3b186373b 100644 --- a/R/class_builder.R +++ b/R/class_builder.R @@ -41,6 +41,7 @@ target_bootstrap.tar_builder <- function( ) { record <- target_bootstrap_record(target, meta) target$store <- record_bootstrap_store(record) + target$file <- record_bootstrap_file(record) invisible() } @@ -48,7 +49,7 @@ target_bootstrap.tar_builder <- function( target_read_value.tar_builder <- function(target, pipeline = NULL) { command <- target$command load_packages(packages = command$packages, library = command$library) - object <- store_read_object(target$store) + object <- store_read_object(target$store, target$file) iteration <- target$settings$iteration value_init(object, iteration) } @@ -140,7 +141,12 @@ target_run.tar_builder <- function(target, envir, path_store) { builder_ensure_deps(target, target$subpipeline, "worker") frames <- frames_produce(envir, target, target$subpipeline) builder_set_tar_runtime(target, frames) - store_update_stage_early(target$store, target_get_name(target), path_store) + store_update_stage_early( + store = target$store, + file = target$file, + name = target_get_name(target), + path_store = path_store + ) builder_update_build(target, frames_get_envir(frames)) builder_ensure_paths(target, path_store) builder_ensure_object(target, "worker") @@ -184,7 +190,7 @@ target_skip.tar_builder <- function( active ) { target_update_queue(target, scheduler) - file_repopulate(target$store$file, meta$get_record(target_get_name(target))) + file_repopulate(target$file, meta$get_record(target_get_name(target))) if (active) { builder_ensure_workspace( target = target, @@ -230,7 +236,7 @@ target_conclude.tar_builder <- function(target, pipeline, scheduler, meta) { } builder_completed <- function(target, pipeline, scheduler, meta) { - store_cache_path(target$store, target$store$file$path) + store_cache_path(target$store, target$file$path) target_ensure_buds(target, pipeline, scheduler) meta$insert_record(target_produce_record(target, pipeline, meta)) target_patternview_meta(target, pipeline, meta) @@ -459,9 +465,21 @@ builder_ensure_paths <- function(target, path_store) { builder_update_paths <- function(target, path_store) { name <- target_get_name(target) - store_update_path(target$store, name, target$value$object, path_store) - store_update_stage_late(target$store, name, target$value$object, path_store) - store_hash_early(target$store) + store_update_path( + store = target$store, + file = target$file, + name = name, + object = target$value$object, + path_store = path_store + ) + store_update_stage_late( + store = target$store, + file = target$file, + name = name, + object = target$value$object, + path_store = path_store + ) + store_hash_early(target$store, target$file) } builder_unload_value <- function(target) { @@ -475,10 +493,10 @@ builder_unload_value <- function(target) { builder_update_object <- function(target) { on.exit(builder_unload_value(target)) - file_validate_path(target$store$file$path) + file_validate_path(target$file$path) if (!identical(target$settings$storage, "none")) { withCallingHandlers( - store_write_object(target$store, target$value$object), + store_write_object(target$store, target$file, target$value$object), warning = function(condition) { if (length(target$metrics$warnings) < 51L) { target$metrics$warnings <- paste( @@ -491,8 +509,8 @@ builder_update_object <- function(target) { } ) } - store_hash_late(target$store) - store_upload_object(target$store) + store_hash_late(target$store, target$file) + store_upload_object(target$store, target$file) } builder_expect_storage <- function(target) { @@ -532,7 +550,7 @@ builder_ensure_correct_hash <- function(target) { builder_wait_correct_hash <- function(target) { storage <- target$settings$storage deployment <- target$settings$deployment - store_ensure_correct_hash(target$store, storage, deployment) + store_ensure_correct_hash(target$store, target$file, storage, deployment) } builder_set_tar_runtime <- function(target, frames) { diff --git a/R/class_cloud.R b/R/class_cloud.R index 014b0b0d1..40e5f49d1 100644 --- a/R/class_cloud.R +++ b/R/class_cloud.R @@ -5,7 +5,7 @@ # nocov start #' @export store_tar_path.tar_cloud <- function(store, target, path_store) { - store$file$stage + target$file$stage } #' @export @@ -14,8 +14,8 @@ store_produce_stage.tar_cloud <- function(store, name, object, path_store) { } #' @export -store_write_object.tar_cloud <- function(store, object) { - stage <- store$file$stage +store_write_object.tar_cloud <- function(store, file, object) { + stage <- file$stage dir_create(dirname(stage)) store_write_path(store, store_convert_object(store, object), stage) } @@ -25,16 +25,21 @@ store_cache_path.tar_cloud <- function(store, path) { } #' @export -store_hash_late.tar_cloud <- function(store) { - tar_assert_path(store$file$stage) - file <- file_init(path = store$file$stage) +store_hash_late.tar_cloud <- function(store, file) { + tar_assert_path(file$stage) + file <- file_init(path = file$stage) file_update_info(file) - store$file$bytes <- file$bytes - store$file$time <- file$time + file$bytes <- file$bytes + file$time <- file$time } #' @export -store_ensure_correct_hash.tar_cloud <- function(store, storage, deployment) { +store_ensure_correct_hash.tar_cloud <- function( + store, + file, + storage, + deployment +) { } #' @export diff --git a/R/class_cue.R b/R/class_cue.R index 85f900f15..0d8067f38 100644 --- a/R/class_cue.R +++ b/R/class_cue.R @@ -123,7 +123,7 @@ cue_file <- function(cue, target, meta, record) { if (!cue$file) { return(FALSE) } - file_current <- target$store$file + file_current <- target$file file_recorded <- file_new( path = record$path, hash = record$data, @@ -131,9 +131,9 @@ cue_file <- function(cue, target, meta, record) { size = record$size, bytes = record$bytes ) - on.exit(target$store$file <- file_current) - target$store$file <- file_recorded - !store_has_correct_hash(target$store) + on.exit(target$file <- file_current) + target$file <- file_recorded + !store_has_correct_hash(target$store, target$file) } cue_seed <- function(cue, target, meta, record) { diff --git a/R/class_external.R b/R/class_external.R index 18819def0..666687ddf 100644 --- a/R/class_external.R +++ b/R/class_external.R @@ -1,6 +1,6 @@ #' @export -store_row_path.tar_external <- function(store) { - store$file$path +store_row_path.tar_external <- function(store, file) { + file$path } #' @export diff --git a/R/class_gcp.R b/R/class_gcp.R index 6bddb794d..553f743d5 100644 --- a/R/class_gcp.R +++ b/R/class_gcp.R @@ -68,8 +68,8 @@ store_gcp_path_field <- function(path, pattern) { # external contributors from the open source community. # nocov start #' @export -store_read_object.tar_gcp <- function(store) { - path <- store$file$path +store_read_object.tar_gcp <- function(store, file) { + path <- file$path key <- store_gcp_key(path) bucket <- store_gcp_bucket(path) scratch <- path_scratch_temp_network(pattern = basename(store_gcp_key(path))) @@ -87,8 +87,8 @@ store_read_object.tar_gcp <- function(store) { } #' @export -store_exist_object.tar_gcp <- function(store, name = NULL) { - path <- store$file$path +store_exist_object.tar_gcp <- function(store, file, name = NULL) { + path <- file$path gcp_gcs_exists( key = store_gcp_key(path), bucket = store_gcp_bucket(path), @@ -99,8 +99,8 @@ store_exist_object.tar_gcp <- function(store, name = NULL) { } #' @export -store_delete_object.tar_gcp <- function(store, name = NULL) { - path <- store$file$path +store_delete_object.tar_gcp <- function(store, file, name = NULL) { + path <- file$path key <- store_gcp_key(path) bucket <- store_gcp_bucket(path) version <- store_gcp_version(path) @@ -176,18 +176,18 @@ store_delete_objects.tar_gcp <- function(store, meta, batch_size, verbose) { } #' @export -store_upload_object.tar_gcp <- function(store) { - on.exit(unlink(store$file$stage, recursive = TRUE, force = TRUE)) - store_upload_object_gcp(store) +store_upload_object.tar_gcp <- function(store, file) { + on.exit(unlink(file$stage, recursive = TRUE, force = TRUE)) + store_upload_object_gcp(store, file) } -store_upload_object_gcp <- function(store) { - key <- store_gcp_key(store$file$path) - bucket <- store_gcp_bucket(store$file$path) +store_upload_object_gcp <- function(store, file) { + key <- store_gcp_key(file$path) + bucket <- store_gcp_bucket(file$path) head <- if_any( - file_exists_stage(store$file), + file_exists_stage(file), gcp_gcs_upload( - file = store$file$stage, + file = file$stage, key = key, bucket = bucket, predefined_acl = store$resources$gcp$predefined_acl %|||% "private", @@ -196,7 +196,7 @@ store_upload_object_gcp <- function(store) { ), tar_throw_file( "Cannot upload non-existent gcp staging file ", - store$file$stage, + file$stage, " to key ", key, ". The target probably encountered an error." @@ -204,25 +204,25 @@ store_upload_object_gcp <- function(store) { ) path <- grep( pattern = "^version=", - x = store$file$path, + x = file$path, value = TRUE, invert = TRUE ) - store$file$path <- c(path, paste0("version=", head$generation)) - store$file$hash <- hash_object(head$md5) + file$path <- c(path, paste0("version=", head$generation)) + file$hash <- hash_object(head$md5) invisible() } #' @export -store_has_correct_hash.tar_gcp <- function(store) { - hash <- store_gcp_hash(store = store) - !is.null(hash) && identical(hash, store$file$hash) +store_has_correct_hash.tar_gcp <- function(store, file) { + hash <- store_gcp_hash(store = store, file = file) + !is.null(hash) && identical(hash, file$hash) } -store_gcp_hash <- function(store) { +store_gcp_hash <- function(store, file) { tar_runtime$inventories$gcp <- tar_runtime$inventories$gcp %|||% inventory_gcp_init() - tar_runtime$inventories$gcp$get_cache(store = store) + tar_runtime$inventories$gcp$get_cache(store = store, file = file) } # nocov end diff --git a/R/class_gcp_file.R b/R/class_gcp_file.R index d320dea1a..20509f51a 100644 --- a/R/class_gcp_file.R +++ b/R/class_gcp_file.R @@ -29,26 +29,26 @@ store_assert_format_setting.gcp_file <- function(format) { } #' @export -store_upload_object.tar_gcp_file <- function(store) { - store_upload_object_gcp(store) +store_upload_object.tar_gcp_file <- function(store, file) { + store_upload_object_gcp(store, file) } #' @export -store_hash_early.tar_gcp_file <- function(store) { # nolint - old <- store$file$path - store$file$path <- store_gcp_file_stage(store$file$path) - on.exit(store$file$path <- old) - tar_assert_path(store$file$path) - file_update_info(store$file) +store_hash_early.tar_gcp_file <- function(store, file) { # nolint + old <- file$path + file$path <- store_gcp_file_stage(file$path) + on.exit(file$path <- old) + tar_assert_path(file$path) + file_update_info(file) } #' @export -store_hash_late.tar_gcp_file <- function(store) { # nolint +store_hash_late.tar_gcp_file <- function(store, file) { # nolint } #' @export -store_read_object.tar_gcp_file <- function(store) { - path <- store$file$path +store_read_object.tar_gcp_file <- function(store, file) { + path <- file$path key <- store_gcp_key(path) bucket <- store_gcp_bucket(path) scratch <- path_scratch_temp_network(pattern = basename(store_gcp_key(path))) diff --git a/R/class_inventory.R b/R/class_inventory.R index f5bd800a7..e4d04ab89 100644 --- a/R/class_inventory.R +++ b/R/class_inventory.R @@ -22,25 +22,25 @@ inventory_class <- R6::R6Class( prefixes = NULL, misses = NULL, downloads = NULL, - get_key = function(store) { + get_key = function(store, file) { "example_key" }, - get_bucket = function(store) { + get_bucket = function(store, file) { "example_bucket" }, get_name = function(key, bucket) { paste(bucket, key, sep = "|") }, - get_cache = function(store) { - key <- self$get_key(store) + get_cache = function(store, file) { + key <- self$get_key(store, file) prefix <- dirname(key) - bucket <- self$get_bucket(store) + bucket <- self$get_bucket(store, file) name <- self$get_name(key = key, bucket = bucket) miss <- !exists(x = name, envir = self$cache) download <- !counter_exists_name(counter = self$prefixes, name = prefix) if (download) { counter_set_name(counter = self$prefixes, name = prefix) - self$set_cache(store) + self$set_cache(store, file) } self$misses <- self$misses + as.integer(miss) self$downloads <- self$downloads + as.integer(download) @@ -49,9 +49,9 @@ inventory_class <- R6::R6Class( list_cache = function() { names(self$cache) }, - set_cache = function(store) { - key <- self$get_key(store) - bucket <- self$get_bucket(store) + set_cache = function(store, file) { + key <- self$get_key(store, file) + bucket <- self$get_bucket(store, sfile) name <- self$get_name(key = key, bucket = bucket) self$cache[[name]] <- "example_hash" }, diff --git a/R/class_inventory_aws.R b/R/class_inventory_aws.R index 646a23238..fd86cbf29 100644 --- a/R/class_inventory_aws.R +++ b/R/class_inventory_aws.R @@ -17,18 +17,18 @@ inventory_aws_class <- R6::R6Class( portable = FALSE, cloneable = FALSE, public = list( - get_key = function(store) { - store_aws_key(store$file$path) + get_key = function(store, file) { + store_aws_key(file$path) }, - get_bucket = function(store) { - store_aws_bucket(store$file$path) + get_bucket = function(store, file) { + store_aws_bucket(file$path) }, - set_cache = function(store) { - path <- store$file$path - bucket <- self$get_bucket(store) + set_cache = function(store, file) { + path <- file$path + bucket <- self$get_bucket(store, file) aws <- store$resources$aws results <- aws_s3_list_etags( - prefix = dirname(self$get_key(store)), + prefix = dirname(self$get_key(store, file)), bucket = bucket, page_size = aws$page_size, verbose = aws$verbose, diff --git a/R/class_inventory_gcp.R b/R/class_inventory_gcp.R index 3d3d72a9a..9f37de6b2 100644 --- a/R/class_inventory_gcp.R +++ b/R/class_inventory_gcp.R @@ -17,18 +17,18 @@ inventory_gcp_class <- R6::R6Class( portable = FALSE, cloneable = FALSE, public = list( - get_key = function(store) { - store_gcp_key(store$file$path) + get_key = function(store, file) { + store_gcp_key(file$path) }, - get_bucket = function(store) { - store_gcp_bucket(store$file$path) + get_bucket = function(store, file) { + store_gcp_bucket(file$path) }, - set_cache = function(store) { - path <- store$file$path - bucket <- self$get_bucket(store) + set_cache = function(store, file) { + path <- file$path + bucket <- self$get_bucket(store, file) gcp <- store$resources$gcp results <- gcp_gcs_list_md5s( - prefix = dirname(self$get_key(store)), + prefix = dirname(self$get_key(store, file)), bucket = bucket, verbose = gcp$verbose, max_tries = gcp$max_tries diff --git a/R/class_pattern.R b/R/class_pattern.R index 7780922d8..39873c04f 100644 --- a/R/class_pattern.R +++ b/R/class_pattern.R @@ -226,10 +226,11 @@ pattern_prepend_branches <- function(target, scheduler) { pattern_set_branches <- function(target, pipeline) { command <- target$command + deps_parent <- target$deps settings <- target$settings cue <- target$cue + store <- target$store specs <- junction_transpose(target$junction) - deps_parent <- target$deps for (spec in specs) { branch <- branch_init( name = .subset2(spec, "split"), @@ -237,7 +238,8 @@ pattern_set_branches <- function(target, pipeline) { deps_parent = deps_parent, deps_child = .subset2(spec, "deps"), settings = settings, - cue = cue + cue = cue, + store = store ) pipeline_set_target(pipeline, branch) } diff --git a/R/class_record.R b/R/class_record.R index 697946c06..e1b99da75 100644 --- a/R/class_record.R +++ b/R/class_record.R @@ -125,8 +125,7 @@ record_row_path <- function(record) { format = record$format, repository = record$repository ) - store$file$path <- record$path - store_row_path(store) + store_row_path(store, file_init(path = record$path)) } record_from_row <- function(row, path_store) { @@ -162,10 +161,15 @@ record_bootstrap_store <- function(record) { repository = record$repository, resources = tar_options$get_resources() ) - file_repopulate(store$file, record) store } +record_bootstrap_file <- function(record) { + file <- file_init() + file_repopulate(file, record) + file +} + record_validate <- function(record) { tar_assert_correct_fields(record, record_new) tar_assert_chr_no_delim(record$name) diff --git a/R/class_stem.R b/R/class_stem.R index f448cf177..53f9b07fc 100644 --- a/R/class_stem.R +++ b/R/class_stem.R @@ -72,7 +72,7 @@ target_produce_junction.tar_stem <- function(target, pipeline) { #' @export target_produce_record.tar_stem <- function(target, pipeline, meta) { - file <- target$store$file + file <- target$file record_init( name = target_get_name(target), type = "stem", diff --git a/R/class_store.R b/R/class_store.R index 15593ec52..6cda704fd 100644 --- a/R/class_store.R +++ b/R/class_store.R @@ -119,27 +119,27 @@ store_assert_repository_setting.default <- function(repository) { store_assert_repository_setting.local <- function(repository) { } -store_read_object <- function(store) { +store_read_object <- function(store, file) { UseMethod("store_read_object") } #' @export -store_read_object.default <- function(store) { - store_convert_object(store, store_read_path(store, store$file$path)) +store_read_object.default <- function(store, file) { + store_convert_object(store, store_read_path(store, file$path)) } store_read_path <- function(store, path) { UseMethod("store_read_path") } -store_write_object <- function(store, object) { +store_write_object <- function(store, file, object) { UseMethod("store_write_object") } #' @export -store_write_object.default <- function(store, object) { - path <- store$file$path - stage <- store$file$stage +store_write_object.default <- function(store, file, object) { + path <- file$path + stage <- file$stage dir_create_runtime(dirname(path)) dir_create_runtime(dirname(stage)) store_write_path(store, store_convert_object(store, object), stage) @@ -162,23 +162,23 @@ store_cache_path.default <- function(store, path) { } } -store_exist_object <- function(store, name = NULL) { +store_exist_object <- function(store, file, name = NULL) { UseMethod("store_exist_object") } #' @export -store_exist_object.default <- function(store, name = NULL) { - all(file.exists(store$file$path)) +store_exist_object.default <- function(store, file, name = NULL) { + all(file.exists(file$path)) } -store_delete_object <- function(store, name = NULL) { +store_delete_object <- function(store, file, name = NULL) { UseMethod("store_delete_object") } #' @export -store_delete_object.default <- function(store, name = NULL) { - unlink(store$file$path) - unlink(store$file$stage) +store_delete_object.default <- function(store, file, name = NULL) { + unlink(file$path) + unlink(file$stage) } store_delete_objects <- function(store, meta, batch_size, verbose) { @@ -192,15 +192,16 @@ store_delete_objects.default <- function(store, meta, batch_size, verbose) { ) } -store_upload_object <- function(store) { +store_upload_object <- function(store, file) { UseMethod("store_upload_object") } -store_upload_object.default <- function(store) { +#' @export +store_upload_object.default <- function(store, file) { } -store_update_path <- function(store, name, object, path_store) { - store$file$path <- store_produce_path(store, name, object, path_store) +store_update_path <- function(store, file, name, object, path_store) { + file$path <- store_produce_path(store, name, object, path_store) } store_produce_path <- function(store, name, object, path_store) { @@ -212,12 +213,12 @@ store_produce_path.default <- function(store, name, object, path_store) { path_objects(path_store = path_store, name = name) } -store_row_path <- function(store) { +store_row_path <- function(store, file) { UseMethod("store_row_path") } #' @export -store_row_path.default <- function(store) { +store_row_path.default <- function(store, file) { NA_character_ } @@ -239,13 +240,13 @@ store_tar_path.default <- function(store, target, path_store) { path_objects(path_store = path_store, name = target_get_name(target)) } -store_update_stage_early <- function(store, name, path_store) { +store_update_stage_early <- function(store, file, name, path_store) { UseMethod("store_update_stage_early") } #' @export -store_update_stage_early.default <- function(store, name, path_store) { - store$file$stage <- store_produce_stage( +store_update_stage_early.default <- function(store, file, name, path_store) { + file$stage <- store_produce_stage( store = store, name = name, object = NULL, @@ -253,17 +254,23 @@ store_update_stage_early.default <- function(store, name, path_store) { ) } -store_update_stage_late <- function(store, name, object, path_store) { +store_update_stage_late <- function(store, file, name, object, path_store) { UseMethod("store_update_stage_late") } #' @export -store_update_stage_late.default <- function(store, name, object, path_store) { +store_update_stage_late.default <- function( + store, + file, + name, + object, + path_store +) { } #' @export -store_update_stage_early.default <- function(store, name, path_store) { - store$file$stage <- store_produce_stage( +store_update_stage_early.default <- function(store, file, name, path_store) { + file$stage <- store_produce_stage( store = store, name = name, object = NULL, @@ -304,26 +311,27 @@ store_assert_format <- function(store, object, name) { store_assert_format.default <- function(store, object, name) { } -store_hash_early <- function(store) { +store_hash_early <- function(store, file) { UseMethod("store_hash_early") } #' @export -store_hash_early.default <- function(store) { +store_hash_early.default <- function(store, file) { } -store_hash_late <- function(store) { +store_hash_late <- function(store, file) { UseMethod("store_hash_late") } #' @export -store_hash_late.default <- function(store) { - tar_assert_path(store$file$path) - file_update_hash(store$file) +store_hash_late.default <- function(store, file) { + tar_assert_path(file$path) + file_update_hash(file) } store_ensure_correct_hash <- function( store, + file, storage, deployment ) { @@ -331,26 +339,31 @@ store_ensure_correct_hash <- function( } #' @export -store_ensure_correct_hash.default <- function(store, storage, deployment) { +store_ensure_correct_hash.default <- function( + store, + file, + storage, + deployment +) { if (identical(storage, "worker") && identical(deployment, "worker")) { - store_wait_correct_hash(store) + store_wait_correct_hash(store, file) } } -store_wait_correct_hash <- function(store) { +store_wait_correct_hash <- function(store, file) { seconds_interval <- store$resources$network$seconds_interval %|||% 0.25 seconds_timeout <- store$resources$network$seconds_timeout %|||% 60 max_tries <- store$resources$network$max_tries %|||% Inf verbose <- store$resources$network$verbose %|||% TRUE retry_until_true( - fun = ~store_has_correct_hash(store), + fun = ~store_has_correct_hash(store, file), seconds_interval = seconds_interval, seconds_timeout = seconds_timeout, max_tries = max_tries, catch_error = FALSE, message = paste( "Path", - paste(store$file$path, collapse = " "), + paste(file$path, collapse = " "), "does not exist or has incorrect hash.", "File sync timed out." ), @@ -358,14 +371,14 @@ store_wait_correct_hash <- function(store) { ) } -store_has_correct_hash <- function(store) { +store_has_correct_hash <- function(store, file) { UseMethod("store_has_correct_hash") } #' @export -store_has_correct_hash.default <- function(store) { - (all(is.na(store$file$path)) || file_exists_path(store$file)) && - file_has_correct_hash(store$file) +store_has_correct_hash.default <- function(store, file) { + (all(is.na(file$path)) || file_exists_path(file)) && + file_has_correct_hash(file) } store_sync_file_meta <- function(store, target, meta) { @@ -386,7 +399,7 @@ store_sync_file_meta.default <- function(store, target, meta) { size = record$size, bytes = record$bytes ) - info <- file_info_runtime(target$store$file$path) + info <- file_info_runtime(target$file$path) time <- file_time(info) bytes <- file_bytes(info) size <- file_size(bytes) diff --git a/R/class_store_file.R b/R/class_store_file.R index b1acc5dba..937462049 100644 --- a/R/class_store_file.R +++ b/R/class_store_file.R @@ -24,7 +24,7 @@ store_read_path.tar_store_file <- function(store, path) { } #' @export -store_write_object.tar_store_file <- function(store, object) { +store_write_object.tar_store_file <- function(store, file, object) { } #' @export @@ -58,17 +58,23 @@ store_assert_format.tar_store_file <- function(store, object, name) { # nolint } #' @export -store_update_stage_early.tar_store_file <- function(store, name, path_store) { +store_update_stage_early.tar_store_file <- function( + store, + file, + name, + path_store +) { } #' @export store_update_stage_late.tar_store_file <- function( store, + file, name, object, path_store ) { - store$file$stage <- store_produce_stage( + file$stage <- store_produce_stage( store = store, name = name, object = object, @@ -77,24 +83,25 @@ store_update_stage_late.tar_store_file <- function( } #' @export -store_hash_early.tar_store_file <- function(store) { # nolint - tar_assert_path(store$file$path) - file_update_hash(store$file) +store_hash_early.tar_store_file <- function(store, file) { # nolint + tar_assert_path(file$path) + file_update_hash(file) } #' @export -store_hash_late.tar_store_file <- function(store) { # nolint +store_hash_late.tar_store_file <- function(store, file) { # nolint } #' @export store_ensure_correct_hash.tar_store_file <- function( store, + file, storage, deployment ) { if_any( identical(deployment, "worker"), - store_wait_correct_hash(store), - tar_assert_path(store$file$path) + store_wait_correct_hash(store, file), + tar_assert_path(file$path) ) } diff --git a/R/class_store_repository_cas.R b/R/class_store_repository_cas.R index bf4b3da22..16c7194e2 100644 --- a/R/class_store_repository_cas.R +++ b/R/class_store_repository_cas.R @@ -15,25 +15,26 @@ store_assert_repository_setting.repository_cas <- function(repository) { } #' @export -store_hash_early.tar_repository_cas <- function(store) { +store_hash_early.tar_repository_cas <- function(store, file) { } #' @export -store_hash_late.tar_repository_cas <- function(store) { - tar_assert_file(store$file$stage) - path <- store$file$path - on.exit(store$file$path <- path) - store$file$path <- store$file$stage - file_update_hash(store$file) +store_hash_late.tar_repository_cas <- function(store, file) { + tar_assert_file(file$stage) + path <- file$path + on.exit(file$path <- path) + file$path <- file$stage + file_update_hash(file) } #' @export -store_upload_object.tar_repository_cas <- function(store) { - store_upload_object_cas(store, store$file$stage) +store_upload_object.tar_repository_cas <- function(store, file) { + store_upload_object_cas(store, file) } -store_upload_object_cas <- function(store, path) { - on.exit(unlink(store$file$stage, recursive = TRUE, force = TRUE)) +store_upload_object_cas <- function(store, file) { + path <- file$stage + on.exit(unlink(path, recursive = TRUE, force = TRUE)) tar_assert_scalar( path, msg = paste( @@ -41,7 +42,7 @@ store_upload_object_cas <- function(store, path) { "a single file or single directory." ) ) - key <- store$file$hash + key <- file$hash lookup <- tar_repository_cas_lookup(store) if (lookup_missing(lookup, key) || !lookup_get(lookup, key)) { store_repository_cas_call_method( @@ -54,20 +55,20 @@ store_upload_object_cas <- function(store, path) { } #' @export -store_read_object.tar_repository_cas <- function(store) { +store_read_object.tar_repository_cas <- function(store, file) { scratch <- path_scratch_temp_network() dir_create(dirname(scratch)) on.exit(unlink(scratch)) store_repository_cas_call_method( store = store, text = store$methods_repository$download, - args = list(key = store$file$hash, path = scratch) + args = list(key = file$hash, path = scratch) ) store_convert_object(store, store_read_path(store, scratch)) } #' @export -store_has_correct_hash.tar_repository_cas <- function(store) { +store_has_correct_hash.tar_repository_cas <- function(store, file) { lookup <- tar_repository_cas_lookup(store) key <- .subset2(.subset2(store, "file"), "hash") if (lookup_missing(lookup = lookup, name = key)) { @@ -84,11 +85,12 @@ store_has_correct_hash.tar_repository_cas <- function(store) { #' @export store_ensure_correct_hash.tar_repository_cas <- function( store, + file, storage, deployment ) { if (!store$methods_repository$consistent) { - store_wait_correct_hash(store) + store_wait_correct_hash(store, file) } } diff --git a/R/class_store_repository_cas_file.R b/R/class_store_repository_cas_file.R index 35dfd8db6..0240651de 100644 --- a/R/class_store_repository_cas_file.R +++ b/R/class_store_repository_cas_file.R @@ -1,31 +1,31 @@ #' @export -store_hash_early.tar_repository_cas_file <- function(store) { - store_hash_early.tar_store_file(store) +store_hash_early.tar_repository_cas_file <- function(store, file) { + store_hash_early.tar_store_file(store, file) } #' @export -store_hash_late.tar_repository_cas_file <- function(store) { - store_hash_late.tar_store_file(store) +store_hash_late.tar_repository_cas_file <- function(store, file) { + store_hash_late.tar_store_file(store, file) } #' @export -store_upload_object.tar_repository_cas_file <- function(store) { - store_upload_object_cas(store, store$file$path) +store_upload_object.tar_repository_cas_file <- function(store, file) { + store_upload_object_cas(store, file) } #' @export -store_read_object.tar_repository_cas_file <- function(store) { +store_read_object.tar_repository_cas_file <- function(store, file) { scratch <- path_scratch_temp_network() dir_create(dirname(scratch)) on.exit(unlink(scratch)) store_repository_cas_call_method( store = store, text = store$methods_repository$download, - args = list(path = scratch, key = store$file$hash) + args = list(path = scratch, key = file$hash) ) - dir_create(dirname(store$file$hash)) - file_move(from = scratch, to = store$file$path) - store$file$path + dir_create(dirname(file$hash)) + file_move(from = scratch, to = file$path) + file$path } #' @export diff --git a/R/class_target.R b/R/class_target.R index 39d6e5bbc..2f76aa154 100644 --- a/R/class_target.R +++ b/R/class_target.R @@ -468,10 +468,10 @@ target_allow_meta <- function(target) { } target_reformat <- function(target, format) { - file <- target$store$file + file <- target$file target$settings$format <- format target$store <- settings_produce_store(target$settings) - target$store$file <- file + target$file <- file } target_validate <- function(target) { diff --git a/R/class_url.R b/R/class_url.R index b402e04d6..6e0545a74 100644 --- a/R/class_url.R +++ b/R/class_url.R @@ -16,7 +16,7 @@ store_read_path.tar_url <- function(store, path) { } #' @export -store_write_object.tar_url <- function(store, object) { +store_write_object.tar_url <- function(store, file, object) { } #' @export @@ -45,14 +45,14 @@ store_assert_format.tar_url <- function(store, object, name) { } #' @export -store_hash_early.tar_url <- function(store) { # nolint +store_hash_early.tar_url <- function(store, file) { # nolint handle <- store$resources$url$handle %|||% store$resources$handle max_tries <- store$resources$url$max_tries %|||% 5L seconds_interval <- store$resources$url$seconds_interval %|||% 1 seconds_timeout <- store$resources$url$seconds_timeout %|||% 60 verbose <- store$resources$url$verbose %|||% TRUE - store$file$hash <- url_hash( - url = store$file$path, + file$hash <- url_hash( + url = file$path, handle = handle, seconds_interval = seconds_interval, seconds_timeout = seconds_timeout, @@ -62,12 +62,13 @@ store_hash_early.tar_url <- function(store) { # nolint } #' @export -store_hash_late.tar_url <- function(store) { # nolint +store_hash_late.tar_url <- function(store, file) { # nolint } #' @export store_ensure_correct_hash.tar_url <- function( # nolint store, + file, storage, deployment ) { @@ -78,18 +79,18 @@ store_sync_file_meta.tar_url <- function(store, target, meta) { } #' @export -store_has_correct_hash.tar_url <- function(store) { +store_has_correct_hash.tar_url <- function(store, file) { handle <- store$resources$url$handle %|||% store$resources$handle identical( url_hash( - url = store$file$path, + url = file$path, handle = handle, seconds_interval = 0, seconds_timeout = 0, max_tries = 1L, verbose = TRUE ), - store$file$hash + file$hash ) } diff --git a/R/class_verbose.R b/R/class_verbose.R index 2725c8386..97b276e4e 100644 --- a/R/class_verbose.R +++ b/R/class_verbose.R @@ -35,7 +35,7 @@ verbose_class <- R6::R6Class( name = target_get_name(target), prefix = target_get_type_cli(target), seconds_elapsed = target$metrics$seconds, - bytes_storage = target$store$file$bytes, + bytes_storage = target$file$bytes, print = FALSE ) ) diff --git a/R/tar_exist_objects.R b/R/tar_exist_objects.R index 4087e6643..bc1a392b3 100644 --- a/R/tar_exist_objects.R +++ b/R/tar_exist_objects.R @@ -54,6 +54,7 @@ tar_exist_cloud_target <- function(name, meta, path_store) { row <- meta[meta$name == name,, drop = FALSE] # nolint record <- record_from_row(row = row, path_store = path_store) store <- record_bootstrap_store(record) - store_exist_object(store = store, name = name) + file <- record_bootstrap_file(file) + store_exist_object(store = store, file = file, name = name) } # nocov end diff --git a/R/tar_read_raw.R b/R/tar_read_raw.R index d081366a3..711ecc044 100644 --- a/R/tar_read_raw.R +++ b/R/tar_read_raw.R @@ -31,7 +31,9 @@ tar_read_inner <- function(name, branches, meta, path_store) { } read_builder <- function(record) { - store_read_object(record_bootstrap_store(record)) + store <- record_bootstrap_store(record) + file <- record_bootstrap_file(record) + store_read_object(store, file) } read_pattern <- function(name, record, meta, branches, path_store) {