Rust Icechunk learns how to collect garbage #368

Merged
merged 2 commits on Nov 4, 2024
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions icechunk/Cargo.toml
@@ -39,6 +39,7 @@ aws-sdk-s3 = "1.53.0"
aws-config = "1.5.7"
aws-credential-types = "1.2.1"
typed-path = "0.9.2"
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono", "convert-streams"] }

[dev-dependencies]
pretty_assertions = "1.4.1"
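The new aws-smithy-types-convert dependency presumably exists to bridge AWS SDK timestamps into the chrono values that the GC code compares against. A minimal sketch, assuming the `DateTimeExt` extension trait from the `convert-chrono` feature; the helper name and wiring are illustrative, not part of the PR:

```rust
use aws_smithy_types_convert::date_time::DateTimeExt;
use chrono::{DateTime, Utc};

// Illustrative helper: convert an S3 object's `last_modified` timestamp
// (an `aws_smithy_types::DateTime`) into the `chrono::DateTime<Utc>` that
// the `created_at` comparisons in gc.rs expect.
fn created_at(last_modified: &aws_smithy_types::DateTime) -> Option<DateTime<Utc>> {
    last_modified.to_chrono_utc().ok()
}
```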
1 change: 1 addition & 0 deletions icechunk/src/lib.rs
@@ -20,6 +20,7 @@
pub mod change_set;
pub mod format;
pub mod metadata;
pub mod ops;
pub mod refs;
pub mod repository;
pub mod storage;
296 changes: 296 additions & 0 deletions icechunk/src/ops/gc.rs
@@ -0,0 +1,296 @@
use std::{collections::HashSet, future::ready, iter};

use chrono::{DateTime, Utc};
use futures::{stream, Stream, StreamExt, TryStreamExt};
use tokio::pin;

use crate::{
    format::{ChunkId, ManifestId, SnapshotId},
    refs::{list_refs, RefError},
    repository::ChunkPayload,
    storage::ListInfo,
    Storage, StorageError,
};

#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    Keep,
    DeleteIfCreatedBefore(DateTime<Utc>),
}

#[derive(Debug)]
pub struct GCConfig {
    extra_roots: HashSet<SnapshotId>,
    dangling_chunks: Action,
    dangling_manifests: Action,
    dangling_attributes: Action,
    dangling_transaction_logs: Action,
    dangling_snapshots: Action,
}

impl GCConfig {
    pub fn new(
        extra_roots: HashSet<SnapshotId>,
        dangling_chunks: Action,
        dangling_manifests: Action,
        dangling_attributes: Action,
        dangling_transaction_logs: Action,
        dangling_snapshots: Action,
    ) -> Self {
        GCConfig {
            extra_roots,
            dangling_chunks,
            dangling_manifests,
            dangling_attributes,
            dangling_transaction_logs,
            dangling_snapshots,
        }
    }

    pub fn clean_all(
        chunks_age: DateTime<Utc>,
        metadata_age: DateTime<Utc>,
Comment on lines +50 to +51

Contributor:
Curious about the rationale for having two different ages here.

Contributor Author:
I thought, given that metadata files are far less numerous than chunks, users would be willing to preserve them longer. If we someday support some form of reflog (or even without one), it could be useful to have them around for forensics or understanding. It wouldn't let you recover data, but you could at least recover "structure". For example, a reasonable thing to do could be to GC chunks older than 1 month and metadata older than 1 year.

        extra_roots: Option<HashSet<SnapshotId>>,
    ) -> Self {
        use Action::DeleteIfCreatedBefore as D;
        Self::new(
            extra_roots.unwrap_or_default(),
            D(chunks_age),
            D(metadata_age),
            D(metadata_age),
            D(metadata_age),
            D(metadata_age),
        )
    }

    fn action_needed(&self) -> bool {
        [
            &self.dangling_chunks,
            &self.dangling_manifests,
            &self.dangling_attributes,
            &self.dangling_transaction_logs,
            &self.dangling_snapshots,
        ]
        .into_iter()
        .any(|action| action != &Action::Keep)
    }

    pub fn deletes_chunks(&self) -> bool {
        self.dangling_chunks != Action::Keep
    }

    pub fn deletes_manifests(&self) -> bool {
        self.dangling_manifests != Action::Keep
    }

    pub fn deletes_attributes(&self) -> bool {
        self.dangling_attributes != Action::Keep
    }

    pub fn deletes_transaction_logs(&self) -> bool {
        self.dangling_transaction_logs != Action::Keep
    }

    pub fn deletes_snapshots(&self) -> bool {
        self.dangling_snapshots != Action::Keep
    }

    fn must_delete_chunk(&self, chunk: &ListInfo<ChunkId>) -> bool {
        match self.dangling_chunks {
            Action::DeleteIfCreatedBefore(before) => chunk.created_at < before,
            _ => false,
        }
    }

    fn must_delete_manifest(&self, manifest: &ListInfo<ManifestId>) -> bool {
        match self.dangling_manifests {
            Action::DeleteIfCreatedBefore(before) => manifest.created_at < before,
            _ => false,
        }
    }

    fn must_delete_snapshot(&self, snapshot: &ListInfo<SnapshotId>) -> bool {
        match self.dangling_snapshots {
            Action::DeleteIfCreatedBefore(before) => snapshot.created_at < before,
            _ => false,
        }
    }
}

#[derive(Debug, PartialEq, Eq, Default)]
pub struct GCSummary {
    pub chunks_deleted: usize,
    pub manifests_deleted: usize,
    pub snapshots_deleted: usize,
    pub attributes_deleted: usize,
    pub transaction_logs_deleted: usize,
}

#[derive(Debug, thiserror::Error)]
pub enum GCError {
    #[error("ref error {0}")]
    Ref(#[from] RefError),
    #[error("storage error {0}")]
    Storage(#[from] StorageError),
}

pub type GCResult<A> = Result<A, GCError>;

pub async fn garbage_collect(
    storage: &(dyn Storage + Send + Sync),
    config: &GCConfig,
) -> GCResult<GCSummary> {
    // TODO: this function could have much more parallelism
    if !config.action_needed() {
        return Ok(GCSummary::default());
    }

    let all_snaps = pointed_snapshots(storage, &config.extra_roots).await?;

    // FIXME: add attribute files
    // FIXME: add transaction log files
    let mut keep_chunks = HashSet::new();
    let mut keep_manifests = HashSet::new();
    let mut keep_snapshots = HashSet::new();

    pin!(all_snaps);
    while let Some(snap_id) = all_snaps.try_next().await? {
        let snap = storage.fetch_snapshot(&snap_id).await?;
        if config.deletes_snapshots() {
            keep_snapshots.insert(snap_id);
        }

        if config.deletes_manifests() {
            keep_manifests.extend(snap.manifest_files.iter().map(|mf| mf.id.clone()));
        }

        if config.deletes_chunks() {
            for manifest_file in snap.manifest_files.iter() {
                let manifest_id = &manifest_file.id;
                let manifest = storage.fetch_manifests(manifest_id).await?;
                let chunk_ids =
                    manifest.chunks().values().filter_map(|payload| match payload {
                        ChunkPayload::Ref(chunk_ref) => Some(chunk_ref.id.clone()),
                        _ => None,
                    });
                keep_chunks.extend(chunk_ids);
            }
        }
    }

    let mut summary = GCSummary::default();

    if config.deletes_snapshots() {
        summary.snapshots_deleted = gc_snapshots(storage, config, keep_snapshots).await?;
    }
    if config.deletes_manifests() {
        summary.manifests_deleted = gc_manifests(storage, config, keep_manifests).await?;
    }
    if config.deletes_chunks() {
        summary.chunks_deleted = gc_chunks(storage, config, keep_chunks).await?;
    }

    Ok(summary)
}

async fn all_roots<'a>(
    storage: &'a (dyn Storage + Send + Sync),
    extra_roots: &'a HashSet<SnapshotId>,
) -> GCResult<impl Stream<Item = GCResult<SnapshotId>> + 'a> {
    let all_refs = list_refs(storage).await?;
    // TODO: this could be optimized by not following the ancestry of snapshots
    // that we have already seen
    let roots = stream::iter(all_refs)
        .then(move |r| async move {
            r.fetch(storage).await.map(|ref_data| ref_data.snapshot)
        })
        .err_into()
        .chain(stream::iter(extra_roots.iter().cloned()).map(Ok));
    Ok(roots)
}

async fn pointed_snapshots<'a>(
    storage: &'a (dyn Storage + Send + Sync),
    extra_roots: &'a HashSet<SnapshotId>,
) -> GCResult<impl Stream<Item = GCResult<SnapshotId>> + 'a> {
    let roots = all_roots(storage, extra_roots).await?;
    Ok(roots
        .and_then(move |snap_id| async move {
            let snap = storage.fetch_snapshot(&snap_id).await?;
            // FIXME: this should be global ancestry, not local
            let parents = snap.local_ancestry().map(|parent| parent.id);
            Ok(stream::iter(iter::once(snap_id).chain(parents))
                .map(Ok::<SnapshotId, GCError>))
        })
        .try_flatten())
}

async fn gc_chunks(
    storage: &(dyn Storage + Send + Sync),
    config: &GCConfig,
    keep_ids: HashSet<ChunkId>,
) -> GCResult<usize> {
    let to_delete = storage
        .list_chunks()
        .await?
        // TODO: don't skip over errors
        .filter_map(move |chunk| {
            ready(chunk.ok().and_then(|chunk| {
                if config.must_delete_chunk(&chunk) && !keep_ids.contains(&chunk.id) {
                    Some(chunk.id.clone())
                } else {
                    None
                }
            }))
        })
        .boxed();
    Ok(storage.delete_chunks(to_delete).await?)
}

async fn gc_manifests(
    storage: &(dyn Storage + Send + Sync),
    config: &GCConfig,
    keep_ids: HashSet<ManifestId>,
) -> GCResult<usize> {
    let to_delete = storage
        .list_manifests()
        .await?
        // TODO: don't skip over errors
        .filter_map(move |manifest| {
            ready(manifest.ok().and_then(|manifest| {
                if config.must_delete_manifest(&manifest)
                    && !keep_ids.contains(&manifest.id)
                {
                    Some(manifest.id.clone())
                } else {
                    None
                }
            }))
        })
        .boxed();
    Ok(storage.delete_manifests(to_delete).await?)
}

async fn gc_snapshots(
    storage: &(dyn Storage + Send + Sync),
    config: &GCConfig,
    keep_ids: HashSet<SnapshotId>,
) -> GCResult<usize> {
    let to_delete = storage
        .list_snapshots()
        .await?
        // TODO: don't skip over errors
        .filter_map(move |snapshot| {
            ready(snapshot.ok().and_then(|snapshot| {
                if config.must_delete_snapshot(&snapshot)
                    && !keep_ids.contains(&snapshot.id)
                {
                    Some(snapshot.id.clone())
                } else {
                    None
                }
            }))
        })
        .boxed();
    Ok(storage.delete_snapshots(to_delete).await?)
}
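For orientation, a minimal usage sketch (not part of the PR) showing how the new API composes, following the example from the review thread above: GC chunks older than one month, metadata older than one year. The module paths follow the lib.rs change in this PR; a concrete Storage instance and an async runtime are assumed to exist elsewhere.

```rust
use chrono::{Duration, Utc};
use icechunk::ops::gc::{garbage_collect, GCConfig, GCResult};
use icechunk::Storage;

// Hypothetical driver: `storage` is any value implementing
// `Storage + Send + Sync`, e.g. an S3-backed store opened elsewhere.
async fn run_gc(storage: &(dyn Storage + Send + Sync)) -> GCResult<()> {
    let config = GCConfig::clean_all(
        Utc::now() - Duration::days(30),  // chunks_age
        Utc::now() - Duration::days(365), // metadata_age
        None, // no extra roots beyond the repository's refs
    );
    let summary = garbage_collect(storage, &config).await?;
    println!(
        "deleted: {} chunks, {} manifests, {} snapshots",
        summary.chunks_deleted, summary.manifests_deleted, summary.snapshots_deleted
    );
    Ok(())
}
```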
1 change: 1 addition & 0 deletions icechunk/src/ops/mod.rs
@@ -0,0 +1 @@
pub mod gc;
10 changes: 10 additions & 0 deletions icechunk/src/refs.rs
@@ -66,6 +66,16 @@ impl Ref {
            },
        }
    }

    pub async fn fetch(
        &self,
        storage: &(dyn Storage + Send + Sync),
    ) -> RefResult<RefData> {
        match self {
            Ref::Tag(name) => fetch_tag(storage, name).await,
            Ref::Branch(name) => fetch_branch_tip(storage, name).await,
        }
    }
}

#[derive(Debug, Clone, Eq, PartialEq)]
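For illustration, a small sketch of calling the new Ref::fetch helper. The branch name is hypothetical, and the `RefResult`/`RefData` shapes are assumptions based on the signatures in the diff above (gc.rs uses `ref_data.snapshot` the same way):

```rust
use icechunk::format::SnapshotId;
use icechunk::refs::{Ref, RefResult};
use icechunk::Storage;

// Illustrative only: resolve the snapshot a branch currently points to,
// without matching on tag vs. branch at the call site.
async fn tip_snapshot(
    storage: &(dyn Storage + Send + Sync),
) -> RefResult<SnapshotId> {
    let branch = Ref::Branch("main".to_string()); // hypothetical branch name
    let data = branch.fetch(storage).await?;
    Ok(data.snapshot)
}
```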
4 changes: 2 additions & 2 deletions icechunk/src/repository.rs
@@ -214,7 +214,7 @@ impl Repository {
             return Err(RepositoryError::AlreadyInitialized);
         }
         let new_snapshot = Snapshot::empty();
-        let new_snapshot_id = ObjectId::random();
+        let new_snapshot_id = new_snapshot.metadata.id.clone();
         storage.write_snapshot(new_snapshot_id.clone(), Arc::new(new_snapshot)).await?;
         update_branch(
             storage.as_ref(),
@@ -314,7 +314,7 @@
         let parent = self.storage.fetch_snapshot(self.snapshot_id()).await?;
         let last = parent.metadata.clone();
         let it = if parent.short_term_history.len() < parent.total_parents as usize {
-            // TODO: implement splitting of snapshot history
+            // FIXME: implement splitting of snapshot history
             Either::Left(parent.local_ancestry().chain(iter::once_with(|| todo!())))
         } else {
             Either::Right(parent.local_ancestry())