diff --git a/Cargo.lock b/Cargo.lock index d1f62dac..915023f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,7 +13,7 @@ dependencies = [ "getrandom", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -298,6 +298,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.6.1" @@ -461,6 +467,7 @@ dependencies = [ "arrow", "async-trait", "pretty_assertions", + "rand", ] [[package]] @@ -662,6 +669,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "ppv-lite86" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dee4364d9f3b902ef14fab8a1ddffb783a1cb6b4bba3bfc1fa3922732c7de97f" +dependencies = [ + "zerocopy 0.6.6", +] + [[package]] name = "pretty_assertions" version = "1.4.0" @@ -690,6 +706,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "regex" version = "1.10.5" @@ -949,13 +995,34 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" +[[package]] +name = "zerocopy" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" +dependencies = [ + "byteorder", + "zerocopy-derive 0.6.6", +] + [[package]] name = "zerocopy" version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy-derive" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "125139de3f6b9d625c39e2efdd73d41bdac468ccd556556440e322be0e1bbd91" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 04d0d388..da0e7be7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ publish = false [dependencies] arrow = "52.2.0" async-trait = "0.1.81" +rand = "0.8.5" [profile.release-with-debug] inherits = "release" diff --git a/src/dataset.rs b/src/dataset.rs index bd18c910..c6f58022 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use crate::{ - AddNodeError, ArrayIndices, ArrayStructure, AttributesTable, ChunkPayload, Dataset, - GroupStructure, ManifestsTable, NodeStructure, Path, StructureTable, UpdateNodeError, - UserAttributes, ZarrArrayMetadata, + AddNodeError, ArrayIndices, AttributesTable, ChunkPayload, Dataset, ManifestsTable, NodeData, + NodeStructure, Path, StructureTable, UpdateNodeError, UserAttributes, ZarrArrayMetadata, }; /// FIXME: what do we want to do with implicit groups? @@ -47,11 +46,14 @@ impl Dataset { ) -> Result<(), UpdateNodeError> { match self.get_node(&path).await { None => Err(UpdateNodeError::NotFound), - Some(NodeStructure::Group(..)) => Err(UpdateNodeError::NotAnArray), - Some(NodeStructure::Array(..)) => { + Some(NodeStructure { + node_data: NodeData::Array(..), + .. + }) => { self.updated_arrays.insert(path, metadata); Ok(()) } + Some(_) => Err(UpdateNodeError::NotAnArray), } } @@ -81,11 +83,14 @@ impl Dataset { ) -> Result<(), UpdateNodeError> { match self.get_node(&path).await { None => Err(UpdateNodeError::NotFound), - Some(NodeStructure::Group(..)) => Err(UpdateNodeError::NotAnArray), - Some(NodeStructure::Array(..)) => { + Some(NodeStructure { + node_data: NodeData::Array(..), + .. + }) => { self.set_chunks.insert((path, coord), data); Ok(()) } + Some(_) => Err(UpdateNodeError::NotAnArray), } } @@ -101,10 +106,10 @@ impl Dataset { structure.get_node(path) } - pub async fn get_user_attributes(&self, path: Path) -> Option { + pub async fn get_user_attributes(&self, _path: Path) -> Option { todo!() } - pub async fn get_chunk(&self, path: Path, coord: ArrayIndices) -> ChunkPayload { + pub async fn get_chunk(&self, _path: Path, _coord: ArrayIndices) -> ChunkPayload { todo!() } diff --git a/src/lib.rs b/src/lib.rs index b9d68bbf..67bf685e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,12 +35,6 @@ use std::{ }; use structure::StructureTable; -#[derive(Clone, Debug)] -pub enum NodeType { - Group, - Array, -} - /// An ND index to an element in an array. pub type ArrayIndices = Vec; @@ -194,30 +188,56 @@ pub type NodeId = u32; /// The id of a file in object store /// FIXME: should this be passed by ref everywhere? -pub type ObjectId = [u8; 16]; // FIXME: this doesn't need to be this big +#[derive(Debug, Hash, Clone, PartialEq, Eq)] +pub struct ObjectId([u8; 16]); // FIXME: this doesn't need to be this big + +impl ObjectId { + const SIZE: usize = 16; + + pub fn random() -> ObjectId { + ObjectId(rand::random()) + } +} + +impl TryFrom<&[u8]> for ObjectId { + type Error = &'static str; + + fn try_from(value: &[u8]) -> Result { + let buf = value.try_into(); + buf.map(ObjectId) + .map_err(|_| "Invalid ObjectId buffer length") + } +} pub type ChunkOffset = u64; pub type ChunkLength = u64; -type TableOffset = usize; -type TableLength = usize; +type TableOffset = u32; +type TableLength = u32; + +#[derive(Debug, Clone, PartialEq, Eq)] pub struct TableRegion(TableOffset, TableLength); +#[derive(Debug, Clone, PartialEq, Eq)] struct Flags(); // FIXME: implement +#[derive(Debug, Clone, PartialEq, Eq)] struct UserAttributesRef { object_id: ObjectId, location: TableOffset, flags: Flags, } +#[derive(Debug, Clone, PartialEq, Eq)] enum UserAttributesStructure { Inline(UserAttributes), Ref(UserAttributesRef), } +#[derive(Debug, Clone, PartialEq)] struct ManifestExtents(Vec); +#[derive(Debug, Clone, PartialEq)] struct ManifestRef { object_id: ObjectId, location: TableRegion, @@ -234,29 +254,28 @@ pub struct ZarrArrayMetadata { fill_value: FillValue, codecs: Codecs, storage_transformers: Option, + // each dimension name can be null in Zarr dimension_names: Option>>, } -#[derive(Debug, PartialEq)] -pub struct ArrayStructure { - id: NodeId, - path: Path, - zarr_metadata: ZarrArrayMetadata, - //user_attributes: UserAttributesStructure, - //manifests: Vec, +#[derive(Clone, Debug)] +pub enum NodeType { + Group, + Array, } -#[derive(Debug, PartialEq, Eq)] -pub struct GroupStructure { - id: NodeId, - path: Path, - //user_attributes: UserAttributesStructure, +#[derive(Debug, PartialEq)] +pub enum NodeData { + Array(ZarrArrayMetadata), //(manifests: Vec) + Group, } #[derive(Debug, PartialEq)] -pub enum NodeStructure { - Array(ArrayStructure), - Group(GroupStructure), +pub struct NodeStructure { + id: NodeId, + path: Path, + user_attributes: Option, + node_data: NodeData, } pub struct VirtualChunkRef { diff --git a/src/structure.rs b/src/structure.rs index 0d362f6f..9da05659 100644 --- a/src/structure.rs +++ b/src/structure.rs @@ -2,15 +2,16 @@ use std::{num::NonZeroU64, sync::Arc}; use arrow::{ array::{ - Array, AsArray, ListArray, ListBuilder, RecordBatch, StringArray, StringBuilder, - UInt32Array, UInt8Array, + Array, AsArray, FixedSizeBinaryArray, ListArray, ListBuilder, RecordBatch, StringArray, + StringBuilder, UInt32Array, UInt8Array, }, datatypes::{Field, Schema, UInt32Type, UInt64Type, UInt8Type}, }; use crate::{ - ArrayStructure, ChunkKeyEncoding, ChunkShape, Codecs, DataType, DimensionName, FillValue, - GroupStructure, NodeId, NodeStructure, NodeType, Path, StorageTransformers, ZarrArrayMetadata, + ChunkKeyEncoding, ChunkShape, Codecs, DataType, DimensionName, FillValue, Flags, NodeData, + NodeId, NodeStructure, NodeType, ObjectId, Path, StorageTransformers, UserAttributes, + UserAttributesRef, UserAttributesStructure, ZarrArrayMetadata, }; pub struct StructureTable { @@ -123,23 +124,61 @@ impl StructureTable { .column_by_name("id")? .as_primitive_opt::()? .value(idx); + let user_attributes = self.build_user_attributes(idx); match node_type { - "group" => Some(NodeStructure::Group(GroupStructure { + "group" => Some(NodeStructure { path: path.clone(), id, - })), - "array" => { - let zarr_metadata = self.build_zarr_array_metadata(idx)?; - let array = ArrayStructure { - path: path.clone(), - id, - zarr_metadata, - }; - Some(NodeStructure::Array(array)) - } + user_attributes, + node_data: NodeData::Group, + }), + "array" => Some(NodeStructure { + path: path.clone(), + id, + user_attributes, + node_data: NodeData::Array(self.build_zarr_array_metadata(idx)?), + }), _ => None, } } + + fn build_user_attributes(&self, idx: usize) -> Option { + let inline = self + .batch + .column_by_name("user_attributes")? + .as_string_opt::()?; + if inline.is_valid(idx) { + Some(UserAttributesStructure::Inline( + inline.value(idx).to_string(), + )) + } else { + self.build_user_attributes_ref(idx) + } + } + fn build_user_attributes_ref(&self, idx: usize) -> Option { + let atts_ref = self + .batch + .column_by_name("user_attributes_ref")? + .as_fixed_size_binary_opt()?; + let atts_row = self + .batch + .column_by_name("user_attributes_row")? + .as_primitive_opt::()?; + if atts_ref.is_valid(idx) && atts_row.is_valid(idx) { + let object_id = atts_ref.value(idx); + let object_id = object_id.try_into().ok()?; + let location = atts_row.value(idx); + // FIXME: flags + let flags = Flags(); + Some(UserAttributesStructure::Ref(UserAttributesRef { + object_id, + location, + flags, + })) + } else { + None + } + } } fn mk_id_array(coll: T) -> UInt32Array @@ -243,6 +282,26 @@ where b.finish() } +fn mk_user_attributes_array>>( + coll: T, +) -> StringArray { + let iter = coll.into_iter(); + StringArray::from_iter(iter) +} + +fn mk_user_attributes_ref_array>>( + coll: T, +) -> FixedSizeBinaryArray { + let iter = coll.into_iter().map(|oid| oid.map(|oid| oid.0)); + FixedSizeBinaryArray::try_from_sparse_iter_with_size(iter, ObjectId::SIZE as i32) + .expect("Bad ObjectId size") +} + +fn mk_user_attributes_row_array>>(coll: T) -> UInt32Array { + let iter = coll.into_iter(); + UInt32Array::from_iter(iter) +} + // For testing only pub fn mk_structure_table>(coll: T) -> StructureTable { let mut ids = Vec::new(); @@ -255,12 +314,38 @@ pub fn mk_structure_table>(coll: T) -> Str let mut codecs = Vec::new(); let mut storage_transformers = Vec::new(); let mut dimension_names = Vec::new(); + let mut user_attributes_vec = Vec::new(); + let mut user_attributes_ref = Vec::new(); + let mut user_attributes_row = Vec::new(); + // FIXME: add user_attributes_flags for node in coll { - match node { - NodeStructure::Group(GroupStructure { id, path }) => { + ids.push(node.id); + paths.push(node.path.to_string_lossy().into_owned()); + match node.user_attributes { + Some(UserAttributesStructure::Inline(atts)) => { + user_attributes_ref.push(None); + user_attributes_row.push(None); + user_attributes_vec.push(Some(atts)); + } + Some(UserAttributesStructure::Ref(UserAttributesRef { + object_id, + location, + flags: _flags, + })) => { + user_attributes_vec.push(None); + user_attributes_ref.push(Some(object_id)); + user_attributes_row.push(Some(location)); + } + None => { + user_attributes_vec.push(None); + user_attributes_ref.push(None); + user_attributes_row.push(None); + } + } + + match node.node_data { + NodeData::Group => { types.push(NodeType::Group); - ids.push(id); - paths.push(path.to_string_lossy().into_owned()); shapes.push(None); data_types.push(None); chunk_shapes.push(None); @@ -269,14 +354,8 @@ pub fn mk_structure_table>(coll: T) -> Str storage_transformers.push(None); dimension_names.push(None); } - NodeStructure::Array(ArrayStructure { - id, - path, - zarr_metadata, - }) => { + NodeData::Array(zarr_metadata) => { types.push(NodeType::Array); - ids.push(id); - paths.push(path.to_string_lossy().into_owned()); shapes.push(Some(zarr_metadata.shape)); data_types.push(Some(zarr_metadata.data_type)); chunk_shapes.push(Some(zarr_metadata.chunk_shape)); @@ -287,6 +366,7 @@ pub fn mk_structure_table>(coll: T) -> Str } } } + let ids = mk_id_array(ids); let types = mk_type_array(types); let paths = mk_path_array(paths); @@ -297,6 +377,10 @@ pub fn mk_structure_table>(coll: T) -> Str let codecs = mk_codecs_array(codecs); let storage_transformers = mk_storage_transformers_array(storage_transformers); let dimension_names = mk_dimension_names_array(dimension_names); + let user_attributes_vec = mk_user_attributes_array(user_attributes_vec); + let user_attributes_ref = mk_user_attributes_ref_array(user_attributes_ref); + let user_attributes_row = mk_user_attributes_row_array(user_attributes_row); + let columns: Vec> = vec![ Arc::new(ids), Arc::new(types), @@ -308,6 +392,9 @@ pub fn mk_structure_table>(coll: T) -> Str Arc::new(codecs), Arc::new(storage_transformers), Arc::new(dimension_names), + Arc::new(user_attributes_vec), + Arc::new(user_attributes_ref), + Arc::new(user_attributes_row), ]; let schema = Arc::new(Schema::new(vec![ Field::new("id", arrow::datatypes::DataType::UInt32, false), @@ -342,6 +429,17 @@ pub fn mk_structure_table>(coll: T) -> Str Field::new("item", arrow::datatypes::DataType::Utf8, true), true, ), + Field::new("user_attributes", arrow::datatypes::DataType::Utf8, true), + Field::new( + "user_attributes_ref", + arrow::datatypes::DataType::FixedSizeBinary(ObjectId::SIZE as i32), + true, + ), + Field::new( + "user_attributes_row", + arrow::datatypes::DataType::UInt32, + true, + ), ])); let batch = RecordBatch::try_new(schema, columns).expect("Error creating record batch"); StructureTable { batch } @@ -383,38 +481,54 @@ mod tests { ..zarr_meta2.clone() }; + let oid = ObjectId::random(); let nodes = vec![ - NodeStructure::Group(GroupStructure { + NodeStructure { path: "/".into(), id: 1, - }), - NodeStructure::Group(GroupStructure { + user_attributes: None, + node_data: NodeData::Group, + }, + NodeStructure { path: "/a".into(), id: 2, - }), - NodeStructure::Group(GroupStructure { + user_attributes: None, + node_data: NodeData::Group, + }, + NodeStructure { path: "/b".into(), id: 3, - }), - NodeStructure::Group(GroupStructure { + user_attributes: None, + node_data: NodeData::Group, + }, + NodeStructure { path: "/b/c".into(), id: 4, - }), - NodeStructure::Array(ArrayStructure { + user_attributes: Some(UserAttributesStructure::Inline("some inline".to_string())), + node_data: NodeData::Group, + }, + NodeStructure { path: "/b/array1".into(), id: 5, - zarr_metadata: zarr_meta1.clone(), - }), - NodeStructure::Array(ArrayStructure { + user_attributes: Some(UserAttributesStructure::Ref(UserAttributesRef { + object_id: oid.clone(), + location: 42, + flags: Flags(), + })), + node_data: NodeData::Array(zarr_meta1.clone()), + }, + NodeStructure { path: "/array2".into(), - id: 5, - zarr_metadata: zarr_meta2.clone(), - }), - NodeStructure::Array(ArrayStructure { + id: 6, + user_attributes: None, + node_data: NodeData::Array(zarr_meta2.clone()), + }, + NodeStructure { path: "/b/array3".into(), - id: 5, - zarr_metadata: zarr_meta3.clone(), - }), + id: 7, + user_attributes: None, + node_data: NodeData::Array(zarr_meta3.clone()), + }, ]; let st = mk_structure_table(nodes); assert_eq!(st.get_node(&"/nonexistent".into()), None); @@ -422,45 +536,56 @@ mod tests { let node = st.get_node(&"/b/c".into()); assert_eq!( node, - Some(NodeStructure::Group(GroupStructure { + Some(NodeStructure { path: "/b/c".into(), id: 4, - })) + user_attributes: Some(UserAttributesStructure::Inline("some inline".to_string())), + node_data: NodeData::Group, + }), ); let node = st.get_node(&"/".into()); assert_eq!( node, - Some(NodeStructure::Group(GroupStructure { + Some(NodeStructure { path: "/".into(), id: 1, - })) + user_attributes: None, + node_data: NodeData::Group, + }), ); let node = st.get_node(&"/b/array1".into()); assert_eq!( node, - Some(NodeStructure::Array(ArrayStructure { + Some(NodeStructure { path: "/b/array1".into(), id: 5, - zarr_metadata: zarr_meta1, - }),) + user_attributes: Some(UserAttributesStructure::Ref(UserAttributesRef { + object_id: oid, + location: 42, + flags: Flags(), + })), + node_data: NodeData::Array(zarr_meta1.clone()), + }), ); let node = st.get_node(&"/array2".into()); assert_eq!( node, - Some(NodeStructure::Array(ArrayStructure { + Some(NodeStructure { path: "/array2".into(), - id: 5, - zarr_metadata: zarr_meta2, - }),) + id: 6, + user_attributes: None, + node_data: NodeData::Array(zarr_meta2.clone()), + }), ); let node = st.get_node(&"/b/array3".into()); assert_eq!( node, - Some(NodeStructure::Array(ArrayStructure { + Some(NodeStructure { path: "/b/array3".into(), - id: 5, - zarr_metadata: zarr_meta3, - }),) + id: 7, + user_attributes: None, + node_data: NodeData::Array(zarr_meta3.clone()), + }), ); } }