Skip to content

Commit

Permalink
[FEAT] Fixed Size Binary Type v2 (#2403)
Browse files Browse the repository at this point in the history
Supersedes #2266. Additionally implements `binary` to
`fixed_size_binary` cast if all values have the correct length.

Original description:

> Implements a fixed size binary type and array.
>
> Todo:
>
> * Optimize the kernels to leverage fixed size lengths

---------

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
Vince7778 and colin-ho authored Jun 19, 2024
1 parent 4b67b85 commit a8876f0
Show file tree
Hide file tree
Showing 49 changed files with 1,053 additions and 102 deletions.
2 changes: 2 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,8 @@ class PyDataType:
@staticmethod
def binary() -> PyDataType: ...
@staticmethod
def fixed_size_binary(size: int) -> PyDataType: ...
@staticmethod
def string() -> PyDataType: ...
@staticmethod
def decimal128(precision: int, size: int) -> PyDataType: ...
Expand Down
15 changes: 10 additions & 5 deletions daft/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ def binary(cls) -> DataType:
"""Create a Binary DataType: A string of bytes"""
return cls._from_pydatatype(PyDataType.binary())

@classmethod
def fixed_size_binary(cls, size: int) -> DataType:
"""Create a FixedSizeBinary DataType: A fixed-size string of bytes"""
if not isinstance(size, int) or size <= 0:
raise ValueError("The size for a fixed-size binary must be a positive integer, but got: ", size)
return cls._from_pydatatype(PyDataType.fixed_size_binary(size))

@classmethod
def null(cls) -> DataType:
"""Creates the Null DataType: Always the ``Null`` value"""
Expand Down Expand Up @@ -364,12 +371,10 @@ def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
return cls.float64()
elif pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
return cls.string()
elif (
pa.types.is_binary(arrow_type)
or pa.types.is_large_binary(arrow_type)
or pa.types.is_fixed_size_binary(arrow_type)
):
elif pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type):
return cls.binary()
elif pa.types.is_fixed_size_binary(arrow_type):
return cls.fixed_size_binary(arrow_type.byte_width)
elif pa.types.is_boolean(arrow_type):
return cls.bool()
elif pa.types.is_null(arrow_type):
Expand Down
3 changes: 2 additions & 1 deletion src/arrow2/src/array/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ impl FixedSizeBinaryArray {
.map_or(false, |validity| validity.len() != len)
{
return Err(Error::oos(
"validity mask length must be equal to the number of values divided by size",
format!("validity mask length (got {}) must be equal to the number of values ({}) divided by size ({})",
validity.unwrap().len(), values.len(), size),
));
}

Expand Down
62 changes: 61 additions & 1 deletion src/arrow2/src/compute/cast/binary_to.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::Result;
use crate::error::{Error, Result};
use crate::offset::{Offset, Offsets};
use crate::{array::*, datatypes::DataType, types::NativeType};

Expand Down Expand Up @@ -155,6 +155,66 @@ pub fn fixed_size_binary_binary<O: Offset>(
)
}

pub fn binary_to_fixed_size_binary<O: Offset>(
from: &BinaryArray<O>,
size: usize,
) -> Result<Box<dyn Array>> {
if let Some(validity) = from.validity() {
// Ensure all valid elements have the right size
for (value, valid) in from.values_iter().zip(validity) {
if valid && value.len() != size {
return Err(Error::InvalidArgumentError(
format!(
"element has invalid length ({}, expected {})",
value.len(),
size
)
.to_string(),
));
}
}

// Copy values to new buffer, accounting for validity
let mut values: Vec<u8> = Vec::new();
let offsets = from.offsets().buffer().iter();
let from_values = from.values();
for (off, valid) in offsets.zip(validity) {
if valid {
let start = off.to_usize();
let end = start + size;
values.extend(&from_values[start..end]);
} else {
values.extend(std::iter::repeat(0u8).take(size));
}
}
Ok(Box::new(FixedSizeBinaryArray::try_new(
DataType::FixedSizeBinary(size),
values.into(),
from.validity().cloned(),
)?))
} else {
// Ensure all elements have the right size
for value in from.values_iter() {
if value.len() != size {
return Err(Error::InvalidArgumentError(
format!(
"element has invalid length ({}, expected {})",
value.len(),
size
)
.to_string(),
));
}
}

Ok(Box::new(FixedSizeBinaryArray::try_new(
DataType::FixedSizeBinary(size),
from.values().clone(),
from.validity().cloned(),
)?))
}
}

/// Conversion of binary
pub fn binary_to_list<O: Offset>(from: &BinaryArray<O>, to_data_type: DataType) -> ListArray<O> {
let values = from.values().clone();
Expand Down
7 changes: 5 additions & 2 deletions src/arrow2/src/compute/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,13 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
}

(Binary, to_type) => {
is_numeric(to_type) || matches!(to_type, LargeBinary | Utf8 | LargeUtf8)
is_numeric(to_type)
|| matches!(to_type, LargeBinary | FixedSizeBinary(_) | Utf8 | LargeUtf8)
}
(LargeBinary, to_type) => {
is_numeric(to_type)
|| match to_type {
Binary | LargeUtf8 => true,
Binary | FixedSizeBinary(_) | LargeUtf8 => true,
LargeList(field) => matches!(field.data_type, UInt8),
_ => false,
}
Expand Down Expand Up @@ -772,6 +773,7 @@ pub fn cast(array: &dyn Array, to_type: &DataType, options: CastOptions) -> Resu
Int64 => binary_to_primitive_dyn::<i32, i64>(array, to_type, options),
Float32 => binary_to_primitive_dyn::<i32, f32>(array, to_type, options),
Float64 => binary_to_primitive_dyn::<i32, f64>(array, to_type, options),
FixedSizeBinary(size) => binary_to_fixed_size_binary::<i32>(array.as_any().downcast_ref().unwrap(), *size),
LargeBinary => Ok(Box::new(binary_to_large_binary(
array.as_any().downcast_ref().unwrap(),
to_type.clone(),
Expand Down Expand Up @@ -800,6 +802,7 @@ pub fn cast(array: &dyn Array, to_type: &DataType, options: CastOptions) -> Resu
binary_large_to_binary(array.as_any().downcast_ref().unwrap(), to_type.clone())
.map(|x| x.boxed())
}
FixedSizeBinary(size) => binary_to_fixed_size_binary::<i64>(array.as_any().downcast_ref().unwrap(), *size),
LargeUtf8 => {
binary_to_utf8::<i64>(array.as_any().downcast_ref().unwrap(), to_type.clone())
.map(|x| x.boxed())
Expand Down
15 changes: 13 additions & 2 deletions src/daft-core/src/array/from.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use crate::datatypes::{
BinaryArray, BooleanArray, DaftNumericType, DaftPhysicalType, DataType, Field, NullArray,
Utf8Array, Utf8Type,
BinaryArray, BooleanArray, DaftNumericType, DaftPhysicalType, DataType, Field,
FixedSizeBinaryArray, NullArray, Utf8Array, Utf8Type,
};

use crate::array::DataArray;
Expand Down Expand Up @@ -38,6 +38,17 @@ impl From<(&str, Box<arrow2::array::BinaryArray<i64>>)> for BinaryArray {
}
}

impl From<(&str, Box<arrow2::array::FixedSizeBinaryArray>)> for FixedSizeBinaryArray {
fn from(item: (&str, Box<arrow2::array::FixedSizeBinaryArray>)) -> Self {
let (name, array) = item;
DataArray::new(
Field::new(name, DataType::FixedSizeBinary(array.size())).into(),
array,
)
.unwrap()
}
}

impl<T> From<(&str, &[T::Native])> for DataArray<T>
where
T: DaftNumericType,
Expand Down
19 changes: 18 additions & 1 deletion src/daft-core/src/array/from_iter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::datatypes::{BinaryArray, BooleanArray, DaftNumericType, Field, Utf8Array};
use crate::datatypes::{
BinaryArray, BooleanArray, DaftNumericType, Field, FixedSizeBinaryArray, Utf8Array,
};

use super::DataArray;

Expand Down Expand Up @@ -42,6 +44,21 @@ impl BinaryArray {
}
}

impl FixedSizeBinaryArray {
pub fn from_iter<S: AsRef<[u8]>>(
name: &str,
iter: impl arrow2::trusted_len::TrustedLen<Item = Option<S>>,
size: usize,
) -> Self {
let arrow_array = Box::new(arrow2::array::FixedSizeBinaryArray::from_iter(iter, size));
DataArray::new(
Field::new(name, crate::DataType::FixedSizeBinary(size)).into(),
arrow_array,
)
.unwrap()
}
}

impl BooleanArray {
pub fn from_iter(
name: &str,
Expand Down
9 changes: 7 additions & 2 deletions src/daft-core/src/array/growable/arrow_growable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
},
datatypes::{
BinaryType, BooleanType, DaftArrowBackedType, DaftDataType, ExtensionArray, Field,
Float32Type, Float64Type, Int128Type, Int16Type, Int32Type, Int64Type, Int8Type, NullType,
UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type,
FixedSizeBinaryType, Float32Type, Float64Type, Int128Type, Int16Type, Int32Type, Int64Type,
Int8Type, NullType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type,
},
DataType, IntoSeries, Series,
};
Expand Down Expand Up @@ -159,6 +159,11 @@ impl_arrow_backed_data_array_growable!(
BinaryType,
arrow2::array::growable::GrowableBinary<'a, i64>
);
impl_arrow_backed_data_array_growable!(
ArrowFixedSizeBinaryGrowable,
FixedSizeBinaryType,
arrow2::array::growable::GrowableFixedSizeBinary<'a>
);
impl_arrow_backed_data_array_growable!(
ArrowUtf8Growable,
Utf8Type,
Expand Down
10 changes: 7 additions & 3 deletions src/daft-core/src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray,
},
BinaryArray, BooleanArray, ExtensionArray, Float32Array, Float64Array, Int128Array,
Int16Array, Int32Array, Int64Array, Int8Array, NullArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array, Utf8Array,
BinaryArray, BooleanArray, ExtensionArray, FixedSizeBinaryArray, Float32Array,
Float64Array, Int128Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array,
},
with_match_daft_types, DataType, Series,
};
Expand Down Expand Up @@ -179,6 +179,10 @@ impl_growable_array!(UInt64Array, arrow_growable::ArrowUInt64Growable<'a>);
impl_growable_array!(Float32Array, arrow_growable::ArrowFloat32Growable<'a>);
impl_growable_array!(Float64Array, arrow_growable::ArrowFloat64Growable<'a>);
impl_growable_array!(BinaryArray, arrow_growable::ArrowBinaryGrowable<'a>);
impl_growable_array!(
FixedSizeBinaryArray,
arrow_growable::ArrowFixedSizeBinaryGrowable<'a>
);
impl_growable_array!(Utf8Array, arrow_growable::ArrowUtf8Growable<'a>);
impl_growable_array!(ExtensionArray, arrow_growable::ArrowExtensionGrowable<'a>);
impl_growable_array!(
Expand Down
3 changes: 2 additions & 1 deletion src/daft-core/src/array/ops/as_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
array::DataArray,
datatypes::{
logical::{DateArray, Decimal128Array, DurationArray, TimeArray, TimestampArray},
BinaryArray, BooleanArray, DaftNumericType, NullArray, Utf8Array,
BinaryArray, BooleanArray, DaftNumericType, FixedSizeBinaryArray, NullArray, Utf8Array,
},
};

Expand Down Expand Up @@ -58,6 +58,7 @@ impl_asarrow_dataarray!(NullArray, array::NullArray);
impl_asarrow_dataarray!(Utf8Array, array::Utf8Array<i64>);
impl_asarrow_dataarray!(BooleanArray, array::BooleanArray);
impl_asarrow_dataarray!(BinaryArray, array::BinaryArray<i64>);
impl_asarrow_dataarray!(FixedSizeBinaryArray, array::FixedSizeBinaryArray);

#[cfg(feature = "python")]
impl_asarrow_dataarray!(PythonArray, PseudoArrowArray<pyo3::PyObject>);
Expand Down
3 changes: 3 additions & 0 deletions src/daft-core/src/array/ops/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,9 @@ impl PythonArray {
}
DataType::Boolean => pycast_then_arrowcast!(self, DataType::Boolean, "bool"),
DataType::Binary => pycast_then_arrowcast!(self, DataType::Binary, "bytes"),
DataType::FixedSizeBinary(size) => {
pycast_then_arrowcast!(self, DataType::FixedSizeBinary(*size), "fixed_size_bytes")
}
DataType::Utf8 => pycast_then_arrowcast!(self, DataType::Utf8, "str"),
dt @ DataType::UInt8
| dt @ DataType::UInt16
Expand Down
Loading

0 comments on commit a8876f0

Please sign in to comment.