diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs
index 95d8856c..7d76be79 100644
--- a/icechunk/src/format/manifest.rs
+++ b/icechunk/src/format/manifest.rs
@@ -1,3 +1,4 @@
+use futures::{pin_mut, Stream, TryStreamExt};
 use itertools::Itertools;
 use std::{collections::BTreeMap, ops::Bound, sync::Arc};
 use thiserror::Error;
@@ -136,13 +137,29 @@ impl Manifest {
         }
     }
 
+    pub async fn from_stream<E>(
+        chunks: impl Stream<Item = Result<ChunkInfo, E>>,
+    ) -> Result<Self, E> {
+        let mut chunk_map = BTreeMap::new();
+        pin_mut!(chunks);
+        while let Some(chunk) = chunks.try_next().await? {
+            chunk_map.insert((chunk.node, chunk.coord), chunk.payload);
+        }
+        Ok(Self::new(chunk_map))
+    }
+
     pub fn chunks(&self) -> &BTreeMap<(NodeId, ChunkIndices), ChunkPayload> {
         &self.chunks
     }
 
-    pub fn size(&self) -> usize {
+    pub fn len(&self) -> usize {
         self.chunks.len()
     }
+
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
 }
 
 impl FromIterator<ChunkInfo> for Manifest {
diff --git a/icechunk/src/format/snapshot.rs b/icechunk/src/format/snapshot.rs
index bd21dbd9..8fd4f009 100644
--- a/icechunk/src/format/snapshot.rs
+++ b/icechunk/src/format/snapshot.rs
@@ -208,9 +208,14 @@ impl Snapshot {
             .map(move |ix| self.short_term_history[ix].clone())
     }
 
-    pub fn size(&self) -> usize {
+    pub fn len(&self) -> usize {
         self.nodes.len()
     }
+
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
 }
 
 // We need this complex dance because Rust makes it really hard to put together an object and a
diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs
index 6e16d171..54e30bc6 100644
--- a/icechunk/src/repository.rs
+++ b/icechunk/src/repository.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::HashSet,
     iter::{self},
     pin::Pin,
     sync::Arc,
@@ -32,7 +32,6 @@ use chrono::Utc;
 use futures::{future::ready, Future, FutureExt, Stream, StreamExt, TryStreamExt};
 use itertools::Either;
 use thiserror::Error;
-use tokio::task;
 
 use crate::{
     format::{
@@ -444,30 +443,8 @@ impl Repository {
         Ok(new)
     }
 
-    // FIXME: add moves
     pub async fn get_node(&self, path: &Path) -> RepositoryResult<NodeSnapshot> {
-        // We need to look for nodes in self.change_set and the snapshot file
-        if self.change_set.is_deleted(path) {
-            return Err(RepositoryError::NodeNotFound {
-                path: path.clone(),
-                message: "getting node".to_string(),
-            });
-        }
-        match self.change_set.get_new_node(path) {
-            Some(node) => Ok(node),
-            None => {
-                let node = self.get_existing_node(path).await?;
-                if self.change_set.is_deleted(&node.path) {
-                    Err(RepositoryError::NodeNotFound {
-                        path: path.clone(),
-                        message: "getting node".to_string(),
-                    })
-                } else {
-                    Ok(node)
-                }
-            }
-        }
+        get_node(self.storage.as_ref(), &self.change_set, self.snapshot_id(), path).await
     }
 
     pub async fn get_array(&self, path: &Path) -> RepositoryResult<NodeSnapshot> {
@@ -492,45 +469,6 @@ impl Repository {
         }
     }
 
-    async fn get_existing_node(&self, path: &Path) -> RepositoryResult<NodeSnapshot> {
-        // An existing node is one that is present in a Snapshot file on storage
-        let snapshot_id = &self.snapshot_id;
-        let snapshot = self.storage.fetch_snapshot(snapshot_id).await?;
-
-        let node = snapshot.get_node(path).map_err(|err| match err {
-            // A missing node here is not really a format error, so we need to
-            // generate the correct error for repositories
-            IcechunkFormatError::NodeNotFound { path } => RepositoryError::NodeNotFound {
-                path,
-                message: "existing node not found".to_string(),
-            },
-            err => RepositoryError::FormatError(err),
-        })?;
-        let session_atts = self
-            .change_set
-            .get_user_attributes(node.id)
-            .cloned()
-            .map(|a| a.map(UserAttributesSnapshot::Inline));
-        let res = NodeSnapshot {
-            user_attributes: session_atts.unwrap_or_else(|| node.user_attributes.clone()),
-            ..node.clone()
-        };
-        if let Some(session_meta) =
-            self.change_set.get_updated_zarr_metadata(node.id).cloned()
-        {
-            if let NodeData::Array(_, manifests) = res.node_data {
-                Ok(NodeSnapshot {
-                    node_data: NodeData::Array(session_meta, manifests),
-                    ..res
-                })
-            } else {
-                Ok(res)
-            }
-        } else {
-            Ok(res)
-        }
-    }
-
     pub async fn get_chunk_ref(
         &self,
         path: &Path,
@@ -697,111 +635,6 @@ impl Repository {
         Ok(None)
     }
 
-    /// Warning: The presence of a single error may mean multiple missing items
-    async fn updated_chunk_iterator(
-        &self,
-    ) -> RepositoryResult<impl Stream<Item = RepositoryResult<(Path, ChunkInfo)>> + '_>
-    {
-        let snapshot = self.storage.fetch_snapshot(&self.snapshot_id).await?;
-        let nodes = futures::stream::iter(snapshot.iter_arc());
-        let res = nodes.then(move |node| async move {
-            let path = node.path.clone();
-            self.node_chunk_iterator(&node.path)
-                .await
-                .map_ok(move |ci| (path.clone(), ci))
-        });
-        Ok(res.flatten())
-    }
-
-    /// Warning: The presence of a single error may mean multiple missing items
-    async fn node_chunk_iterator(
-        &self,
-        path: &Path,
-    ) -> impl Stream<Item = RepositoryResult<ChunkInfo>> + '_ {
-        match self.get_node(path).await {
-            Ok(node) => futures::future::Either::Left(
-                self.verified_node_chunk_iterator(node).await,
-            ),
-            Err(_) => futures::future::Either::Right(futures::stream::empty()),
-        }
-    }
-
-    /// Warning: The presence of a single error may mean multiple missing items
-    async fn verified_node_chunk_iterator(
-        &self,
-        node: NodeSnapshot,
-    ) -> impl Stream<Item = RepositoryResult<ChunkInfo>> + '_ {
-        match node.node_data {
-            NodeData::Group => futures::future::Either::Left(futures::stream::empty()),
-            NodeData::Array(_, manifests) => {
-                let new_chunk_indices: Box<HashSet<&ChunkIndices>> = Box::new(
-                    self.change_set
-                        .array_chunks_iterator(node.id, &node.path)
-                        .map(|(idx, _)| idx)
-                        .collect(),
-                );
-
-                let new_chunks = self
-                    .change_set
-                    .array_chunks_iterator(node.id, &node.path)
-                    .filter_map(move |(idx, payload)| {
-                        payload.as_ref().map(|payload| {
-                            Ok(ChunkInfo {
-                                node: node.id,
-                                coord: idx.clone(),
-                                payload: payload.clone(),
-                            })
-                        })
-                    });
-
-                futures::future::Either::Right(
-                    futures::stream::iter(new_chunks).chain(
-                        futures::stream::iter(manifests)
-                            .then(move |manifest_ref| {
-                                let new_chunk_indices = new_chunk_indices.clone();
-                                async move {
-                                    let manifest = self
-                                        .storage
-                                        .fetch_manifests(&manifest_ref.object_id)
-                                        .await;
-                                    match manifest {
-                                        Ok(manifest) => {
-                                            let old_chunks = manifest
-                                                .iter(&node.id)
-                                                .filter(move |(coord, _)| {
-                                                    !new_chunk_indices.contains(coord)
-                                                })
-                                                .map(move |(coord, payload)| ChunkInfo {
-                                                    node: node.id,
-                                                    coord,
-                                                    payload,
-                                                });
-
-                                            let old_chunks =
-                                                self.change_set.update_existing_chunks(
-                                                    node.id, old_chunks,
-                                                );
-                                            futures::future::Either::Left(
-                                                futures::stream::iter(old_chunks.map(Ok)),
-                                            )
-                                        }
-                                        // if we cannot even fetch the manifest, we generate a
-                                        // single error value.
-                                        Err(err) => futures::future::Either::Right(
-                                            futures::stream::once(ready(Err(
-                                                RepositoryError::StorageError(err),
-                                            ))),
-                                        ),
-                                    }
-                                }
-                            })
-                            .flatten(),
-                    ),
-                )
-            }
-        }
-    }
-
     pub async fn list_nodes(
         &self,
     ) -> RepositoryResult<impl Iterator<Item = NodeSnapshot> + '_> {
@@ -818,10 +651,7 @@
         &self,
     ) -> RepositoryResult<impl Stream<Item = RepositoryResult<(Path, ChunkInfo)>> + '_> {
-        let existing_array_chunks = self.updated_chunk_iterator().await?;
-        let new_array_chunks =
-            futures::stream::iter(self.change_set.new_arrays_chunk_iterator().map(Ok));
-        Ok(existing_array_chunks.chain(new_array_chunks))
+        all_chunks(self.storage.as_ref(), &self.change_set, self.snapshot_id()).await
     }
 
     pub async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
         &self,
         other_change_sets: I,
@@ -834,8 +664,8 @@
         let change_sets = iter::once(self.change_set.clone()).chain(other_change_sets);
         let new_snapshot_id = distributed_flush(
             self.storage.as_ref(),
-            self.snapshot_id(),
-            change_sets,
+            change_sets,
+            self.snapshot_id(),
             message,
             properties,
         )
         .await?;
@@ -999,26 +829,6 @@ fn new_inline_chunk(data: Bytes) -> ChunkPayload {
     ChunkPayload::Inline(data)
 }
 
-fn update_manifest(
-    original_chunks: &mut BTreeMap<(NodeId, ChunkIndices), ChunkPayload>,
-    set_chunks: &HashMap<NodeId, HashMap<ChunkIndices, Option<ChunkPayload>>>,
-) {
-    for (node_id, chunks) in set_chunks.iter() {
-        for (coord, maybe_payload) in chunks.iter() {
-            match maybe_payload {
-                Some(payload) => {
-                    // a chunk was updated or inserted
-                    original_chunks.insert((*node_id, coord.clone()), payload.clone());
-                }
-                None => {
-                    // a chunk was deleted
-                    original_chunks.remove(&(*node_id, coord.clone()));
-                }
-            }
-        }
-    }
-}
-
 pub async fn get_chunk(
     reader: Option<Pin<Box<dyn Future<Output = RepositoryResult<Bytes>> + Send>>>,
 ) -> RepositoryResult<Option<Bytes>> {
@@ -1064,10 +874,79 @@ async fn updated_nodes<'a>(
         .chain(change_set.new_nodes_iterator(manifest_id)))
 }
 
+async fn get_node<'a>(
+    storage: &(dyn Storage + Send + Sync),
+    change_set: &'a ChangeSet,
+    snapshot_id: &SnapshotId,
+    path: &Path,
+) -> RepositoryResult<NodeSnapshot> {
+    // We need to look for nodes in self.change_set and the snapshot file
+    if change_set.is_deleted(path) {
+        return Err(RepositoryError::NodeNotFound {
+            path: path.clone(),
+            message: "getting node".to_string(),
+        });
+    }
+    match change_set.get_new_node(path) {
+        Some(node) => Ok(node),
+        None => {
+            let node = get_existing_node(storage, change_set, snapshot_id, path).await?;
+            if change_set.is_deleted(&node.path) {
+                Err(RepositoryError::NodeNotFound {
+                    path: path.clone(),
+                    message: "getting node".to_string(),
+                })
+            } else {
+                Ok(node)
+            }
+        }
+    }
+}
+
+async fn get_existing_node<'a>(
+    storage: &(dyn Storage + Send + Sync),
+    change_set: &'a ChangeSet,
+    snapshot_id: &SnapshotId,
+    path: &Path,
+) -> RepositoryResult<NodeSnapshot> {
+    // An existing node is one that is present in a Snapshot file on storage
+    let snapshot = storage.fetch_snapshot(snapshot_id).await?;
+
+    let node = snapshot.get_node(path).map_err(|err| match err {
+        // A missing node here is not really a format error, so we need to
+        // generate the correct error for repositories
+        IcechunkFormatError::NodeNotFound { path } => RepositoryError::NodeNotFound {
+            path,
+            message: "existing node not found".to_string(),
+        },
+        err => RepositoryError::FormatError(err),
+    })?;
+    let session_atts = change_set
+        .get_user_attributes(node.id)
+        .cloned()
+        .map(|a| a.map(UserAttributesSnapshot::Inline));
+    let res = NodeSnapshot {
+        user_attributes: session_atts.unwrap_or_else(|| node.user_attributes.clone()),
+        ..node.clone()
+    };
+    if let Some(session_meta) = change_set.get_updated_zarr_metadata(node.id).cloned() {
+        if let NodeData::Array(_, manifests) = res.node_data {
+            Ok(NodeSnapshot {
+                node_data: NodeData::Array(session_meta, manifests),
+                ..res
+            })
+        } else {
+            Ok(res)
+        }
+    } else {
+        Ok(res)
+    }
+}
+
 async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
     storage: &(dyn Storage + Send + Sync),
-    parent_id: &SnapshotId,
-    change_sets: I,
+    change_sets: I,
+    parent_id: &SnapshotId,
     message: &str,
     properties: SnapshotProperties,
 ) -> RepositoryResult<SnapshotId> {
@@ -1077,93 +956,155 @@ async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
         return Err(RepositoryError::NoChangesToCommit);
     }
 
-    // We search for the current manifest. We are assumming a single one for now
-    let old_snapshot = storage.fetch_snapshot(parent_id).await?;
-    let old_snapshot_c = Arc::clone(&old_snapshot);
-    let manifest_id = old_snapshot_c.iter_arc().find_map(|node| {
-        match node.node_data {
-            NodeData::Array(_, man) => {
-                // TODO: can we avoid clone
-                man.first().map(|manifest| manifest.object_id.clone())
-            }
-            NodeData::Group => None,
-        }
-    });
+    let chunks = all_chunks(storage, &change_set, parent_id)
+        .await?
+        .map_ok(|(_path, chunk_info)| chunk_info);
 
-    let old_manifest = match manifest_id {
-        Some(ref manifest_id) => storage.fetch_manifests(manifest_id).await?,
-        // If there is no previous manifest we create an empty one
-        None => Arc::new(Manifest::default()),
-    };
+    let new_manifest = Arc::new(Manifest::from_stream(chunks).await?);
+    let new_manifest_id = ObjectId::random();
+    storage.write_manifests(new_manifest_id.clone(), Arc::clone(&new_manifest)).await?;
 
-    // The manifest update process is CPU intensive, so we want to executed it on a worker
-    // thread. Currently it's also destructive of the manifest, so we are also cloning the
-    // old manifest data
-    //
-    // The update process requires reference access to the set_chunks map, since we are running
-    // it on blocking task, it wants that reference to be 'static, which we cannot provide.
-    // As a solution, we temporarily `take` the map, replacing it an empty one, run the thread,
-    // and at the end we put the map back to where it was, in case there is some later failure.
-    // We always want to leave things in the previous state if there was a failure.
-
-    let chunk_changes = Arc::new(change_set.take_chunks());
-    let chunk_changes_c = Arc::clone(&chunk_changes);
-
-    let update_task = task::spawn_blocking(move || {
-        //FIXME: avoid clone, this one is extremely expensive en memory
-        //it's currently needed because we don't want to destroy the manifest in case of later
-        //failure
-        let mut new_chunks = old_manifest.as_ref().chunks().clone();
-        update_manifest(&mut new_chunks, &chunk_changes_c);
-        (new_chunks, chunk_changes)
+    let all_nodes =
+        updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?;
+
+    let old_snapshot = storage.fetch_snapshot(parent_id).await?;
+    let mut new_snapshot = Snapshot::from_iter(
+        old_snapshot.as_ref(),
+        Some(properties),
+        vec![ManifestFileInfo {
+            id: new_manifest_id.clone(),
+            format_version: new_manifest.icechunk_manifest_format_version,
+        }],
+        vec![],
+        all_nodes,
+    );
+    new_snapshot.metadata.message = message.to_string();
+    new_snapshot.metadata.written_at = Utc::now();
+
+    let new_snapshot = Arc::new(new_snapshot);
+    let new_snapshot_id = &new_snapshot.metadata.id;
+    storage.write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot)).await?;
+
+    Ok(new_snapshot_id.clone())
+}
+
+/// Warning: The presence of a single error may mean multiple missing items
+async fn updated_chunk_iterator<'a>(
+    storage: &'a (dyn Storage + Send + Sync),
+    change_set: &'a ChangeSet,
+    snapshot_id: &'a SnapshotId,
+) -> RepositoryResult<impl Stream<Item = RepositoryResult<(Path, ChunkInfo)>> + 'a> {
+    let snapshot = storage.fetch_snapshot(snapshot_id).await?;
+    let nodes = futures::stream::iter(snapshot.iter_arc());
+    let res = nodes.then(move |node| async move {
+        let path = node.path.clone();
+        node_chunk_iterator(storage, change_set, snapshot_id, &node.path)
+            .await
+            .map_ok(move |ci| (path.clone(), ci))
     });
+    Ok(res.flatten())
+}
 
-    match update_task.await {
-        Ok((new_chunks, chunk_changes)) => {
-            // reset the set_chunks map to it's previous value
-            #[allow(clippy::expect_used)]
-            {
-                // It's OK to call into_inner here because we created the Arc locally and never
-                // shared it with other code
-                let chunks =
-                    Arc::into_inner(chunk_changes).expect("Bug in flush task join");
-                change_set.set_chunks(chunks);
-            }
+/// Warning: The presence of a single error may mean multiple missing items
+async fn node_chunk_iterator<'a>(
+    storage: &'a (dyn Storage + Send + Sync),
+    change_set: &'a ChangeSet,
+    snapshot_id: &SnapshotId,
+    path: &Path,
+) -> impl Stream<Item = RepositoryResult<ChunkInfo>> + 'a {
+    match get_node(storage, change_set, snapshot_id, path).await {
+        Ok(node) => futures::future::Either::Left(
+            verified_node_chunk_iterator(storage, change_set, node).await,
+        ),
+        Err(_) => futures::future::Either::Right(futures::stream::empty()),
+    }
+}
 
-            let new_manifest = Arc::new(Manifest::new(new_chunks));
-            let new_manifest_id = ObjectId::random();
-            storage
-                .write_manifests(new_manifest_id.clone(), Arc::clone(&new_manifest))
-                .await?;
-
-            let all_nodes =
-                updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?;
-
-            let mut new_snapshot = Snapshot::from_iter(
-                old_snapshot.as_ref(),
-                Some(properties),
-                vec![ManifestFileInfo {
-                    id: new_manifest_id.clone(),
-                    format_version: new_manifest.icechunk_manifest_format_version,
-                }],
-                vec![],
-                all_nodes,
+/// Warning: The presence of a single error may mean multiple missing items
+async fn verified_node_chunk_iterator<'a>(
+    storage: &'a (dyn Storage + Send + Sync),
+    change_set: &'a ChangeSet,
+    node: NodeSnapshot,
+) -> impl Stream<Item = RepositoryResult<ChunkInfo>> + 'a {
+    match node.node_data {
+        NodeData::Group => futures::future::Either::Left(futures::stream::empty()),
+        NodeData::Array(_, manifests) => {
+            let new_chunk_indices: Box<HashSet<&ChunkIndices>> = Box::new(
+                change_set
+                    .array_chunks_iterator(node.id, &node.path)
+                    .map(|(idx, _)| idx)
+                    .collect(),
             );
-            new_snapshot.metadata.message = message.to_string();
-            new_snapshot.metadata.written_at = Utc::now();
-
-            let new_snapshot = Arc::new(new_snapshot);
-            let new_snapshot_id = &new_snapshot.metadata.id;
-            storage
-                .write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot))
-                .await?;
-            Ok(new_snapshot_id.clone())
+
+            let new_chunks = change_set
+                .array_chunks_iterator(node.id, &node.path)
+                .filter_map(move |(idx, payload)| {
+                    payload.as_ref().map(|payload| {
+                        Ok(ChunkInfo {
+                            node: node.id,
+                            coord: idx.clone(),
+                            payload: payload.clone(),
+                        })
+                    })
+                });
+
+            futures::future::Either::Right(
+                futures::stream::iter(new_chunks).chain(
+                    futures::stream::iter(manifests)
+                        .then(move |manifest_ref| {
+                            let new_chunk_indices = new_chunk_indices.clone();
+                            async move {
+                                let manifest = storage
+                                    .fetch_manifests(&manifest_ref.object_id)
+                                    .await;
+                                match manifest {
+                                    Ok(manifest) => {
+                                        let old_chunks = manifest
+                                            .iter(&node.id)
+                                            .filter(move |(coord, _)| {
+                                                !new_chunk_indices.contains(coord)
+                                            })
+                                            .map(move |(coord, payload)| ChunkInfo {
+                                                node: node.id,
+                                                coord,
+                                                payload,
+                                            });
+
+                                        let old_chunks = change_set
+                                            .update_existing_chunks(node.id, old_chunks);
+                                        futures::future::Either::Left(
+                                            futures::stream::iter(old_chunks.map(Ok)),
+                                        )
+                                    }
+                                    // if we cannot even fetch the manifest, we generate a
+                                    // single error value.
+                                    Err(err) => futures::future::Either::Right(
+                                        futures::stream::once(ready(Err(
+                                            RepositoryError::StorageError(err),
+                                        ))),
+                                    ),
+                                }
+                            }
+                        })
+                        .flatten(),
+                ),
+            )
         }
-        Err(_) => Err(RepositoryError::OtherFlushError),
     }
 }
 
+async fn all_chunks<'a>(
+    storage: &'a (dyn Storage + Send + Sync),
+    change_set: &'a ChangeSet,
+    snapshot_id: &'a SnapshotId,
+) -> RepositoryResult<impl Stream<Item = RepositoryResult<(Path, ChunkInfo)>> + 'a> {
+    let existing_array_chunks =
+        updated_chunk_iterator(storage, change_set, snapshot_id).await?;
+    let new_array_chunks =
+        futures::stream::iter(change_set.new_arrays_chunk_iterator().map(Ok));
+    Ok(existing_array_chunks.chain(new_array_chunks))
+}
+
 #[cfg(test)]
 #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
 mod tests {
@@ -1874,6 +1815,92 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_manifests_shrink() -> Result<(), Box<dyn std::error::Error>> {
+        let storage: Arc<dyn Storage + Send + Sync> =
+            Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into())));
+        let mut ds = Repository::init(Arc::clone(&storage), false).await?.build();
+        ds.add_group(Path::root()).await?;
+        let zarr_meta = ZarrArrayMetadata {
+            shape: vec![5, 5],
+            data_type: DataType::Float16,
+            chunk_shape: ChunkShape(vec![NonZeroU64::new(2).unwrap()]),
+            chunk_key_encoding: ChunkKeyEncoding::Slash,
+            fill_value: FillValue::Float16(f32::NEG_INFINITY),
+            codecs: vec![Codec { name: "mycodec".to_string(), configuration: None }],
+            storage_transformers: Some(vec![StorageTransformer {
+                name: "mytransformer".to_string(),
+                configuration: None,
+            }]),
+            dimension_names: Some(vec![Some("t".to_string())]),
+        };
+
+        let a1path: Path = "/array1".try_into()?;
+        let a2path: Path = "/array2".try_into()?;
+
+        ds.add_array(a1path.clone(), zarr_meta.clone()).await?;
+        ds.add_array(a2path.clone(), zarr_meta.clone()).await?;
+
+        // add 3 chunks
+        ds.set_chunk_ref(
+            a1path.clone(),
+            ChunkIndices(vec![0, 0]),
+            Some(ChunkPayload::Inline("hello".into())),
+        )
+        .await?;
+        ds.set_chunk_ref(
+            a1path.clone(),
+            ChunkIndices(vec![0, 1]),
+            Some(ChunkPayload::Inline("hello".into())),
+        )
+        .await?;
+        ds.set_chunk_ref(
+            a2path.clone(),
+            ChunkIndices(vec![0, 1]),
+            Some(ChunkPayload::Inline("hello".into())),
+        )
+        .await?;
+
+        ds.commit("main", "commit", None).await?;
+
+        let manifest_id = match ds.get_array(&a1path).await?.node_data {
+            NodeData::Array(_, manifests) => {
+                manifests.first().as_ref().unwrap().object_id.clone()
+            }
+            NodeData::Group => panic!("must be an array"),
+        };
+        let manifest = storage.fetch_manifests(&manifest_id).await?;
+        let initial_size = manifest.len();
+
+        ds.delete_array(a2path).await?;
+        ds.commit("main", "array2 deleted", None).await?;
+        let manifest_id = match ds.get_array(&a1path).await?.node_data {
+            NodeData::Array(_, manifests) => {
+                manifests.first().as_ref().unwrap().object_id.clone()
+            }
+            NodeData::Group => panic!("must be an array"),
+        };
+        let manifest = storage.fetch_manifests(&manifest_id).await?;
+        let size_after_delete = manifest.len();
+
+        assert!(size_after_delete < initial_size);
+
+        // delete a chunk
+        ds.set_chunk_ref(a1path.clone(), ChunkIndices(vec![0, 0]), None).await?;
+        ds.commit("main", "chunk deleted", None).await?;
+        let manifest_id = match ds.get_array(&a1path).await?.node_data {
+            NodeData::Array(_, manifests) => {
+                manifests.first().as_ref().unwrap().object_id.clone()
+            }
+            NodeData::Group => panic!("must be an array"),
+        };
+        let manifest = storage.fetch_manifests(&manifest_id).await?;
+        let size_after_chunk_delete = manifest.len();
+        assert!(size_after_chunk_delete < size_after_delete);
+
+        Ok(())
+    }
+
     #[tokio::test(flavor = "multi_thread")]
     async fn test_all_chunks_iterator() -> Result<(), Box<dyn std::error::Error>> {
         let storage: Arc<dyn Storage + Send + Sync> =