From 7c6a5926d04f5413f1719db876f8ef9c95e8b322 Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Sat, 12 Oct 2024 23:12:42 -0300 Subject: [PATCH] Manifests shrink when there are fewer chunk references In the previous algorithm we were copying all chunks from the previous version to the new manifest (modulo changes in the current session). Now, we only copy the chunks we need to copy. For example, after a `clear` operation, there will be 0 references in the manifest. Most of the commit is moving code around to make it accessible from `distributed_flush`, that is, moving it from the `Repository` impl to free functions. The only important change happens in `distributed_flush`, where instead of starting from all the chunks in the previous manifest, we use the `all_chunks` iterator. A new test verifies manifests shrink when we delete chunks or whole arrays. Implements: #174 --- icechunk/src/format/manifest.rs | 19 +- icechunk/src/format/snapshot.rs | 7 +- icechunk/src/repository.rs | 569 +++++++++++++++++--------------- 3 files changed, 322 insertions(+), 273 deletions(-) diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs index 95d8856c..7d76be79 100644 --- a/icechunk/src/format/manifest.rs +++ b/icechunk/src/format/manifest.rs @@ -1,3 +1,4 @@ +use futures::{pin_mut, Stream, TryStreamExt}; use itertools::Itertools; use std::{collections::BTreeMap, ops::Bound, sync::Arc}; use thiserror::Error; @@ -136,13 +137,29 @@ impl Manifest { } } + pub async fn from_stream( + chunks: impl Stream>, + ) -> Result { + let mut chunk_map = BTreeMap::new(); + pin_mut!(chunks); + while let Some(chunk) = chunks.try_next().await? { + chunk_map.insert((chunk.node, chunk.coord), chunk.payload); + } + Ok(Self::new(chunk_map)) + } + pub fn chunks(&self) -> &BTreeMap<(NodeId, ChunkIndices), ChunkPayload> { &self.chunks } - pub fn size(&self) -> usize { + pub fn len(&self) -> usize { self.chunks.len() } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl FromIterator for Manifest { diff --git a/icechunk/src/format/snapshot.rs b/icechunk/src/format/snapshot.rs index bd21dbd9..8fd4f009 100644 --- a/icechunk/src/format/snapshot.rs +++ b/icechunk/src/format/snapshot.rs @@ -208,9 +208,14 @@ impl Snapshot { .map(move |ix| self.short_term_history[ix].clone()) } - pub fn size(&self) -> usize { + pub fn len(&self) -> usize { self.nodes.len() } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } // We need this complex dance because Rust makes it really hard to put together an object and a diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 6e16d171..54e30bc6 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::HashSet, iter::{self}, pin::Pin, sync::Arc, @@ -32,7 +32,6 @@ use chrono::Utc; use futures::{future::ready, Future, FutureExt, Stream, StreamExt, TryStreamExt}; use itertools::Either; use thiserror::Error; -use tokio::task; use crate::{ format::{ @@ -444,30 +443,8 @@ impl Repository { Ok(new) } - // FIXME: add moves - pub async fn get_node(&self, path: &Path) -> RepositoryResult { - // We need to look for nodes in self.change_set and the snapshot file - if self.change_set.is_deleted(path) { - return Err(RepositoryError::NodeNotFound { - path: path.clone(), - message: "getting node".to_string(), - }); - } - match self.change_set.get_new_node(path) { - Some(node) => Ok(node), - None => { - let node = 
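/* Illustrative sketch, not part of the patch: how the new `Manifest::from_stream`
   constructor added in manifest.rs above might be used on its own. The exact
   generic parameters and error type are assumptions here (the sketch assumes the
   constructor is generic over the stream's error type, since its body only needs
   a fallible stream of `ChunkInfo` values).

   use futures::stream;

   async fn manifest_from_kept_chunks(
       kept: Vec<ChunkInfo>,
   ) -> Result<Manifest, RepositoryError> {
       // Only the references we still want end up in the stream, and therefore in
       // the manifest; nothing from the previous manifest is copied implicitly.
       let chunks = stream::iter(kept.into_iter().map(Ok::<_, RepositoryError>));
       Manifest::from_stream(chunks).await
   }
*/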
self.get_existing_node(path).await?; - if self.change_set.is_deleted(&node.path) { - Err(RepositoryError::NodeNotFound { - path: path.clone(), - message: "getting node".to_string(), - }) - } else { - Ok(node) - } - } - } + get_node(self.storage.as_ref(), &self.change_set, self.snapshot_id(), path).await } pub async fn get_array(&self, path: &Path) -> RepositoryResult { @@ -492,45 +469,6 @@ impl Repository { } } - async fn get_existing_node(&self, path: &Path) -> RepositoryResult { - // An existing node is one that is present in a Snapshot file on storage - let snapshot_id = &self.snapshot_id; - let snapshot = self.storage.fetch_snapshot(snapshot_id).await?; - - let node = snapshot.get_node(path).map_err(|err| match err { - // A missing node here is not really a format error, so we need to - // generate the correct error for repositories - IcechunkFormatError::NodeNotFound { path } => RepositoryError::NodeNotFound { - path, - message: "existing node not found".to_string(), - }, - err => RepositoryError::FormatError(err), - })?; - let session_atts = self - .change_set - .get_user_attributes(node.id) - .cloned() - .map(|a| a.map(UserAttributesSnapshot::Inline)); - let res = NodeSnapshot { - user_attributes: session_atts.unwrap_or_else(|| node.user_attributes.clone()), - ..node.clone() - }; - if let Some(session_meta) = - self.change_set.get_updated_zarr_metadata(node.id).cloned() - { - if let NodeData::Array(_, manifests) = res.node_data { - Ok(NodeSnapshot { - node_data: NodeData::Array(session_meta, manifests), - ..res - }) - } else { - Ok(res) - } - } else { - Ok(res) - } - } - pub async fn get_chunk_ref( &self, path: &Path, @@ -697,111 +635,6 @@ impl Repository { Ok(None) } - /// Warning: The presence of a single error may mean multiple missing items - async fn updated_chunk_iterator( - &self, - ) -> RepositoryResult> + '_> - { - let snapshot = self.storage.fetch_snapshot(&self.snapshot_id).await?; - let nodes = futures::stream::iter(snapshot.iter_arc()); - let res = nodes.then(move |node| async move { - let path = node.path.clone(); - self.node_chunk_iterator(&node.path) - .await - .map_ok(move |ci| (path.clone(), ci)) - }); - Ok(res.flatten()) - } - - /// Warning: The presence of a single error may mean multiple missing items - async fn node_chunk_iterator( - &self, - path: &Path, - ) -> impl Stream> + '_ { - match self.get_node(path).await { - Ok(node) => futures::future::Either::Left( - self.verified_node_chunk_iterator(node).await, - ), - Err(_) => futures::future::Either::Right(futures::stream::empty()), - } - } - - /// Warning: The presence of a single error may mean multiple missing items - async fn verified_node_chunk_iterator( - &self, - node: NodeSnapshot, - ) -> impl Stream> + '_ { - match node.node_data { - NodeData::Group => futures::future::Either::Left(futures::stream::empty()), - NodeData::Array(_, manifests) => { - let new_chunk_indices: Box> = Box::new( - self.change_set - .array_chunks_iterator(node.id, &node.path) - .map(|(idx, _)| idx) - .collect(), - ); - - let new_chunks = self - .change_set - .array_chunks_iterator(node.id, &node.path) - .filter_map(move |(idx, payload)| { - payload.as_ref().map(|payload| { - Ok(ChunkInfo { - node: node.id, - coord: idx.clone(), - payload: payload.clone(), - }) - }) - }); - - futures::future::Either::Right( - futures::stream::iter(new_chunks).chain( - futures::stream::iter(manifests) - .then(move |manifest_ref| { - let new_chunk_indices = new_chunk_indices.clone(); - async move { - let manifest = self - .storage - 
.fetch_manifests(&manifest_ref.object_id) - .await; - match manifest { - Ok(manifest) => { - let old_chunks = manifest - .iter(&node.id) - .filter(move |(coord, _)| { - !new_chunk_indices.contains(coord) - }) - .map(move |(coord, payload)| ChunkInfo { - node: node.id, - coord, - payload, - }); - - let old_chunks = - self.change_set.update_existing_chunks( - node.id, old_chunks, - ); - futures::future::Either::Left( - futures::stream::iter(old_chunks.map(Ok)), - ) - } - // if we cannot even fetch the manifest, we generate a - // single error value. - Err(err) => futures::future::Either::Right( - futures::stream::once(ready(Err( - RepositoryError::StorageError(err), - ))), - ), - } - } - }) - .flatten(), - ), - ) - } - } - } - pub async fn list_nodes( &self, ) -> RepositoryResult + '_> { @@ -818,10 +651,7 @@ impl Repository { &self, ) -> RepositoryResult> + '_> { - let existing_array_chunks = self.updated_chunk_iterator().await?; - let new_array_chunks = - futures::stream::iter(self.change_set.new_arrays_chunk_iterator().map(Ok)); - Ok(existing_array_chunks.chain(new_array_chunks)) + all_chunks(self.storage.as_ref(), &self.change_set, self.snapshot_id()).await } pub async fn distributed_flush>( @@ -834,8 +664,8 @@ impl Repository { let change_sets = iter::once(self.change_set.clone()).chain(other_change_sets); let new_snapshot_id = distributed_flush( self.storage.as_ref(), - self.snapshot_id(), change_sets, + self.snapshot_id(), message, properties, ) @@ -999,26 +829,6 @@ fn new_inline_chunk(data: Bytes) -> ChunkPayload { ChunkPayload::Inline(data) } -fn update_manifest( - original_chunks: &mut BTreeMap<(NodeId, ChunkIndices), ChunkPayload>, - set_chunks: &HashMap>>, -) { - for (node_id, chunks) in set_chunks.iter() { - for (coord, maybe_payload) in chunks.iter() { - match maybe_payload { - Some(payload) => { - // a chunk was updated or inserted - original_chunks.insert((*node_id, coord.clone()), payload.clone()); - } - None => { - // a chunk was deleted - original_chunks.remove(&(*node_id, coord.clone())); - } - } - } - } -} - pub async fn get_chunk( reader: Option> + Send>>>, ) -> RepositoryResult> { @@ -1064,10 +874,79 @@ async fn updated_nodes<'a>( .chain(change_set.new_nodes_iterator(manifest_id))) } +async fn get_node<'a>( + storage: &(dyn Storage + Send + Sync), + change_set: &'a ChangeSet, + snapshot_id: &SnapshotId, + path: &Path, +) -> RepositoryResult { + // We need to look for nodes in self.change_set and the snapshot file + if change_set.is_deleted(path) { + return Err(RepositoryError::NodeNotFound { + path: path.clone(), + message: "getting node".to_string(), + }); + } + match change_set.get_new_node(path) { + Some(node) => Ok(node), + None => { + let node = get_existing_node(storage, change_set, snapshot_id, path).await?; + if change_set.is_deleted(&node.path) { + Err(RepositoryError::NodeNotFound { + path: path.clone(), + message: "getting node".to_string(), + }) + } else { + Ok(node) + } + } + } +} + +async fn get_existing_node<'a>( + storage: &(dyn Storage + Send + Sync), + change_set: &'a ChangeSet, + snapshot_id: &SnapshotId, + path: &Path, +) -> RepositoryResult { + // An existing node is one that is present in a Snapshot file on storage + let snapshot = storage.fetch_snapshot(snapshot_id).await?; + + let node = snapshot.get_node(path).map_err(|err| match err { + // A missing node here is not really a format error, so we need to + // generate the correct error for repositories + IcechunkFormatError::NodeNotFound { path } => RepositoryError::NodeNotFound { + path, + 
message: "existing node not found".to_string(), + }, + err => RepositoryError::FormatError(err), + })?; + let session_atts = change_set + .get_user_attributes(node.id) + .cloned() + .map(|a| a.map(UserAttributesSnapshot::Inline)); + let res = NodeSnapshot { + user_attributes: session_atts.unwrap_or_else(|| node.user_attributes.clone()), + ..node.clone() + }; + if let Some(session_meta) = change_set.get_updated_zarr_metadata(node.id).cloned() { + if let NodeData::Array(_, manifests) = res.node_data { + Ok(NodeSnapshot { + node_data: NodeData::Array(session_meta, manifests), + ..res + }) + } else { + Ok(res) + } + } else { + Ok(res) + } +} + async fn distributed_flush>( storage: &(dyn Storage + Send + Sync), - parent_id: &SnapshotId, change_sets: I, + parent_id: &SnapshotId, message: &str, properties: SnapshotProperties, ) -> RepositoryResult { @@ -1077,93 +956,155 @@ async fn distributed_flush>( return Err(RepositoryError::NoChangesToCommit); } - // We search for the current manifest. We are assumming a single one for now - let old_snapshot = storage.fetch_snapshot(parent_id).await?; - let old_snapshot_c = Arc::clone(&old_snapshot); - let manifest_id = old_snapshot_c.iter_arc().find_map(|node| { - match node.node_data { - NodeData::Array(_, man) => { - // TODO: can we avoid clone - man.first().map(|manifest| manifest.object_id.clone()) - } - NodeData::Group => None, - } - }); + let chunks = all_chunks(storage, &change_set, parent_id) + .await? + .map_ok(|(_path, chunk_info)| chunk_info); - let old_manifest = match manifest_id { - Some(ref manifest_id) => storage.fetch_manifests(manifest_id).await?, - // If there is no previous manifest we create an empty one - None => Arc::new(Manifest::default()), - }; + let new_manifest = Arc::new(Manifest::from_stream(chunks).await?); + let new_manifest_id = ObjectId::random(); + storage.write_manifests(new_manifest_id.clone(), Arc::clone(&new_manifest)).await?; - // The manifest update process is CPU intensive, so we want to executed it on a worker - // thread. Currently it's also destructive of the manifest, so we are also cloning the - // old manifest data - // - // The update process requires reference access to the set_chunks map, since we are running - // it on blocking task, it wants that reference to be 'static, which we cannot provide. - // As a solution, we temporarily `take` the map, replacing it an empty one, run the thread, - // and at the end we put the map back to where it was, in case there is some later failure. - // We always want to leave things in the previous state if there was a failure. 
- - let chunk_changes = Arc::new(change_set.take_chunks()); - let chunk_changes_c = Arc::clone(&chunk_changes); - - let update_task = task::spawn_blocking(move || { - //FIXME: avoid clone, this one is extremely expensive en memory - //it's currently needed because we don't want to destroy the manifest in case of later - //failure - let mut new_chunks = old_manifest.as_ref().chunks().clone(); - update_manifest(&mut new_chunks, &chunk_changes_c); - (new_chunks, chunk_changes) + let all_nodes = + updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?; + + let old_snapshot = storage.fetch_snapshot(parent_id).await?; + let mut new_snapshot = Snapshot::from_iter( + old_snapshot.as_ref(), + Some(properties), + vec![ManifestFileInfo { + id: new_manifest_id.clone(), + format_version: new_manifest.icechunk_manifest_format_version, + }], + vec![], + all_nodes, + ); + new_snapshot.metadata.message = message.to_string(); + new_snapshot.metadata.written_at = Utc::now(); + + let new_snapshot = Arc::new(new_snapshot); + let new_snapshot_id = &new_snapshot.metadata.id; + storage.write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot)).await?; + + Ok(new_snapshot_id.clone()) +} + +/// Warning: The presence of a single error may mean multiple missing items +async fn updated_chunk_iterator<'a>( + storage: &'a (dyn Storage + Send + Sync), + change_set: &'a ChangeSet, + snapshot_id: &'a SnapshotId, +) -> RepositoryResult> + 'a> { + let snapshot = storage.fetch_snapshot(snapshot_id).await?; + let nodes = futures::stream::iter(snapshot.iter_arc()); + let res = nodes.then(move |node| async move { + let path = node.path.clone(); + node_chunk_iterator(storage, change_set, snapshot_id, &node.path) + .await + .map_ok(move |ci| (path.clone(), ci)) }); + Ok(res.flatten()) +} - match update_task.await { - Ok((new_chunks, chunk_changes)) => { - // reset the set_chunks map to it's previous value - #[allow(clippy::expect_used)] - { - // It's OK to call into_inner here because we created the Arc locally and never - // shared it with other code - let chunks = - Arc::into_inner(chunk_changes).expect("Bug in flush task join"); - change_set.set_chunks(chunks); - } +/// Warning: The presence of a single error may mean multiple missing items +async fn node_chunk_iterator<'a>( + storage: &'a (dyn Storage + Send + Sync), + change_set: &'a ChangeSet, + snapshot_id: &SnapshotId, + path: &Path, +) -> impl Stream> + 'a { + match get_node(storage, change_set, snapshot_id, path).await { + Ok(node) => futures::future::Either::Left( + verified_node_chunk_iterator(storage, change_set, node).await, + ), + Err(_) => futures::future::Either::Right(futures::stream::empty()), + } +} - let new_manifest = Arc::new(Manifest::new(new_chunks)); - let new_manifest_id = ObjectId::random(); - storage - .write_manifests(new_manifest_id.clone(), Arc::clone(&new_manifest)) - .await?; - - let all_nodes = - updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?; - - let mut new_snapshot = Snapshot::from_iter( - old_snapshot.as_ref(), - Some(properties), - vec![ManifestFileInfo { - id: new_manifest_id.clone(), - format_version: new_manifest.icechunk_manifest_format_version, - }], - vec![], - all_nodes, +/// Warning: The presence of a single error may mean multiple missing items +async fn verified_node_chunk_iterator<'a>( + storage: &'a (dyn Storage + Send + Sync), + change_set: &'a ChangeSet, + node: NodeSnapshot, +) -> impl Stream> + 'a { + match node.node_data { + NodeData::Group => 
futures::future::Either::Left(futures::stream::empty()), + NodeData::Array(_, manifests) => { + let new_chunk_indices: Box> = Box::new( + change_set + .array_chunks_iterator(node.id, &node.path) + .map(|(idx, _)| idx) + .collect(), ); - new_snapshot.metadata.message = message.to_string(); - new_snapshot.metadata.written_at = Utc::now(); - - let new_snapshot = Arc::new(new_snapshot); - let new_snapshot_id = &new_snapshot.metadata.id; - storage - .write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot)) - .await?; - Ok(new_snapshot_id.clone()) + let new_chunks = change_set + .array_chunks_iterator(node.id, &node.path) + .filter_map(move |(idx, payload)| { + payload.as_ref().map(|payload| { + Ok(ChunkInfo { + node: node.id, + coord: idx.clone(), + payload: payload.clone(), + }) + }) + }); + + futures::future::Either::Right( + futures::stream::iter(new_chunks).chain( + futures::stream::iter(manifests) + .then(move |manifest_ref| { + let new_chunk_indices = new_chunk_indices.clone(); + async move { + let manifest = storage + .fetch_manifests(&manifest_ref.object_id) + .await; + match manifest { + Ok(manifest) => { + let old_chunks = manifest + .iter(&node.id) + .filter(move |(coord, _)| { + !new_chunk_indices.contains(coord) + }) + .map(move |(coord, payload)| ChunkInfo { + node: node.id, + coord, + payload, + }); + + let old_chunks = change_set + .update_existing_chunks(node.id, old_chunks); + futures::future::Either::Left( + futures::stream::iter(old_chunks.map(Ok)), + ) + } + // if we cannot even fetch the manifest, we generate a + // single error value. + Err(err) => futures::future::Either::Right( + futures::stream::once(ready(Err( + RepositoryError::StorageError(err), + ))), + ), + } + } + }) + .flatten(), + ), + ) } - Err(_) => Err(RepositoryError::OtherFlushError), } } +async fn all_chunks<'a>( + storage: &'a (dyn Storage + Send + Sync), + change_set: &'a ChangeSet, + snapshot_id: &'a SnapshotId, +) -> RepositoryResult> + 'a> { + let existing_array_chunks = + updated_chunk_iterator(storage, change_set, snapshot_id).await?; + let new_array_chunks = + futures::stream::iter(change_set.new_arrays_chunk_iterator().map(Ok)); + Ok(existing_array_chunks.chain(new_array_chunks)) +} + #[cfg(test)] #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)] mod tests { @@ -1874,6 +1815,92 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_manifests_shrink() -> Result<(), Box> { + let storage: Arc = + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + let mut ds = Repository::init(Arc::clone(&storage), false).await?.build(); + ds.add_group(Path::root()).await?; + let zarr_meta = ZarrArrayMetadata { + shape: vec![5, 5], + data_type: DataType::Float16, + chunk_shape: ChunkShape(vec![NonZeroU64::new(2).unwrap()]), + chunk_key_encoding: ChunkKeyEncoding::Slash, + fill_value: FillValue::Float16(f32::NEG_INFINITY), + codecs: vec![Codec { name: "mycodec".to_string(), configuration: None }], + storage_transformers: Some(vec![StorageTransformer { + name: "mytransformer".to_string(), + configuration: None, + }]), + dimension_names: Some(vec![Some("t".to_string())]), + }; + + let a1path: Path = "/array1".try_into()?; + let a2path: Path = "/array2".try_into()?; + + ds.add_array(a1path.clone(), zarr_meta.clone()).await?; + ds.add_array(a2path.clone(), zarr_meta.clone()).await?; + + // add 3 chunks + ds.set_chunk_ref( + a1path.clone(), + ChunkIndices(vec![0, 0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + ds.set_chunk_ref( + a1path.clone(), + 
ChunkIndices(vec![0, 1]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + ds.set_chunk_ref( + a2path.clone(), + ChunkIndices(vec![0, 1]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + + ds.commit("main", "commit", None).await?; + + let manifest_id = match ds.get_array(&a1path).await?.node_data { + NodeData::Array(_, manifests) => { + manifests.first().as_ref().unwrap().object_id.clone() + } + NodeData::Group => panic!("must be an array"), + }; + let manifest = storage.fetch_manifests(&manifest_id).await?; + let initial_size = manifest.len(); + + ds.delete_array(a2path).await?; + ds.commit("main", "array2 deleted", None).await?; + let manifest_id = match ds.get_array(&a1path).await?.node_data { + NodeData::Array(_, manifests) => { + manifests.first().as_ref().unwrap().object_id.clone() + } + NodeData::Group => panic!("must be an array"), + }; + let manifest = storage.fetch_manifests(&manifest_id).await?; + let size_after_delete = manifest.len(); + + assert!(size_after_delete < initial_size); + + // delete a chunk + ds.set_chunk_ref(a1path.clone(), ChunkIndices(vec![0, 0]), None).await?; + ds.commit("main", "chunk deleted", None).await?; + let manifest_id = match ds.get_array(&a1path).await?.node_data { + NodeData::Array(_, manifests) => { + manifests.first().as_ref().unwrap().object_id.clone() + } + NodeData::Group => panic!("must be an array"), + }; + let manifest = storage.fetch_manifests(&manifest_id).await?; + let size_after_chunk_delete = manifest.len(); + assert!(size_after_chunk_delete < size_after_delete); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_all_chunks_iterator() -> Result<(), Box> { let storage: Arc =
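/* Illustrative sketch, not part of the patch: the test above repeats the same
   match three times to pull the first manifest id out of an array node. A small
   hypothetical helper along these lines (not present in the crate) would keep
   the assertions focused on the manifest sizes:

   async fn first_manifest_id(ds: &Repository, path: &Path) -> ObjectId {
       match ds.get_array(path).await.unwrap().node_data {
           NodeData::Array(_, manifests) => {
               manifests.first().expect("array has no manifest").object_id.clone()
           }
           NodeData::Group => panic!("must be an array"),
       }
   }
*/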