Skip to content

Commit

Permalink
Fill values as BinaryArray
Browse files Browse the repository at this point in the history
to pick 2
  • Loading branch information
dcherian committed Aug 9, 2024
1 parent 8897053 commit 9f448c8
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 6 deletions.
80 changes: 80 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ use std::{
};
use structure::StructureTable;


/// Error type for failures while interpreting Icechunk-formatted data.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// printed for users and propagated with `?` into `Box<dyn Error>`-style
/// error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IcechunkFormatError {
    /// A stored fill value could not be decoded. The payload is a
    /// human-readable description of what went wrong.
    FillValueDecodeError(String),
}

impl std::fmt::Display for IcechunkFormatError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The payload already is a complete message, so print it verbatim.
            IcechunkFormatError::FillValueDecodeError(msg) => f.write_str(msg),
        }
    }
}

impl std::error::Error for IcechunkFormatError {}


#[derive(Clone, Debug, Hash, PartialEq, Eq)]
/// An ND index to an element in an array.
pub struct ArrayIndices(pub Vec<u64>);
Expand Down Expand Up @@ -172,6 +179,79 @@ pub enum FillValue {
RawBits(Vec<u8>),
}

impl FillValue {
    /// Decodes a fill value of data type `dt` from its big-endian byte
    /// encoding `value`, i.e. the inverse of [`FillValue::to_be_bytes`].
    ///
    /// # Errors
    ///
    /// Returns [`IcechunkFormatError::FillValueDecodeError`] when `value`
    /// does not have exactly the number of bytes the data type requires
    /// (4 for int32/float32, 8 for int64/float64/complex64), or when decoding
    /// is not implemented yet for `dt`. Raw bits accept any length.
    // TODO: nicer error here
    fn from_data_type_and_value(
        dt: DataType,
        value: &[u8],
    ) -> Result<Self, IcechunkFormatError> {
        use IcechunkFormatError::FillValueDecodeError;

        match dt {
            DataType::Int32 => value
                .try_into()
                .map(i32::from_be_bytes)
                .map(FillValue::Int32)
                .map_err(|_| {
                    FillValueDecodeError("Error decoding fill_value as int32".to_string())
                }),
            DataType::Int64 => value
                .try_into()
                .map(i64::from_be_bytes)
                .map(FillValue::Int64)
                .map_err(|_| {
                    FillValueDecodeError("Error decoding fill_value as int64".to_string())
                }),
            DataType::Float32 => value
                .try_into()
                .map(f32::from_be_bytes)
                .map(FillValue::Float32)
                .map_err(|_| {
                    FillValueDecodeError("Error decoding fill_value as float32".to_string())
                }),
            DataType::Float64 => value
                .try_into()
                .map(f64::from_be_bytes)
                .map(FillValue::Float64)
                .map_err(|_| {
                    FillValueDecodeError("Error decoding fill_value as float64".to_string())
                }),
            // Two consecutive big-endian f32 values. Matching on an exact
            // 8-element slice pattern rejects every other length with an
            // error, instead of panicking on an out-of-range slice index
            // when fewer than 4 bytes are supplied.
            DataType::Complex64 => match value {
                &[r0, r1, r2, r3, i0, i1, i2, i3] => Ok(FillValue::Complex64(
                    f32::from_be_bytes([r0, r1, r2, r3]),
                    f32::from_be_bytes([i0, i1, i2, i3]),
                )),
                _ => Err(FillValueDecodeError(
                    "Error decoding fill_value as complex64".to_string(),
                )),
            },
            DataType::RawBits(_) => Ok(FillValue::RawBits(value.to_owned())),
            // Remaining data types are not decodable yet; report that through
            // the Result instead of panicking.
            _ => Err(FillValueDecodeError(
                "Decoding fill_value is not implemented for this data type".to_string(),
            )),
        }
    }

    /// Returns the [`DataType`] that corresponds to this fill value's variant.
    ///
    /// For raw bits the data type carries the length of the stored byte
    /// vector.
    ///
    /// # Panics
    ///
    /// Panics (`todo!`) for variants that are not handled yet.
    fn get_data_type(&self) -> DataType {
        match self {
            FillValue::Int32(_) => DataType::Int32,
            FillValue::Int64(_) => DataType::Int64,
            FillValue::Float32(_) => DataType::Float32,
            FillValue::Float64(_) => DataType::Float64,
            FillValue::Complex64(_, _) => DataType::Complex64,
            FillValue::RawBits(v) => DataType::RawBits(v.len()),
            _ => todo!(),
        }
    }

    /// Encodes this fill value as big-endian bytes.
    ///
    /// Numeric variants use their native big-endian representation; a
    /// `Complex64` is the bytes of its first component followed by the bytes
    /// of its second; raw bits are copied as-is.
    ///
    /// # Panics
    ///
    /// Panics (`todo!`) for variants that are not handled yet.
    fn to_be_bytes(&self) -> Vec<u8> {
        match self {
            FillValue::Int32(v) => v.to_be_bytes().into(),
            FillValue::Int64(v) => v.to_be_bytes().into(),
            FillValue::Float32(v) => v.to_be_bytes().into(),
            FillValue::Float64(v) => v.to_be_bytes().into(),
            FillValue::Complex64(r, i) => [r.to_be_bytes(), i.to_be_bytes()].concat(),
            FillValue::RawBits(v) => v.clone(),
            _ => todo!(),
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Codecs(String); // FIXME: define

Expand Down
120 changes: 114 additions & 6 deletions src/structure.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{num::NonZeroU64, sync::Arc};
use std::{iter::zip, num::NonZeroU64, sync::Arc};

use arrow::{
array::{
Array, ArrayRef, AsArray, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
Array, ArrayRef, AsArray, BinaryArray, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
ListArray, ListBuilder, RecordBatch, StringArray, StringBuilder, StructArray,
UInt32Array, UInt32Builder, UInt8Array,
},
Expand All @@ -14,7 +14,7 @@ use crate::{
ChunkKeyEncoding, ChunkShape, Codecs, DataType, DimensionName, FillValue, Flags,
ManifestExtents, ManifestRef, NodeData, NodeId, NodeStructure, NodeType, ObjectId,
Path, StorageTransformers, TableRegion, UserAttributes, UserAttributesRef,
UserAttributesStructure, ZarrArrayMetadata,
UserAttributesStructure, ZarrArrayMetadata, IcechunkFormatError,
};

pub struct StructureTable {
Expand Down Expand Up @@ -101,12 +101,23 @@ impl StructureTable {
)
};

let encoded_fill_value = self.batch.column_by_name("fill_value")?
.as_any()
.downcast_ref::<BinaryArray>()
.map(|x| x.value(idx))
;
let fill_value = encoded_fill_value
// TODO: remove clone
.map(|x| FillValue::from_data_type_and_value(data_type.clone(), x)
).unwrap().unwrap();


Some(ZarrArrayMetadata {
shape,
data_type,
chunk_shape,
chunk_key_encoding,
fill_value: FillValue::Int32(0), // FIXME: implement
fill_value,
codecs,
storage_transformers,
dimension_names,
Expand Down Expand Up @@ -285,6 +296,33 @@ where
UInt8Array::from_iter(iter)
}

fn mk_fill_values_array(fill_values: Vec<Option<FillValue>>) -> BinaryArray {
//TODO: Is there a possibility for errors here?
let as_bytes: Vec<Option<Vec<u8>>> = fill_values
.iter()
.map(|fv| fv.as_ref().and_then(|f| Some(f.to_be_bytes())))
.collect();

let as_slice: Vec<Option<&[u8]>> = as_bytes
.iter()
// TODO: this as_ref really confused me!
.map(|v| v.as_ref().map(|f| f.as_slice()))
.collect();
BinaryArray::from_opt_vec(as_slice)
}

fn decode_fill_values_array(dtypes: Vec<Option<DataType>>, array: BinaryArray) -> Result<Vec<Option<FillValue>>, IcechunkFormatError> {
fn create_fill_value(dtype: Option<DataType>, value: Option<&[u8]>) -> Option<Result<FillValue, IcechunkFormatError>>{
dtype.map(|dt| FillValue::from_data_type_and_value(dt, value.unwrap()))
}

zip(dtypes, array.iter())
.map(|(dt, value)| create_fill_value(dt, value))
.map(|x| x.transpose())
.into_iter()
.collect()
}

fn mk_codecs_array<T: IntoIterator<Item = Option<Codecs>>>(coll: T) -> StringArray {
let iter = coll.into_iter().map(|x| x.map(|x| x.0));
StringArray::from_iter(iter)
Expand Down Expand Up @@ -425,6 +463,7 @@ pub fn mk_structure_table<T: IntoIterator<Item = NodeStructure>>(
let mut data_types = Vec::new();
let mut chunk_shapes = Vec::new();
let mut chunk_key_encodings = Vec::new();
let mut fill_values = Vec::new();
let mut codecs = Vec::new();
let mut storage_transformers = Vec::new();
let mut dimension_names = Vec::new();
Expand Down Expand Up @@ -465,6 +504,7 @@ pub fn mk_structure_table<T: IntoIterator<Item = NodeStructure>>(
data_types.push(None);
chunk_shapes.push(None);
chunk_key_encodings.push(None);
fill_values.push(None);
codecs.push(None);
storage_transformers.push(None);
dimension_names.push(None);
Expand All @@ -476,6 +516,7 @@ pub fn mk_structure_table<T: IntoIterator<Item = NodeStructure>>(
data_types.push(Some(zarr_metadata.data_type));
chunk_shapes.push(Some(zarr_metadata.chunk_shape));
chunk_key_encodings.push(Some(zarr_metadata.chunk_key_encoding));
fill_values.push(Some(zarr_metadata.fill_value));
codecs.push(Some(zarr_metadata.codecs));
storage_transformers.push(zarr_metadata.storage_transformers);
dimension_names.push(zarr_metadata.dimension_names);
Expand All @@ -491,6 +532,7 @@ pub fn mk_structure_table<T: IntoIterator<Item = NodeStructure>>(
let data_types = mk_data_type_array(data_types);
let chunk_shapes = mk_chunk_shape_array(chunk_shapes);
let chunk_key_encodings = mk_chunk_key_encoding_array(chunk_key_encodings);
let fill_values = mk_fill_values_array(fill_values);
let codecs = mk_codecs_array(codecs);
let storage_transformers = mk_storage_transformers_array(storage_transformers);
let dimension_names = mk_dimension_names_array(dimension_names);
Expand All @@ -507,6 +549,7 @@ pub fn mk_structure_table<T: IntoIterator<Item = NodeStructure>>(
Arc::new(data_types),
Arc::new(chunk_shapes),
Arc::new(chunk_key_encodings),
Arc::new(fill_values),
Arc::new(codecs),
Arc::new(storage_transformers),
Arc::new(dimension_names),
Expand All @@ -531,8 +574,11 @@ pub fn mk_structure_table<T: IntoIterator<Item = NodeStructure>>(
true,
),
Field::new("chunk_key_encoding", arrow::datatypes::DataType::UInt8, true),
// FIXME:
//Field::new("fill_value", todo!(), true),
Field::new(
"fill_value",
arrow::datatypes::DataType::Binary,
true,
),
Field::new("codecs", arrow::datatypes::DataType::Utf8, true),
Field::new("storage_transformers", arrow::datatypes::DataType::Utf8, true),
Field::new_list(
Expand Down Expand Up @@ -608,7 +654,9 @@ mod tests {
};
let zarr_meta2 = ZarrArrayMetadata {
storage_transformers: None,
data_type: DataType::Int32,
dimension_names: Some(vec![None, None, Some("t".to_string())]),
fill_value: FillValue::Int32(0i32),
..zarr_meta1.clone()
};
let zarr_meta3 =
Expand Down Expand Up @@ -740,4 +788,64 @@ mod tests {
}),
);
}
/// Encoding a list of optional fill values into a binary array and decoding
/// it again must give back the original list.
#[test]
fn test_fill_values_vec_roundtrip() {
    let fill_values = vec![
        // groups carry no fill value
        None,
        Some(FillValue::Int32(0i32)),
        Some(FillValue::Int64(0i64)),
        Some(FillValue::Float32(0f32)),
        Some(FillValue::Float64(0f64)),
        Some(FillValue::Complex64(0f32, 1f32)),
        Some(FillValue::RawBits(vec![b'1'])),
    ];

    // Derive the matching data type for every entry; `None` stays `None`.
    let mut dtypes: Vec<Option<DataType>> = Vec::with_capacity(fill_values.len());
    for entry in &fill_values {
        dtypes.push(entry.as_ref().map(|fv| fv.get_data_type()));
    }

    let decoded =
        decode_fill_values_array(dtypes, mk_fill_values_array(fill_values.clone())).unwrap();

    assert_eq!(fill_values, decoded);
}

// #[test]
// fn test_fill_value_decode() {
// // int32
// let value = 1i32;
// let expected = FillValue::Int32(value);
// let encoded = Int32Array::from(vec![value]);
// let actual = to_fill_value("int32", Arc::new(encoded)).unwrap();
// assert_eq!(expected, actual);

// // int64
// let value = 1i64;
// let expected = FillValue::Int64(value);
// let encoded = Int64Array::from(vec![value]);
// let actual = to_fill_value("int64", Arc::new(encoded)).unwrap();
// assert_eq!(expected, actual);

// // float64
// let value = 1f64;
// let expected = FillValue::Float64(value);
// let encoded = Float64Array::from(vec![value]);
// let actual = to_fill_value("float64", Arc::new(encoded)).unwrap();
// assert_eq!(expected, actual);

// // // complex64
// let value = vec![1.0f32, 2.0f32];
// let expected = FillValue::Complex64(value[0], value[1]);
// let encoded = fixed_size_list_array_from_vec(value);
// let actual = to_fill_value("complex64", Arc::new(encoded)).unwrap();
// assert_eq!(expected, actual);

// // // binary
// let value = b"123";
// let expected = FillValue::RawBits(value.to_vec());
// let encoded = BinaryArray::from_vec(vec![value]);
// let actual = to_fill_value("raw", Arc::new(encoded)).unwrap();
// assert_eq!(expected, actual);
// }
}

0 comments on commit 9f448c8

Please sign in to comment.