Skip to content

Commit

Permalink
sketch avoiding cloning the whole store
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Nov 7, 2024
1 parent 36dde3c commit 12c4f7c
Show file tree
Hide file tree
Showing 25 changed files with 277 additions and 221 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ S3method(store_update_stage_early,default)
S3method(store_update_stage_early,tar_store_file)
S3method(store_update_stage_late,default)
S3method(store_update_stage_late,tar_store_file)
S3method(store_upload_object,default)
S3method(store_upload_object,tar_aws)
S3method(store_upload_object,tar_aws_file)
S3method(store_upload_object,tar_gcp)
Expand Down
46 changes: 23 additions & 23 deletions R/class_aws.R
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ store_aws_split_colon <- function(path) {
# external contributors from the open source community.
# nocov start
#' @export
store_read_object.tar_aws <- function(store) {
path <- store$file$path
store_read_object.tar_aws <- function(store, file) {
path <- file$path
key <- store_aws_key(path)
bucket <- store_aws_bucket(path)
scratch <- path_scratch_temp_network(pattern = basename(store_aws_key(path)))
Expand All @@ -149,8 +149,8 @@ store_read_object.tar_aws <- function(store) {
}

#' @export
store_exist_object.tar_aws <- function(store, name = NULL) {
path <- store$file$path
store_exist_object.tar_aws <- function(store, file, name = NULL) {
path <- file$path
aws <- store$resources$aws
head <- aws_s3_exists(
key = store_aws_key(path),
Expand Down Expand Up @@ -219,23 +219,23 @@ store_delete_objects.tar_aws <- function(store, meta, batch_size, verbose) {
}

#' @export
store_upload_object.tar_aws <- function(store) {
on.exit(unlink(store$file$stage, recursive = TRUE, force = TRUE))
store_upload_object_aws(store)
store_upload_object.tar_aws <- function(store, file) {
on.exit(unlink(file$stage, recursive = TRUE, force = TRUE))
store_upload_object_aws(store, file)
}

store_upload_object_aws <- function(store) {
key <- store_aws_key(store$file$path)
bucket <- store_aws_bucket(store$file$path)
store_upload_object_aws <- function(store, file) {
key <- store_aws_key(file$path)
bucket <- store_aws_bucket(file$path)
aws <- store$resources$aws
head <- if_any(
file_exists_stage(store$file),
file_exists_stage(file),
aws_s3_upload(
file = store$file$stage,
file = file$stage,
key = key,
bucket = bucket,
region = store_aws_region(store$file$path),
endpoint = store_aws_endpoint(store$file$path),
region = store_aws_region(file$path),
endpoint = store_aws_endpoint(file$path),
part_size = aws$part_size,
args = aws$args,
max_tries = aws$max_tries,
Expand All @@ -245,33 +245,33 @@ store_upload_object_aws <- function(store) {
),
tar_throw_file(
"Cannot upload non-existent AWS staging file ",
store$file$stage,
file$stage,
" to key ",
key,
". The target probably encountered an error."
)
)
path <- grep(
pattern = "^version=",
x = store$file$path,
x = file$path,
value = TRUE,
invert = TRUE
)
store$file$path <- c(path, paste0("version=", head$VersionId))
store$file$hash <- hash_object(head$ETag)
file$path <- c(path, paste0("version=", head$VersionId))
file$hash <- hash_object(head$ETag)
invisible()
}

#' @export
store_has_correct_hash.tar_aws <- function(store) {
hash <- store_aws_hash(store)
!is.null(hash) && identical(hash, store$file$hash)
store_has_correct_hash.tar_aws <- function(store, file) {
hash <- store_aws_hash(store, file)
!is.null(hash) && identical(hash, file$hash)
}

store_aws_hash <- function(store) {
store_aws_hash <- function(store, file) {
tar_runtime$inventories$aws <- tar_runtime$inventories$aws %|||%
inventory_aws_init()
tar_runtime$inventories$aws$get_cache(store = store)
tar_runtime$inventories$aws$get_cache(store = store, file = file)
}
# nocov end

Expand Down
22 changes: 11 additions & 11 deletions R/class_aws_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,26 @@ store_assert_format_setting.aws_file <- function(format) {
}

#' @export
store_upload_object.tar_aws_file <- function(store) {
store_upload_object_aws(store)
store_upload_object.tar_aws_file <- function(store, file) {
store_upload_object_aws(store, file)
}

#' @export
store_hash_early.tar_aws_file <- function(store) { # nolint
old <- store$file$path
store$file$path <- store_aws_file_stage(store$file$path)
on.exit(store$file$path <- old)
tar_assert_path(store$file$path)
file_update_info(store$file)
store_hash_early.tar_aws_file <- function(store, file) { # nolint
old <- file$path
file$path <- store_aws_file_stage(file$path)
on.exit(file$path <- old)
tar_assert_path(file$path)
file_update_info(file)
}

#' @export
store_hash_late.tar_aws_file <- function(store) { # nolint
store_hash_late.tar_aws_file <- function(store, file) { # nolint
}

#' @export
store_read_object.tar_aws_file <- function(store) {
path <- store$file$path
store_read_object.tar_aws_file <- function(store, file) {
path <- file$path
key <- store_aws_key(path)
bucket <- store_aws_bucket(path)
scratch <- path_scratch_temp_network(pattern = basename(store_aws_key(path)))
Expand Down
6 changes: 3 additions & 3 deletions R/class_branch.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ branch_init <- function(
deps_parent = character(0L),
deps_child = character(0L),
settings = NULL,
cue = NULL
cue = NULL,
store = NULL
) {
deps <- setdiff(unique(c(deps_parent, deps_child)), settings$dimensions)
store <- settings_produce_store(settings)
branch_new(
name = name,
command = command,
Expand Down Expand Up @@ -51,7 +51,7 @@ target_get_type.tar_branch <- function(target) {

#' @export
target_produce_record.tar_branch <- function(target, pipeline, meta) {
file <- target$store$file
file <- target$file
record_init(
name = target_get_name(target),
parent = target_get_parent(target),
Expand Down
42 changes: 30 additions & 12 deletions R/class_builder.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ target_bootstrap.tar_builder <- function(
) {
record <- target_bootstrap_record(target, meta)
target$store <- record_bootstrap_store(record)
target$file <- record_bootstrap_file(record)
invisible()
}

#' @export
target_read_value.tar_builder <- function(target, pipeline = NULL) {
command <- target$command
load_packages(packages = command$packages, library = command$library)
object <- store_read_object(target$store)
object <- store_read_object(target$store, target$file)
iteration <- target$settings$iteration
value_init(object, iteration)
}
Expand Down Expand Up @@ -140,7 +141,12 @@ target_run.tar_builder <- function(target, envir, path_store) {
builder_ensure_deps(target, target$subpipeline, "worker")
frames <- frames_produce(envir, target, target$subpipeline)
builder_set_tar_runtime(target, frames)
store_update_stage_early(target$store, target_get_name(target), path_store)
store_update_stage_early(
store = target$store,
file = target$file,
name = target_get_name(target),
path_store = path_store
)
builder_update_build(target, frames_get_envir(frames))
builder_ensure_paths(target, path_store)
builder_ensure_object(target, "worker")
Expand Down Expand Up @@ -184,7 +190,7 @@ target_skip.tar_builder <- function(
active
) {
target_update_queue(target, scheduler)
file_repopulate(target$store$file, meta$get_record(target_get_name(target)))
file_repopulate(target$file, meta$get_record(target_get_name(target)))
if (active) {
builder_ensure_workspace(
target = target,
Expand Down Expand Up @@ -230,7 +236,7 @@ target_conclude.tar_builder <- function(target, pipeline, scheduler, meta) {
}

builder_completed <- function(target, pipeline, scheduler, meta) {
store_cache_path(target$store, target$store$file$path)
store_cache_path(target$store, target$file$path)
target_ensure_buds(target, pipeline, scheduler)
meta$insert_record(target_produce_record(target, pipeline, meta))
target_patternview_meta(target, pipeline, meta)
Expand Down Expand Up @@ -459,9 +465,21 @@ builder_ensure_paths <- function(target, path_store) {

builder_update_paths <- function(target, path_store) {
name <- target_get_name(target)
store_update_path(target$store, name, target$value$object, path_store)
store_update_stage_late(target$store, name, target$value$object, path_store)
store_hash_early(target$store)
store_update_path(
store = target$store,
file = target$file,
name = name,
object = target$value$object,
path_store = path_store
)
store_update_stage_late(
store = target$store,
file = target$file,
name = name,
object = target$value$object,
path_store = path_store
)
store_hash_early(target$store, target$file)
}

builder_unload_value <- function(target) {
Expand All @@ -475,10 +493,10 @@ builder_unload_value <- function(target) {

builder_update_object <- function(target) {
on.exit(builder_unload_value(target))
file_validate_path(target$store$file$path)
file_validate_path(target$file$path)
if (!identical(target$settings$storage, "none")) {
withCallingHandlers(
store_write_object(target$store, target$value$object),
store_write_object(target$store, target$file, target$value$object),
warning = function(condition) {
if (length(target$metrics$warnings) < 51L) {
target$metrics$warnings <- paste(
Expand All @@ -491,8 +509,8 @@ builder_update_object <- function(target) {
}
)
}
store_hash_late(target$store)
store_upload_object(target$store)
store_hash_late(target$store, target$file)
store_upload_object(target$store, target$file)
}

builder_expect_storage <- function(target) {
Expand Down Expand Up @@ -532,7 +550,7 @@ builder_ensure_correct_hash <- function(target) {
builder_wait_correct_hash <- function(target) {
storage <- target$settings$storage
deployment <- target$settings$deployment
store_ensure_correct_hash(target$store, storage, deployment)
store_ensure_correct_hash(target$store, target$file, storage, deployment)
}

builder_set_tar_runtime <- function(target, frames) {
Expand Down
23 changes: 14 additions & 9 deletions R/class_cloud.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# nocov start
#' @export
store_tar_path.tar_cloud <- function(store, target, path_store) {
store$file$stage
target$file$stage
}

#' @export
Expand All @@ -14,8 +14,8 @@ store_produce_stage.tar_cloud <- function(store, name, object, path_store) {
}

#' @export
store_write_object.tar_cloud <- function(store, object) {
stage <- store$file$stage
store_write_object.tar_cloud <- function(store, file, object) {
stage <- file$stage
dir_create(dirname(stage))
store_write_path(store, store_convert_object(store, object), stage)
}
Expand All @@ -25,16 +25,21 @@ store_cache_path.tar_cloud <- function(store, path) {
}

#' @export
store_hash_late.tar_cloud <- function(store) {
tar_assert_path(store$file$stage)
file <- file_init(path = store$file$stage)
store_hash_late.tar_cloud <- function(store, file) {
tar_assert_path(file$stage)
file <- file_init(path = file$stage)
file_update_info(file)
store$file$bytes <- file$bytes
store$file$time <- file$time
file$bytes <- file$bytes
file$time <- file$time
}

#' @export
store_ensure_correct_hash.tar_cloud <- function(store, storage, deployment) {
store_ensure_correct_hash.tar_cloud <- function(
store,
file,
storage,
deployment
) {
}

#' @export
Expand Down
8 changes: 4 additions & 4 deletions R/class_cue.R
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,17 @@ cue_file <- function(cue, target, meta, record) {
if (!cue$file) {
return(FALSE)
}
file_current <- target$store$file
file_current <- target$file
file_recorded <- file_new(
path = record$path,
hash = record$data,
time = record$time,
size = record$size,
bytes = record$bytes
)
on.exit(target$store$file <- file_current)
target$store$file <- file_recorded
!store_has_correct_hash(target$store)
on.exit(target$file <- file_current)
target$file <- file_recorded
!store_has_correct_hash(target$store, target$file)
}

cue_seed <- function(cue, target, meta, record) {
Expand Down
4 changes: 2 additions & 2 deletions R/class_external.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#' @export
store_row_path.tar_external <- function(store) {
store$file$path
store_row_path.tar_external <- function(store, file) {
file$path
}

#' @export
Expand Down
Loading

0 comments on commit 12c4f7c

Please sign in to comment.