Skip to content

Commit

Permalink
Allow overwriting existing groups and arrays (#433)
Browse files Browse the repository at this point in the history
Closes #347

This bug manifests when groups or arrays have been committed and are
then overwritten. Zarr deletes the existing nodes first and then re-adds
them. This means the same node ends up in both the `new_*` and
`deleted_*` hashmaps, and so getting the node fails (the lookup
prioritizes `deleted_groups` and returns a KeyError).
  • Loading branch information
dcherian authored Dec 3, 2024
1 parent aa0edc2 commit dc1e17f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
10 changes: 10 additions & 0 deletions icechunk/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ impl ChangeSet {
}

/// Records `path` as a newly created group identified by `node_id`.
///
/// Overwriting a group that already exists in the base snapshot reaches
/// this method as a delete followed by an add (that is the order zarr
/// issues the operations in). Because `get_group` consults
/// `deleted_groups` before `new_groups`, a stale deletion marker would
/// hide the group that is being re-created, so the marker is cleared
/// before the new entry is stored.
pub fn add_group(&mut self, path: Path, node_id: NodeId) {
    // Must run before the insert below, which takes ownership of `path`.
    self.deleted_groups.remove(&path);

    self.new_groups.insert(path, node_id);
}

Expand Down Expand Up @@ -121,6 +126,11 @@ impl ChangeSet {
node_id: NodeId,
metadata: ZarrArrayMetadata,
) {
// when overwriting an array that exists in the base snapshot,
// zarr will delete the array first, and then add a new array
// array lookup prioritizes `deleted_arrays` over `new_arrays`,
// so we must remove from `deleted_arrays` here
self.deleted_arrays.remove(&path);
self.new_arrays.insert(path, (node_id, metadata));
}

Expand Down
48 changes: 48 additions & 0 deletions icechunk/src/zarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2483,6 +2483,54 @@ mod tests {
Ok(())
}

/// Regression test for GH347: a group or array that is deleted and then
/// set again must be readable with its *new* metadata.
///
/// Two scenarios are exercised on the same store:
/// 1. set -> delete -> set with no commit in between, so every operation
///    lives only in the change set;
/// 2. set -> commit -> delete -> set, so the delete targets nodes that are
///    already part of a committed snapshot. Reads are checked both before
///    and after the second commit.
#[tokio::test]
async fn test_overwrite() -> Result<(), Box<dyn std::error::Error>> {
    // GH347
    let storage: Arc<dyn Storage + Send + Sync> =
        Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into())));
    let store = Store::new_from_storage(Arc::clone(&storage)).await?;

    // Group metadata: the two versions differ only in the `foo` attribute.
    let meta1 = Bytes::copy_from_slice(
        br#"{"zarr_format":3,"node_type":"group","attributes":{"foo":42}}"#,
    );
    let meta2 = Bytes::copy_from_slice(
        br#"{"zarr_format":3,"node_type":"group","attributes":{"foo":84}}"#,
    );
    // Array metadata: likewise, only the `foo` attribute differs.
    let zarr_meta1 = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
    let zarr_meta2 = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":84},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);

    // with no commit in the middle, this tests the changeset
    store.set("zarr.json", meta1.clone()).await.unwrap();
    store.set("array/zarr.json", zarr_meta1.clone()).await.unwrap();
    store.delete("zarr.json").await.unwrap();
    store.delete("array/zarr.json").await.unwrap();
    store.set("zarr.json", meta2.clone()).await.unwrap();
    store.set("array/zarr.json", zarr_meta2.clone()).await.unwrap();
    assert_eq!(&store.get("zarr.json", &ByteRange::ALL).await.unwrap(), &meta2);
    assert_eq!(
        &store.get("array/zarr.json", &ByteRange::ALL).await.unwrap(),
        &zarr_meta2
    );

    // with a commit in the middle, this tests the changeset interaction with snapshot
    store.set("zarr.json", meta1).await.unwrap();
    store.set("array/zarr.json", zarr_meta1).await.unwrap();
    store.commit("initial commit").await.unwrap();
    store.delete("zarr.json").await.unwrap();
    store.delete("array/zarr.json").await.unwrap();
    store.set("zarr.json", meta2.clone()).await.unwrap();
    store.set("array/zarr.json", zarr_meta2.clone()).await.unwrap();

    // Before the second commit the delete + re-add is still pending, so
    // these reads cover the overwrite of committed nodes for both the
    // group and the array.
    assert_eq!(&store.get("zarr.json", &ByteRange::ALL).await.unwrap(), &meta2);
    assert_eq!(
        &store.get("array/zarr.json", &ByteRange::ALL).await.unwrap(),
        &zarr_meta2
    );

    // After committing, the overwritten metadata must still be what is read back.
    store.commit("commit 2").await.unwrap();
    assert_eq!(&store.get("zarr.json", &ByteRange::ALL).await.unwrap(), &meta2);
    assert_eq!(
        &store.get("array/zarr.json", &ByteRange::ALL).await.unwrap(),
        &zarr_meta2
    );

    Ok(())
}

#[tokio::test]
async fn test_branch_reset() -> Result<(), Box<dyn std::error::Error>> {
let storage: Arc<dyn Storage + Send + Sync> =
Expand Down

0 comments on commit dc1e17f

Please sign in to comment.