From 0b2e9293cec391bc16f08940d863c411fb000742 Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Fri, 1 Nov 2024 13:25:06 -0300 Subject: [PATCH 1/2] Rust Icechunk learns how to collect garbage
We call garbage collection the process by which dangling objects are deleted from the object store. We define a dangling object as a chunk, manifest, snapshot, attributes or transaction log file that cannot be reached by navigating the parent relationship starting from all possible refs. There are currently two mechanisms that create dangling objects: - Abandoning a session without committing it - Resetting a branch, leaving snapshots behind In the future, we'll introduce more mechanisms that "generate" garbage, with the objective of reducing storage costs. One example would be squashing commits when version resolution is not relevant. Garbage collection is an inherently dangerous process. It's the only time at which Icechunk actually deletes data from the object store. As such, it must be executed carefully. There is an unavoidable race condition in garbage collection: Icechunk has no way to distinguish a new object from a dangling one if that object was created after the garbage collection process has traced the refs. To solve that issue, the garbage collection process only deletes objects that were created some time ago. Users can pass a timestamp as configuration to the collection process. This timestamp must be older than the start time of the oldest writing session that could still be open. For example, if the longest writing sessions last 48 hours, a safe timestamp would be `now - 7 days`.
--- Cargo.lock | 13 ++ icechunk/Cargo.toml | 1 + icechunk/src/lib.rs | 1 + icechunk/src/ops/gc.rs | 296 +++++++++++++++++++++++++++ icechunk/src/ops/mod.rs | 1 + icechunk/src/refs.rs | 10 + icechunk/src/repository.rs | 4 +- icechunk/src/storage/caching.rs | 38 +++- icechunk/src/storage/logging.rs | 38 +++- icechunk/src/storage/mod.rs | 32 ++- icechunk/src/storage/object_store.rs | 117 ++++++++++- icechunk/src/storage/s3.rs | 152 +++++++++++++- icechunk/tests/test_gc.rs | 132 ++++++++++++ 13 files changed, 820 insertions(+), 15 deletions(-) create mode 100644 icechunk/src/ops/gc.rs create mode 100644 icechunk/src/ops/mod.rs create mode 100644 icechunk/tests/test_gc.rs
diff --git a/Cargo.lock b/Cargo.lock index 62b1d714..20c5105e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -453,6 +453,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "aws-smithy-types-convert" +version = "0.60.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f280f434214856abace637b1f944d50ccca216814813acd195cdd7f206ce17f" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "chrono", + "futures-core", +] + [[package]] name = "aws-smithy-xml" version = "0.60.9" @@ -1193,6 +1205,7 @@ dependencies = [ "aws-config", "aws-credential-types", "aws-sdk-s3", + "aws-smithy-types-convert", "base32", "base64 0.22.1", "bytes",
diff --git a/icechunk/Cargo.toml b/icechunk/Cargo.toml index f99e0dca..57e285d0 100644 --- a/icechunk/Cargo.toml +++ b/icechunk/Cargo.toml @@ -39,6 +39,7 @@ aws-sdk-s3 = "1.53.0" aws-config = "1.5.7" aws-credential-types = "1.2.1" typed-path = "0.9.2" +aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono", "convert-streams"] } [dev-dependencies] pretty_assertions = "1.4.1"
diff --git a/icechunk/src/lib.rs b/icechunk/src/lib.rs index e46b48cb..f32251af 100644 --- a/icechunk/src/lib.rs +++ b/icechunk/src/lib.rs @@ -20,6 +20,7 @@ pub mod change_set; pub mod format; pub
mod metadata; +pub mod ops; pub mod refs; pub mod repository; pub mod storage; diff --git a/icechunk/src/ops/gc.rs b/icechunk/src/ops/gc.rs new file mode 100644 index 00000000..46ffa078 --- /dev/null +++ b/icechunk/src/ops/gc.rs @@ -0,0 +1,296 @@ +use std::{collections::HashSet, future::ready, iter}; + +use chrono::{DateTime, Utc}; +use futures::{stream, Stream, StreamExt, TryStreamExt}; +use tokio::pin; + +use crate::{ + format::{ChunkId, ManifestId, SnapshotId}, + refs::{list_refs, RefError}, + repository::ChunkPayload, + storage::ListInfo, + Storage, StorageError, +}; + +#[derive(Debug, PartialEq, Eq)] +pub enum Action { + Keep, + DeleteIfCreatedBefore(DateTime), +} + +#[derive(Debug)] +pub struct GCConfig { + extra_roots: HashSet, + dangling_chunks: Action, + dangling_manifests: Action, + dangling_attributes: Action, + dangling_transaction_logs: Action, + dangling_snapshots: Action, +} + +impl GCConfig { + pub fn new( + extra_roots: HashSet, + dangling_chunks: Action, + dangling_manifests: Action, + dangling_attributes: Action, + dangling_transaction_logs: Action, + dangling_snapshots: Action, + ) -> Self { + GCConfig { + extra_roots, + dangling_chunks, + dangling_manifests, + dangling_attributes, + dangling_transaction_logs, + dangling_snapshots, + } + } + pub fn clean_all( + chunks_age: DateTime, + metadata_age: DateTime, + extra_roots: Option>, + ) -> Self { + use Action::DeleteIfCreatedBefore as D; + Self::new( + extra_roots.unwrap_or_default(), + D(chunks_age), + D(metadata_age), + D(metadata_age), + D(metadata_age), + D(metadata_age), + ) + } + + fn action_needed(&self) -> bool { + [ + &self.dangling_chunks, + &self.dangling_manifests, + &self.dangling_attributes, + &self.dangling_transaction_logs, + &self.dangling_snapshots, + ] + .into_iter() + .any(|action| action != &Action::Keep) + } + + pub fn deletes_chunks(&self) -> bool { + self.dangling_chunks != Action::Keep + } + + pub fn deletes_manifests(&self) -> bool { + self.dangling_manifests != Action::Keep + } + + pub fn deletes_attributes(&self) -> bool { + self.dangling_attributes != Action::Keep + } + + pub fn deletes_transaction_logs(&self) -> bool { + self.dangling_transaction_logs != Action::Keep + } + + pub fn deletes_snapshots(&self) -> bool { + self.dangling_snapshots != Action::Keep + } + + fn must_delete_chunk(&self, chunk: &ListInfo) -> bool { + match self.dangling_chunks { + Action::DeleteIfCreatedBefore(before) => chunk.created_at < before, + _ => false, + } + } + + fn must_delete_manifest(&self, manifest: &ListInfo) -> bool { + match self.dangling_manifests { + Action::DeleteIfCreatedBefore(before) => manifest.created_at < before, + _ => false, + } + } + + fn must_delete_snapshot(&self, snapshot: &ListInfo) -> bool { + match self.dangling_snapshots { + Action::DeleteIfCreatedBefore(before) => snapshot.created_at < before, + _ => false, + } + } +} + +#[derive(Debug, PartialEq, Eq, Default)] +pub struct GCSummary { + pub chunks_deleted: usize, + pub manifests_deleted: usize, + pub snapshots_deleted: usize, + pub attributes_deleted: usize, + pub transaction_logs_deleted: usize, +} + +#[derive(Debug, thiserror::Error)] +pub enum GCError { + #[error("ref error {0}")] + Ref(#[from] RefError), + #[error("storage error {0}")] + Storage(#[from] StorageError), +} + +pub type GCResult = Result; + +pub async fn garbage_collect( + storage: &(dyn Storage + Send + Sync), + config: &GCConfig, +) -> GCResult { + // TODO: this function could have much more parallelism + if !config.action_needed() { + return 
Ok(GCSummary::default()); + } + + let all_snaps = pointed_snapshots(storage, &config.extra_roots).await?; + + // FIXME: add attribute files + // FIXME: add transaction log files + let mut keep_chunks = HashSet::new(); + let mut keep_manifests = HashSet::new(); + let mut keep_snapshots = HashSet::new(); + + pin!(all_snaps); + while let Some(snap_id) = all_snaps.try_next().await? { + let snap = storage.fetch_snapshot(&snap_id).await?; + if config.deletes_snapshots() { + keep_snapshots.insert(snap_id); + } + + if config.deletes_manifests() { + keep_manifests.extend(snap.manifest_files.iter().map(|mf| mf.id.clone())); + } + + if config.deletes_chunks() { + for manifest_file in snap.manifest_files.iter() { + let manifest_id = &manifest_file.id; + let manifest = storage.fetch_manifests(manifest_id).await?; + let chunk_ids = + manifest.chunks().values().filter_map(|payload| match payload { + ChunkPayload::Ref(chunk_ref) => Some(chunk_ref.id.clone()), + _ => None, + }); + keep_chunks.extend(chunk_ids); + } + } + } + + let mut summary = GCSummary::default(); + + if config.deletes_snapshots() { + summary.snapshots_deleted = gc_snapshots(storage, config, keep_snapshots).await?; + } + if config.deletes_manifests() { + summary.manifests_deleted = gc_manifests(storage, config, keep_manifests).await?; + } + if config.deletes_chunks() { + summary.chunks_deleted = gc_chunks(storage, config, keep_chunks).await?; + } + + Ok(summary) +} + +async fn all_roots<'a>( + storage: &'a (dyn Storage + Send + Sync), + extra_roots: &'a HashSet, +) -> GCResult> + 'a> { + let all_refs = list_refs(storage).await?; + // TODO: this could be optimized by not following the ancestry of snapshots that we have + // already seen + let roots = + stream::iter(all_refs) + .then(move |r| async move { + r.fetch(storage).await.map(|ref_data| ref_data.snapshot) + }) + .err_into() + .chain(stream::iter(extra_roots.iter().cloned()).map(Ok)); + Ok(roots) +} + +async fn pointed_snapshots<'a>( + storage: &'a (dyn Storage + Send + Sync), + extra_roots: &'a HashSet, +) -> GCResult> + 'a> { + let roots = all_roots(storage, extra_roots).await?; + Ok(roots + .and_then(move |snap_id| async move { + let snap = storage.fetch_snapshot(&snap_id).await?; + // FIXME: this should be global ancestry, not local + let parents = snap.local_ancestry().map(|parent| parent.id); + Ok(stream::iter(iter::once(snap_id).chain(parents)) + .map(Ok::)) + }) + .try_flatten()) +} + +async fn gc_chunks( + storage: &(dyn Storage + Send + Sync), + config: &GCConfig, + keep_ids: HashSet, +) -> GCResult { + let to_delete = storage + .list_chunks() + .await? + // TODO: don't skip over errors + .filter_map(move |chunk| { + ready(chunk.ok().and_then(|chunk| { + if config.must_delete_chunk(&chunk) && !keep_ids.contains(&chunk.id) { + Some(chunk.id.clone()) + } else { + None + } + })) + }) + .boxed(); + Ok(storage.delete_chunks(to_delete).await?) +} + +async fn gc_manifests( + storage: &(dyn Storage + Send + Sync), + config: &GCConfig, + keep_ids: HashSet, +) -> GCResult { + let to_delete = storage + .list_manifests() + .await? + // TODO: don't skip over errors + .filter_map(move |manifest| { + ready(manifest.ok().and_then(|manifest| { + if config.must_delete_manifest(&manifest) + && !keep_ids.contains(&manifest.id) + { + Some(manifest.id.clone()) + } else { + None + } + })) + }) + .boxed(); + Ok(storage.delete_manifests(to_delete).await?) 
+} + +async fn gc_snapshots( + storage: &(dyn Storage + Send + Sync), + config: &GCConfig, + keep_ids: HashSet, +) -> GCResult { + let to_delete = storage + .list_snapshots() + .await? + // TODO: don't skip over errors + .filter_map(move |snapshot| { + ready(snapshot.ok().and_then(|snapshot| { + if config.must_delete_snapshot(&snapshot) + && !keep_ids.contains(&snapshot.id) + { + Some(snapshot.id.clone()) + } else { + None + } + })) + }) + .boxed(); + Ok(storage.delete_snapshots(to_delete).await?) +} diff --git a/icechunk/src/ops/mod.rs b/icechunk/src/ops/mod.rs new file mode 100644 index 00000000..718bc653 --- /dev/null +++ b/icechunk/src/ops/mod.rs @@ -0,0 +1 @@ +pub mod gc; diff --git a/icechunk/src/refs.rs b/icechunk/src/refs.rs index bb52912a..464ee8ae 100644 --- a/icechunk/src/refs.rs +++ b/icechunk/src/refs.rs @@ -66,6 +66,16 @@ impl Ref { }, } } + + pub async fn fetch( + &self, + storage: &(dyn Storage + Send + Sync), + ) -> RefResult { + match self { + Ref::Tag(name) => fetch_tag(storage, name).await, + Ref::Branch(name) => fetch_branch_tip(storage, name).await, + } + } } #[derive(Debug, Clone, Eq, PartialEq)] diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 8bd2795a..b94f3f86 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -214,7 +214,7 @@ impl Repository { return Err(RepositoryError::AlreadyInitialized); } let new_snapshot = Snapshot::empty(); - let new_snapshot_id = ObjectId::random(); + let new_snapshot_id = new_snapshot.metadata.id.clone(); storage.write_snapshot(new_snapshot_id.clone(), Arc::new(new_snapshot)).await?; update_branch( storage.as_ref(), @@ -314,7 +314,7 @@ impl Repository { let parent = self.storage.fetch_snapshot(self.snapshot_id()).await?; let last = parent.metadata.clone(); let it = if parent.short_term_history.len() < parent.total_parents as usize { - // TODO: implement splitting of snapshot history + // FIXME: implement splitting of snapshot history Either::Left(parent.local_ancestry().chain(iter::once_with(|| todo!()))) } else { Either::Right(parent.local_ancestry()) diff --git a/icechunk/src/storage/caching.rs b/icechunk/src/storage/caching.rs index 998347c0..54f25d84 100644 --- a/icechunk/src/storage/caching.rs +++ b/icechunk/src/storage/caching.rs @@ -13,7 +13,7 @@ use crate::{ private, }; -use super::{Storage, StorageError, StorageResult}; +use super::{ListInfo, Storage, StorageError, StorageResult}; #[derive(Debug)] pub struct MemCachingStorage { @@ -163,6 +163,42 @@ impl Storage for MemCachingStorage { ) -> StorageResult>> { self.backend.ref_versions(ref_name).await } + + async fn list_chunks( + &self, + ) -> StorageResult>>> { + self.backend.list_chunks().await + } + + async fn list_manifests( + &self, + ) -> StorageResult>>> { + self.backend.list_manifests().await + } + + async fn list_snapshots( + &self, + ) -> StorageResult>>> { + self.backend.list_snapshots().await + } + + async fn delete_chunks(&self, ids: BoxStream<'_, ChunkId>) -> StorageResult { + self.backend.delete_chunks(ids).await + } + + async fn delete_manifests( + &self, + ids: BoxStream<'_, ManifestId>, + ) -> StorageResult { + self.backend.delete_manifests(ids).await + } + + async fn delete_snapshots( + &self, + ids: BoxStream<'_, SnapshotId>, + ) -> StorageResult { + self.backend.delete_snapshots(ids).await + } } #[cfg(test)] diff --git a/icechunk/src/storage/logging.rs b/icechunk/src/storage/logging.rs index 904010ad..0b656321 100644 --- a/icechunk/src/storage/logging.rs +++ b/icechunk/src/storage/logging.rs @@ -4,7 +4,7 @@ 
use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; -use super::{Storage, StorageError, StorageResult}; +use super::{ListInfo, Storage, StorageError, StorageResult}; use crate::{ format::{ attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, @@ -132,4 +132,40 @@ impl Storage for LoggingStorage { ) -> StorageResult>> { self.backend.ref_versions(ref_name).await } + + async fn list_chunks( + &self, + ) -> StorageResult>>> { + self.backend.list_chunks().await + } + + async fn list_manifests( + &self, + ) -> StorageResult>>> { + self.backend.list_manifests().await + } + + async fn list_snapshots( + &self, + ) -> StorageResult>>> { + self.backend.list_snapshots().await + } + + async fn delete_chunks(&self, ids: BoxStream<'_, ChunkId>) -> StorageResult { + self.backend.delete_chunks(ids).await + } + + async fn delete_manifests( + &self, + ids: BoxStream<'_, ManifestId>, + ) -> StorageResult { + self.backend.delete_manifests(ids).await + } + + async fn delete_snapshots( + &self, + ids: BoxStream<'_, SnapshotId>, + ) -> StorageResult { + self.backend.delete_snapshots(ids).await + } } diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index c63f5351..fced7e69 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -2,11 +2,12 @@ use aws_sdk_s3::{ config::http::HttpResponse, error::SdkError, operation::{ - get_object::GetObjectError, list_objects_v2::ListObjectsV2Error, - put_object::PutObjectError, + delete_objects::DeleteObjectsError, get_object::GetObjectError, + list_objects_v2::ListObjectsV2Error, put_object::PutObjectError, }, primitives::ByteStreamError, }; +use chrono::{DateTime, Utc}; use core::fmt; use futures::stream::BoxStream; use std::{ffi::OsString, sync::Arc}; @@ -47,6 +48,8 @@ pub enum StorageError { S3PutObjectError(#[from] SdkError), #[error("error listing objects in object store {0}")] S3ListObjectError(#[from] SdkError), + #[error("error deleting objects in object store {0}")] + S3DeleteObjectError(#[from] SdkError), #[error("error streaming bytes from object store {0}")] S3StreamError(#[from] ByteStreamError), #[error("messagepack decode error: {0}")] @@ -63,6 +66,11 @@ pub enum StorageError { pub type StorageResult = Result; +pub struct ListInfo { + pub id: Id, + pub created_at: DateTime, +} + /// Fetch and write the parquet files that represent the repository in object store /// /// Different implementation can cache the files differently, or not at all. 
@@ -106,4 +114,24 @@ pub trait Storage: fmt::Debug + private::Sealed { overwrite_refs: bool, bytes: Bytes, ) -> StorageResult<()>; + + async fn list_chunks( + &self, + ) -> StorageResult>>>; + async fn list_manifests( + &self, + ) -> StorageResult>>>; + async fn list_snapshots( + &self, + ) -> StorageResult>>>; + + async fn delete_chunks(&self, ids: BoxStream<'_, ChunkId>) -> StorageResult; + async fn delete_manifests( + &self, + ids: BoxStream<'_, ManifestId>, + ) -> StorageResult; + async fn delete_snapshots( + &self, + ids: BoxStream<'_, SnapshotId>, + ) -> StorageResult; } diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index ee929391..2ebbc5a2 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -8,17 +8,27 @@ use crate::{ }; use async_trait::async_trait; use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use futures::{ + stream::{self, BoxStream}, + StreamExt, TryStreamExt, +}; use object_store::{ local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, Attribute, - AttributeValue, Attributes, GetOptions, GetRange, ObjectStore, PutMode, PutOptions, - PutPayload, + AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore, PutMode, + PutOptions, PutPayload, }; use std::{ - fs::create_dir_all, future::ready, ops::Range, path::Path as StdPath, sync::Arc, + fs::create_dir_all, + future::ready, + ops::Range, + path::Path as StdPath, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; -use super::{Storage, StorageError, StorageResult}; +use super::{ListInfo, Storage, StorageError, StorageResult}; // Get Range is object_store specific, keep it with this module impl From<&ByteRange> for Option { @@ -145,6 +155,54 @@ impl ObjectStorage { }) .boxed() } + + fn list_objects<'a, Id>( + &'a self, + prefix: &str, + ) -> StorageResult>>> + where + Id: for<'b> TryFrom<&'b str> + Send + 'a, + { + let prefix = ObjectPath::from(format!("{}/{}", self.prefix.as_str(), prefix)); + let stream = self + .store + .list(Some(&prefix)) + // TODO: we should signal error instead of filtering + .try_filter_map(|object| ready(Ok(object_to_list_info(&object)))) + .err_into(); + Ok(stream.boxed()) + } + + async fn delete_batch( + &self, + prefix: &str, + batch: Vec>, + ) -> StorageResult { + let keys = batch.iter().map(|id| Ok(self.get_path(prefix, id))); + let results = self.store.delete_stream(stream::iter(keys).boxed()); + // FIXME: flag errors instead of skipping them + Ok(results.filter(|res| ready(res.is_ok())).count().await) + } + + async fn delete_objects( + &self, + prefix: &str, + ids: BoxStream<'_, ObjectId>, + ) -> StorageResult { + let deleted = AtomicUsize::new(0); + ids.chunks(1_000) + // FIXME: configurable concurrency + .for_each_concurrent(10, |batch| { + let deleted = &deleted; + async move { + // FIXME: handle error instead of skipping + let new_deletes = self.delete_batch(prefix, batch).await.unwrap_or(0); + deleted.fetch_add(new_deletes, Ordering::Release); + } + }) + .await; + Ok(deleted.into_inner()) + } } impl private::Sealed for ObjectStorage {} @@ -354,4 +412,53 @@ impl Storage for ObjectStorage { }) .map(|_| ()) } + + async fn list_chunks( + &self, + ) -> StorageResult>>> { + self.list_objects(CHUNK_PREFIX) + } + + async fn list_manifests( + &self, + ) -> StorageResult>>> { + self.list_objects(MANIFEST_PREFIX) + } + + async fn list_snapshots( + &self, + ) -> StorageResult>>> { + self.list_objects(SNAPSHOT_PREFIX) + } + + async fn delete_chunks( + 
&self, + chunks: BoxStream<'_, ChunkId>, + ) -> StorageResult { + self.delete_objects(CHUNK_PREFIX, chunks).await + } + + async fn delete_manifests( + &self, + chunks: BoxStream<'_, ManifestId>, + ) -> StorageResult { + self.delete_objects(MANIFEST_PREFIX, chunks).await + } + + async fn delete_snapshots( + &self, + chunks: BoxStream<'_, SnapshotId>, + ) -> StorageResult { + self.delete_objects(SNAPSHOT_PREFIX, chunks).await + } +} + +fn object_to_list_info<'a, Id>(object: &ObjectMeta) -> Option> +where + Id: for<'b> TryFrom<&'b str> + Send + 'a, +{ + let created_at = object.last_modified; + let id_str = object.location.filename()?; + let id = Id::try_from(id_str).ok()?; + Some(ListInfo { id, created_at }) } diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index 543cc00e..d71696cd 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -1,4 +1,12 @@ -use std::{ops::Range, path::PathBuf, sync::Arc}; +use std::{ + future::ready, + ops::Range, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use async_stream::try_stream; use async_trait::async_trait; @@ -8,10 +16,15 @@ use aws_sdk_s3::{ config::{Builder, Region}, error::ProvideErrorMetadata, primitives::ByteStream, + types::{Delete, Object, ObjectIdentifier}, Client, }; +use aws_smithy_types_convert::{date_time::DateTimeExt, stream::PaginationStreamExt}; use bytes::Bytes; -use futures::StreamExt; +use futures::{ + stream::{self, BoxStream}, + StreamExt, TryStreamExt, +}; use serde::{Deserialize, Serialize}; use crate::{ @@ -25,7 +38,7 @@ use crate::{ Storage, StorageError, }; -use super::StorageResult; +use super::{ListInfo, StorageResult}; #[derive(Debug)] pub struct S3Storage { @@ -203,6 +216,86 @@ impl S3Storage { b.body(bytes.into()).send().await?; Ok(()) } + + fn list_objects<'a, Id>( + &'a self, + prefix: &str, + ) -> StorageResult>>> + where + Id: for<'b> TryFrom<&'b str> + Send + 'a, + { + let prefix = PathBuf::from_iter([self.prefix.as_str(), prefix]) + .into_os_string() + .into_string() + .map_err(StorageError::BadPrefix)?; + let stream = self + .client + .list_objects_v2() + .bucket(self.bucket.clone()) + .prefix(prefix) + .into_paginator() + .send() + .into_stream_03x() + .try_filter_map(|page| { + let contents = page.contents.map(|cont| stream::iter(cont).map(Ok)); + ready(Ok(contents)) + }) + .try_flatten() + // TODO: we should signal error instead of filtering + .try_filter_map(|object| ready(Ok(object_to_list_info(&object)))); + Ok(stream.boxed()) + } + + async fn delete_batch( + &self, + prefix: &str, + batch: Vec>, + ) -> StorageResult { + let keys = batch + .iter() + // FIXME: flag errors instead of skipping them + .filter_map(|id| { + let key = self.get_path(prefix, id).ok()?; + let ident = ObjectIdentifier::builder().key(key).build().ok()?; + Some(ident) + }) + .collect(); + + let delete = Delete::builder() + .set_objects(Some(keys)) + .build() + .map_err(|e| StorageError::Other(e.to_string()))?; + + let res = self + .client + .delete_objects() + .bucket(self.bucket.clone()) + .delete(delete) + .send() + .await?; + + Ok(res.deleted().len()) + } + + async fn delete_objects( + &self, + prefix: &str, + ids: BoxStream<'_, ObjectId>, + ) -> StorageResult { + let deleted = AtomicUsize::new(0); + ids.chunks(1_000) + // FIXME: configurable concurrency + .for_each_concurrent(10, |batch| { + let deleted = &deleted; + async move { + // FIXME: handle error instead of skipping + let new_deletes = self.delete_batch(prefix, batch).await.unwrap_or(0); + 
deleted.fetch_add(new_deletes, Ordering::Release); + } + }) + .await; + Ok(deleted.into_inner()) + } } pub fn range_to_header(range: &ByteRange) -> Option { @@ -364,7 +457,7 @@ impl Storage for S3Storage { async fn ref_versions( &self, ref_name: &str, - ) -> StorageResult>> { + ) -> StorageResult>> { let prefix = self.ref_key(ref_name)?; let mut paginator = self .client @@ -417,4 +510,55 @@ impl Storage for S3Storage { } } } + + async fn list_chunks( + &self, + ) -> StorageResult>>> { + self.list_objects(CHUNK_PREFIX) + } + + async fn list_manifests( + &self, + ) -> StorageResult>>> { + self.list_objects(MANIFEST_PREFIX) + } + + async fn list_snapshots( + &self, + ) -> StorageResult>>> { + self.list_objects(SNAPSHOT_PREFIX) + } + + async fn delete_chunks( + &self, + chunks: BoxStream<'_, ChunkId>, + ) -> StorageResult { + self.delete_objects(CHUNK_PREFIX, chunks).await + } + + async fn delete_manifests( + &self, + chunks: BoxStream<'_, ManifestId>, + ) -> StorageResult { + self.delete_objects(MANIFEST_PREFIX, chunks).await + } + + async fn delete_snapshots( + &self, + chunks: BoxStream<'_, SnapshotId>, + ) -> StorageResult { + self.delete_objects(SNAPSHOT_PREFIX, chunks).await + } +} + +fn object_to_list_info<'a, Id>(object: &Object) -> Option> +where + Id: for<'b> TryFrom<&'b str> + Send + 'a, +{ + let key = object.key()?; + let last_modified = object.last_modified()?; + let created_at = last_modified.to_chrono_utc().ok()?; + let id_str = Path::new(key).file_name().and_then(|s| s.to_str())?; + let id = Id::try_from(id_str).ok()?; + Some(ListInfo { id, created_at }) } diff --git a/icechunk/tests/test_gc.rs b/icechunk/tests/test_gc.rs new file mode 100644 index 00000000..c29887e1 --- /dev/null +++ b/icechunk/tests/test_gc.rs @@ -0,0 +1,132 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::{num::NonZeroU64, sync::Arc}; + +use bytes::Bytes; +use chrono::Utc; +use futures::StreamExt; +use icechunk::{ + format::{ByteRange, ChunkId, ChunkIndices, Path}, + metadata::{ChunkKeyEncoding, ChunkShape, DataType, FillValue}, + ops::gc::{garbage_collect, GCConfig, GCSummary}, + refs::update_branch, + repository::{get_chunk, ZarrArrayMetadata}, + storage::s3::{S3Config, S3Credentials, S3Storage, StaticS3Credentials}, + Repository, Storage, +}; +use pretty_assertions::assert_eq; + +fn minio_s3_config() -> S3Config { + S3Config { + region: Some("us-east-1".to_string()), + endpoint: Some("http://localhost:9000".to_string()), + credentials: S3Credentials::Static(StaticS3Credentials { + access_key_id: "minio123".into(), + secret_access_key: "minio123".into(), + session_token: None, + }), + allow_http: true, + } +} + +#[tokio::test] +/// Create a repo with two commits, reset the branch to "forget" the last commit, run gc +/// +/// It runs [`garbage_collect`] to verify it's doing its job. +pub async fn test_gc() -> Result<(), Box> { + let storage: Arc = Arc::new( + S3Storage::new_s3_store( + "testbucket".to_string(), + format!("{:?}", ChunkId::random()), + Some(&minio_s3_config()), + ) + .await + .expect("Creating minio storage failed"), + ); + let mut repo = Repository::init(Arc::clone(&storage), false) + .await? 
+ .with_inline_threshold_bytes(0) + .build(); + + repo.add_group(Path::root()).await?; + let zarr_meta = ZarrArrayMetadata { + shape: vec![1100], + data_type: DataType::Int8, + chunk_shape: ChunkShape(vec![NonZeroU64::new(1).expect("Cannot create NonZero")]), + chunk_key_encoding: ChunkKeyEncoding::Slash, + fill_value: FillValue::Int8(0), + codecs: vec![], + storage_transformers: None, + dimension_names: None, + }; + + let array_path: Path = "/array".try_into().unwrap(); + repo.add_array(array_path.clone(), zarr_meta.clone()).await?; + // we write more than 1k chunks to go beyond the chunk size for object listing and delete + for idx in 0..1100 { + let bytes = Bytes::copy_from_slice(&42i8.to_be_bytes()); + let payload = repo.get_chunk_writer()(bytes.clone()).await?; + repo.set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload)) + .await?; + } + + let first_snap_id = repo.commit("main", "first", None).await?; + assert_eq!(storage.list_chunks().await?.count().await, 1100); + + // overwrite 10 chunks + for idx in 0..10 { + let bytes = Bytes::copy_from_slice(&0i8.to_be_bytes()); + let payload = repo.get_chunk_writer()(bytes.clone()).await?; + repo.set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload)) + .await?; + } + let second_snap_id = repo.commit("main", "second", None).await?; + assert_eq!(storage.list_chunks().await?.count().await, 1110); + + // verify doing gc without dangling objects doesn't change the repo + let now = Utc::now(); + let gc_config = GCConfig::clean_all(now, now, None); + let summary = garbage_collect(storage.as_ref(), &gc_config).await?; + assert_eq!(summary, GCSummary::default()); + assert_eq!(storage.list_chunks().await?.count().await, 1110); + for idx in 0..10 { + let bytes = get_chunk( + repo.get_chunk_reader(&array_path, &ChunkIndices(vec![idx]), &ByteRange::ALL) + .await?, + ) + .await? + .unwrap(); + assert_eq!(&0i8.to_be_bytes(), bytes.as_ref()); + } + + // Reset the branch to leave the latest commit dangling + update_branch(storage.as_ref(), "main", first_snap_id, Some(&second_snap_id), false) + .await?; + + // we still have all the chunks + assert_eq!(storage.list_chunks().await?.count().await, 1110); + + let summary = garbage_collect(storage.as_ref(), &gc_config).await?; + assert_eq!(summary.chunks_deleted, 10); + assert_eq!(summary.manifests_deleted, 1); + assert_eq!(summary.snapshots_deleted, 1); + + // 10 chunks should be drop + assert_eq!(storage.list_chunks().await?.count().await, 1100); + assert_eq!(storage.list_manifests().await?.count().await, 1); + assert_eq!(storage.list_snapshots().await?.count().await, 2); + + // Opening the repo on main should give the right data + let repo = Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + for idx in 0..10 { + let bytes = get_chunk( + repo.get_chunk_reader(&array_path, &ChunkIndices(vec![idx]), &ByteRange::ALL) + .await?, + ) + .await? + .unwrap(); + assert_eq!(&42i8.to_be_bytes(), bytes.as_ref()); + } + + Ok(()) +} From cccd83c6b5f3e5b3020c6fafb3ba72b4e9ae5375 Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Mon, 4 Nov 2024 14:13:22 -0300 Subject: [PATCH 2/2] Provide default implementations for storage methods This is a bit less efficient, because we have two allocations per item: we need to go item -> string -> ObjectId, instead of item -> ObjectId. But it's also less code. 
--- icechunk/src/storage/caching.rs | 39 ++----- icechunk/src/storage/logging.rs | 39 ++----- icechunk/src/storage/mod.rs | 76 ++++++++++++-- icechunk/src/storage/object_store.rs | 132 ++++++++---------------- icechunk/src/storage/s3.rs | 148 +++++++++------------------ 5 files changed, 179 insertions(+), 255 deletions(-) diff --git a/icechunk/src/storage/caching.rs b/icechunk/src/storage/caching.rs index 54f25d84..19a2de28 100644 --- a/icechunk/src/storage/caching.rs +++ b/icechunk/src/storage/caching.rs @@ -164,40 +164,19 @@ impl Storage for MemCachingStorage { self.backend.ref_versions(ref_name).await } - async fn list_chunks( - &self, - ) -> StorageResult>>> { - self.backend.list_chunks().await - } - - async fn list_manifests( - &self, - ) -> StorageResult>>> { - self.backend.list_manifests().await - } - - async fn list_snapshots( - &self, - ) -> StorageResult>>> { - self.backend.list_snapshots().await - } - - async fn delete_chunks(&self, ids: BoxStream<'_, ChunkId>) -> StorageResult { - self.backend.delete_chunks(ids).await - } - - async fn delete_manifests( - &self, - ids: BoxStream<'_, ManifestId>, - ) -> StorageResult { - self.backend.delete_manifests(ids).await + async fn list_objects<'a>( + &'a self, + prefix: &str, + ) -> StorageResult>>> { + self.backend.list_objects(prefix).await } - async fn delete_snapshots( + async fn delete_objects( &self, - ids: BoxStream<'_, SnapshotId>, + prefix: &str, + ids: BoxStream<'_, String>, ) -> StorageResult { - self.backend.delete_snapshots(ids).await + self.backend.delete_objects(prefix, ids).await } } diff --git a/icechunk/src/storage/logging.rs b/icechunk/src/storage/logging.rs index 0b656321..87193b10 100644 --- a/icechunk/src/storage/logging.rs +++ b/icechunk/src/storage/logging.rs @@ -133,39 +133,18 @@ impl Storage for LoggingStorage { self.backend.ref_versions(ref_name).await } - async fn list_chunks( - &self, - ) -> StorageResult>>> { - self.backend.list_chunks().await - } - - async fn list_manifests( - &self, - ) -> StorageResult>>> { - self.backend.list_manifests().await - } - - async fn list_snapshots( - &self, - ) -> StorageResult>>> { - self.backend.list_snapshots().await - } - - async fn delete_chunks(&self, ids: BoxStream<'_, ChunkId>) -> StorageResult { - self.backend.delete_chunks(ids).await - } - - async fn delete_manifests( - &self, - ids: BoxStream<'_, ManifestId>, - ) -> StorageResult { - self.backend.delete_manifests(ids).await + async fn list_objects<'a>( + &'a self, + prefix: &str, + ) -> StorageResult>>> { + self.backend.list_objects(prefix).await } - async fn delete_snapshots( + async fn delete_objects( &self, - ids: BoxStream<'_, SnapshotId>, + prefix: &str, + ids: BoxStream<'_, String>, ) -> StorageResult { - self.backend.delete_snapshots(ids).await + self.backend.delete_objects(prefix, ids).await } } diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index fced7e69..b95dd964 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -9,7 +9,7 @@ use aws_sdk_s3::{ }; use chrono::{DateTime, Utc}; use core::fmt; -use futures::stream::BoxStream; +use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; use std::{ffi::OsString, sync::Arc}; use async_trait::async_trait; @@ -71,6 +71,12 @@ pub struct ListInfo { pub created_at: DateTime, } +const SNAPSHOT_PREFIX: &str = "snapshots/"; +const MANIFEST_PREFIX: &str = "manifests/"; +// const ATTRIBUTES_PREFIX: &str = "attributes/"; +const CHUNK_PREFIX: &str = "chunks/"; +const REF_PREFIX: &str = "refs"; + /// Fetch and write 
the parquet files that represent the repository in object store /// /// Different implementation can cache the files differently, or not at all. @@ -115,23 +121,75 @@ pub trait Storage: fmt::Debug + private::Sealed { bytes: Bytes, ) -> StorageResult<()>; + async fn list_objects<'a>( + &'a self, + prefix: &str, + ) -> StorageResult>>>; + + /// Delete a stream of objects, by their id string representations + async fn delete_objects( + &self, + prefix: &str, + ids: BoxStream<'_, String>, + ) -> StorageResult; + async fn list_chunks( &self, - ) -> StorageResult>>>; + ) -> StorageResult>>> { + Ok(translate_list_infos(self.list_objects(CHUNK_PREFIX).await?)) + } + async fn list_manifests( &self, - ) -> StorageResult>>>; + ) -> StorageResult>>> { + Ok(translate_list_infos(self.list_objects(MANIFEST_PREFIX).await?)) + } + async fn list_snapshots( &self, - ) -> StorageResult>>>; + ) -> StorageResult>>> { + Ok(translate_list_infos(self.list_objects(SNAPSHOT_PREFIX).await?)) + } + + async fn delete_chunks( + &self, + chunks: BoxStream<'_, ChunkId>, + ) -> StorageResult { + self.delete_objects(CHUNK_PREFIX, chunks.map(|id| id.to_string()).boxed()).await + } - async fn delete_chunks(&self, ids: BoxStream<'_, ChunkId>) -> StorageResult; async fn delete_manifests( &self, - ids: BoxStream<'_, ManifestId>, - ) -> StorageResult; + chunks: BoxStream<'_, ManifestId>, + ) -> StorageResult { + self.delete_objects(MANIFEST_PREFIX, chunks.map(|id| id.to_string()).boxed()) + .await + } + async fn delete_snapshots( &self, - ids: BoxStream<'_, SnapshotId>, - ) -> StorageResult; + chunks: BoxStream<'_, SnapshotId>, + ) -> StorageResult { + self.delete_objects(SNAPSHOT_PREFIX, chunks.map(|id| id.to_string()).boxed()) + .await + } +} + +fn convert_list_item(item: ListInfo) -> Option> +where + Id: for<'b> TryFrom<&'b str>, +{ + let id = Id::try_from(item.id.as_str()).ok()?; + let created_at = item.created_at; + Some(ListInfo { created_at, id }) +} + +fn translate_list_infos<'a, Id>( + s: impl Stream>> + Send + 'a, +) -> BoxStream<'a, StorageResult>> +where + Id: for<'b> TryFrom<&'b str> + Send + 'a, +{ + // FIXME: flag error, don't skip + s.try_filter_map(|info| async move { Ok(convert_list_item(info)) }).boxed() } diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index 2ebbc5a2..3e11deac 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -28,7 +28,10 @@ use std::{ }, }; -use super::{ListInfo, Storage, StorageError, StorageResult}; +use super::{ + ListInfo, Storage, StorageError, StorageResult, CHUNK_PREFIX, MANIFEST_PREFIX, + REF_PREFIX, SNAPSHOT_PREFIX, +}; // Get Range is object_store specific, keep it with this module impl From<&ByteRange> for Option { @@ -44,12 +47,6 @@ impl From<&ByteRange> for Option { } } -const SNAPSHOT_PREFIX: &str = "snapshots/"; -const MANIFEST_PREFIX: &str = "manifests/"; -// const ATTRIBUTES_PREFIX: &str = "attributes/"; -const CHUNK_PREFIX: &str = "chunks/"; -const REF_PREFIX: &str = "refs"; - #[derive(Debug)] pub struct ObjectStorage { store: Arc, @@ -107,15 +104,18 @@ impl ObjectStorage { .await?) 
} + fn get_path_str(&self, file_prefix: &str, id: &str) -> ObjectPath { + let path = format!("{}/{}/{}", self.prefix, file_prefix, id); + ObjectPath::from(path) + } + fn get_path( &self, file_prefix: &str, id: &ObjectId, ) -> ObjectPath { - // TODO: be careful about allocation here // we serialize the url using crockford - let path = format!("{}/{}/{}", self.prefix, file_prefix, id); - ObjectPath::from(path) + self.get_path_str(file_prefix, id.to_string().as_str()) } fn get_snapshot_path(&self, id: &SnapshotId) -> ObjectPath { @@ -156,53 +156,16 @@ impl ObjectStorage { .boxed() } - fn list_objects<'a, Id>( - &'a self, - prefix: &str, - ) -> StorageResult>>> - where - Id: for<'b> TryFrom<&'b str> + Send + 'a, - { - let prefix = ObjectPath::from(format!("{}/{}", self.prefix.as_str(), prefix)); - let stream = self - .store - .list(Some(&prefix)) - // TODO: we should signal error instead of filtering - .try_filter_map(|object| ready(Ok(object_to_list_info(&object)))) - .err_into(); - Ok(stream.boxed()) - } - - async fn delete_batch( + async fn delete_batch( &self, prefix: &str, - batch: Vec>, + batch: Vec, ) -> StorageResult { - let keys = batch.iter().map(|id| Ok(self.get_path(prefix, id))); + let keys = batch.iter().map(|id| Ok(self.get_path_str(prefix, id))); let results = self.store.delete_stream(stream::iter(keys).boxed()); // FIXME: flag errors instead of skipping them Ok(results.filter(|res| ready(res.is_ok())).count().await) } - - async fn delete_objects( - &self, - prefix: &str, - ids: BoxStream<'_, ObjectId>, - ) -> StorageResult { - let deleted = AtomicUsize::new(0); - ids.chunks(1_000) - // FIXME: configurable concurrency - .for_each_concurrent(10, |batch| { - let deleted = &deleted; - async move { - // FIXME: handle error instead of skipping - let new_deletes = self.delete_batch(prefix, batch).await.unwrap_or(0); - deleted.fetch_add(new_deletes, Ordering::Release); - } - }) - .await; - Ok(deleted.into_inner()) - } } impl private::Sealed for ObjectStorage {} @@ -413,52 +376,43 @@ impl Storage for ObjectStorage { .map(|_| ()) } - async fn list_chunks( - &self, - ) -> StorageResult>>> { - self.list_objects(CHUNK_PREFIX) - } - - async fn list_manifests( - &self, - ) -> StorageResult>>> { - self.list_objects(MANIFEST_PREFIX) - } - - async fn list_snapshots( - &self, - ) -> StorageResult>>> { - self.list_objects(SNAPSHOT_PREFIX) - } - - async fn delete_chunks( - &self, - chunks: BoxStream<'_, ChunkId>, - ) -> StorageResult { - self.delete_objects(CHUNK_PREFIX, chunks).await - } - - async fn delete_manifests( - &self, - chunks: BoxStream<'_, ManifestId>, - ) -> StorageResult { - self.delete_objects(MANIFEST_PREFIX, chunks).await + async fn list_objects<'a>( + &'a self, + prefix: &str, + ) -> StorageResult>>> { + let prefix = ObjectPath::from(format!("{}/{}", self.prefix.as_str(), prefix)); + let stream = self + .store + .list(Some(&prefix)) + // TODO: we should signal error instead of filtering + .try_filter_map(|object| ready(Ok(object_to_list_info(&object)))) + .err_into(); + Ok(stream.boxed()) } - async fn delete_snapshots( + async fn delete_objects( &self, - chunks: BoxStream<'_, SnapshotId>, + prefix: &str, + ids: BoxStream<'_, String>, ) -> StorageResult { - self.delete_objects(SNAPSHOT_PREFIX, chunks).await + let deleted = AtomicUsize::new(0); + ids.chunks(1_000) + // FIXME: configurable concurrency + .for_each_concurrent(10, |batch| { + let deleted = &deleted; + async move { + // FIXME: handle error instead of skipping + let new_deletes = self.delete_batch(prefix, 
batch).await.unwrap_or(0); + deleted.fetch_add(new_deletes, Ordering::Release); + } + }) + .await; + Ok(deleted.into_inner()) } } -fn object_to_list_info<'a, Id>(object: &ObjectMeta) -> Option> -where - Id: for<'b> TryFrom<&'b str> + Send + 'a, -{ +fn object_to_list_info(object: &ObjectMeta) -> Option> { let created_at = object.last_modified; - let id_str = object.location.filename()?; - let id = Id::try_from(id_str).ok()?; + let id = object.location.filename()?.to_string(); Some(ListInfo { id, created_at }) } diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index d71696cd..3c6398c6 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -135,18 +135,18 @@ impl S3Storage { Ok(S3Storage { client, prefix: prefix.into(), bucket: bucket_name.into() }) } + fn get_path_str(&self, file_prefix: &str, id: &str) -> StorageResult { + let path = PathBuf::from_iter([self.prefix.as_str(), file_prefix, id]); + path.into_os_string().into_string().map_err(StorageError::BadPrefix) + } + fn get_path( &self, file_prefix: &str, id: &ObjectId, ) -> StorageResult { // we serialize the url using crockford - let path = PathBuf::from_iter([ - self.prefix.as_str(), - file_prefix, - id.to_string().as_str(), - ]); - path.into_os_string().into_string().map_err(StorageError::BadPrefix) + self.get_path_str(file_prefix, id.to_string().as_str()) } fn get_snapshot_path(&self, id: &SnapshotId) -> StorageResult { @@ -217,45 +217,16 @@ impl S3Storage { Ok(()) } - fn list_objects<'a, Id>( - &'a self, - prefix: &str, - ) -> StorageResult>>> - where - Id: for<'b> TryFrom<&'b str> + Send + 'a, - { - let prefix = PathBuf::from_iter([self.prefix.as_str(), prefix]) - .into_os_string() - .into_string() - .map_err(StorageError::BadPrefix)?; - let stream = self - .client - .list_objects_v2() - .bucket(self.bucket.clone()) - .prefix(prefix) - .into_paginator() - .send() - .into_stream_03x() - .try_filter_map(|page| { - let contents = page.contents.map(|cont| stream::iter(cont).map(Ok)); - ready(Ok(contents)) - }) - .try_flatten() - // TODO: we should signal error instead of filtering - .try_filter_map(|object| ready(Ok(object_to_list_info(&object)))); - Ok(stream.boxed()) - } - - async fn delete_batch( + async fn delete_batch( &self, prefix: &str, - batch: Vec>, + batch: Vec, ) -> StorageResult { let keys = batch .iter() // FIXME: flag errors instead of skipping them .filter_map(|id| { - let key = self.get_path(prefix, id).ok()?; + let key = self.get_path_str(prefix, id).ok()?; let ident = ObjectIdentifier::builder().key(key).build().ok()?; Some(ident) }) @@ -276,26 +247,6 @@ impl S3Storage { Ok(res.deleted().len()) } - - async fn delete_objects( - &self, - prefix: &str, - ids: BoxStream<'_, ObjectId>, - ) -> StorageResult { - let deleted = AtomicUsize::new(0); - ids.chunks(1_000) - // FIXME: configurable concurrency - .for_each_concurrent(10, |batch| { - let deleted = &deleted; - async move { - // FIXME: handle error instead of skipping - let new_deletes = self.delete_batch(prefix, batch).await.unwrap_or(0); - deleted.fetch_add(new_deletes, Ordering::Release); - } - }) - .await; - Ok(deleted.into_inner()) - } } pub fn range_to_header(range: &ByteRange) -> Option { @@ -511,54 +462,57 @@ impl Storage for S3Storage { } } - async fn list_chunks( - &self, - ) -> StorageResult>>> { - self.list_objects(CHUNK_PREFIX) - } - - async fn list_manifests( - &self, - ) -> StorageResult>>> { - self.list_objects(MANIFEST_PREFIX) - } - - async fn list_snapshots( - &self, - ) -> StorageResult>>> { - 
self.list_objects(SNAPSHOT_PREFIX) - } - - async fn delete_chunks( - &self, - chunks: BoxStream<'_, ChunkId>, - ) -> StorageResult { - self.delete_objects(CHUNK_PREFIX, chunks).await - } - - async fn delete_manifests( - &self, - chunks: BoxStream<'_, ManifestId>, - ) -> StorageResult { - self.delete_objects(MANIFEST_PREFIX, chunks).await + async fn list_objects<'a>( + &'a self, + prefix: &str, + ) -> StorageResult>>> { + let prefix = PathBuf::from_iter([self.prefix.as_str(), prefix]) + .into_os_string() + .into_string() + .map_err(StorageError::BadPrefix)?; + let stream = self + .client + .list_objects_v2() + .bucket(self.bucket.clone()) + .prefix(prefix) + .into_paginator() + .send() + .into_stream_03x() + .try_filter_map(|page| { + let contents = page.contents.map(|cont| stream::iter(cont).map(Ok)); + ready(Ok(contents)) + }) + .try_flatten() + // TODO: we should signal error instead of filtering + .try_filter_map(|object| ready(Ok(object_to_list_info(&object)))); + Ok(stream.boxed()) } - async fn delete_snapshots( + async fn delete_objects( &self, - chunks: BoxStream<'_, SnapshotId>, + prefix: &str, + ids: BoxStream<'_, String>, ) -> StorageResult { - self.delete_objects(SNAPSHOT_PREFIX, chunks).await + let deleted = AtomicUsize::new(0); + ids.chunks(1_000) + // FIXME: configurable concurrency + .for_each_concurrent(10, |batch| { + let deleted = &deleted; + async move { + // FIXME: handle error instead of skipping + let new_deletes = self.delete_batch(prefix, batch).await.unwrap_or(0); + deleted.fetch_add(new_deletes, Ordering::Release); + } + }) + .await; + Ok(deleted.into_inner()) } } -fn object_to_list_info<'a, Id>(object: &Object) -> Option> -where - Id: for<'b> TryFrom<&'b str> + Send + 'a, -{ +fn object_to_list_info(object: &Object) -> Option> { let key = object.key()?; let last_modified = object.last_modified()?; let created_at = last_modified.to_chrono_utc().ok()?; - let id_str = Path::new(key).file_name().and_then(|s| s.to_str())?; - let id = Id::try_from(id_str).ok()?; + let id = Path::new(key).file_name().and_then(|s| s.to_str())?.to_string(); Some(ListInfo { id, created_at }) }
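Usage sketch for the garbage collection API introduced in PATCH 1/2. This is not part of the patch itself: it assumes the icechunk and chrono crates as dependencies and a storage handle constructed elsewhere; the 7-day cutoff mirrors the example from the commit message, and the function name is illustrative only.

    use std::{collections::HashSet, sync::Arc};

    use chrono::{Duration, Utc};
    use icechunk::{
        ops::gc::{garbage_collect, Action, GCConfig, GCResult, GCSummary},
        Storage,
    };

    // Run GC with a safety margin: only objects created more than 7 days ago
    // are candidates, so a writing session started within the last week is safe.
    async fn weekly_gc(storage: Arc<dyn Storage + Send + Sync>) -> GCResult<GCSummary> {
        let cutoff = Utc::now() - Duration::days(7);

        // Delete every kind of dangling object older than the cutoff.
        let config = GCConfig::clean_all(cutoff, cutoff, None);

        // A more selective policy is also possible: keep all chunks and only
        // prune dangling metadata files.
        let _metadata_only = GCConfig::new(
            HashSet::new(),                        // no extra roots to protect
            Action::Keep,                          // dangling chunks
            Action::DeleteIfCreatedBefore(cutoff), // dangling manifests
            Action::DeleteIfCreatedBefore(cutoff), // dangling attributes
            Action::DeleteIfCreatedBefore(cutoff), // dangling transaction logs
            Action::DeleteIfCreatedBefore(cutoff), // dangling snapshots
        );

        garbage_collect(storage.as_ref(), &config).await
    }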
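The trade-off described in PATCH 2/2 (typed helpers provided as default trait methods on top of one string-keyed primitive, at the cost of an extra id -> String allocation per item) can be pictured with a small synchronous sketch. This is not the actual Icechunk `Storage` trait, which is async and stream-based, and `ChunkId` here is only a stand-in for the real id types.

    // One string-keyed primitive per backend; typed helpers are default methods.
    struct ChunkId(String);

    trait ObjectDeleter {
        // The only method a backend has to implement.
        fn delete_objects(&self, prefix: &str, ids: Vec<String>) -> usize;

        // Default helper: each id goes id -> String -> key, one extra allocation
        // per item, but the logic is written once instead of once per backend.
        fn delete_chunks(&self, ids: Vec<ChunkId>) -> usize {
            self.delete_objects("chunks/", ids.into_iter().map(|id| id.0).collect())
        }
    }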