Skip to content

Commit

Permalink
[FEAT] MapArray (#1959)
Browse files Browse the repository at this point in the history
Closes #1847 

Introduces a new MapArray type, which will improve our Iceberg/Parquet
compatibility.

MapArray is implemented as a logical type on top of ListArray, where each
list element is a Struct with key and value fields.
  • Loading branch information
colin-ho authored Mar 4, 2024
1 parent 524777e commit 3e0e334
Show file tree
Hide file tree
Showing 33 changed files with 440 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ url = "2.4.0"
# branch = "daft-fork"
git = "https://github.com/Eventual-Inc/arrow2"
package = "arrow2"
rev = "d5685eebf1d65c3f3d854370ad39f93dcd91971a"
rev = "c0764b00cc05126c80c7ce17ebd7a95d87f815c1"

[workspace.dependencies.bincode]
version = "1.3.3"
Expand Down
3 changes: 3 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ class PyDataType:
@staticmethod
def fixed_size_list(data_type: PyDataType, size: int) -> PyDataType: ...
@staticmethod
def map(key_type: PyDataType, value_type: PyDataType) -> PyDataType: ...
@staticmethod
def struct(fields: dict[str, PyDataType]) -> PyDataType: ...
@staticmethod
def extension(name: str, storage_data_type: PyDataType, metadata: str | None = None) -> PyDataType: ...
Expand All @@ -842,6 +844,7 @@ class PyDataType:
def is_fixed_shape_image(self) -> builtins.bool: ...
def is_tensor(self) -> builtins.bool: ...
def is_fixed_shape_tensor(self) -> builtins.bool: ...
def is_map(self) -> builtins.bool: ...
def is_logical(self) -> builtins.bool: ...
def is_temporal(self) -> builtins.bool: ...
def is_equal(self, other: Any) -> builtins.bool: ...
Expand Down
24 changes: 24 additions & 0 deletions daft/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ def fixed_size_list(cls, dtype: DataType, size: int) -> DataType:
raise ValueError("The size for a fixed-size list must be a positive integer, but got: ", size)
return cls._from_pydatatype(PyDataType.fixed_size_list(dtype._dtype, size))

@classmethod
def map(cls, key_type: DataType, value_type: DataType) -> DataType:
    """Create a Map DataType: a nested type of key-value pairs that is implemented as a
    list of structs with two fields, key and value.

    Args:
        key_type: DataType of the keys in the map
        value_type: DataType of the values in the map
    """
    # Build the underlying PyDataType first, then wrap it in the Python-facing DataType.
    py_map_type = PyDataType.map(key_type._dtype, value_type._dtype)
    return cls._from_pydatatype(py_map_type)

@classmethod
def struct(cls, fields: dict[str, DataType]) -> DataType:
"""Create a Struct DataType: a nested type which has names mapped to child types
Expand Down Expand Up @@ -387,6 +396,12 @@ def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
assert isinstance(arrow_type, pa.StructType)
fields = [arrow_type[i] for i in range(arrow_type.num_fields)]
return cls.struct({field.name: cls.from_arrow_type(field.type) for field in fields})
elif pa.types.is_map(arrow_type):
assert isinstance(arrow_type, pa.MapType)
return cls.map(
key_type=cls.from_arrow_type(arrow_type.key_type),
value_type=cls.from_arrow_type(arrow_type.item_type),
)
elif _RAY_DATA_EXTENSIONS_AVAILABLE and isinstance(arrow_type, tuple(_TENSOR_EXTENSION_TYPES)):
scalar_dtype = cls.from_arrow_type(arrow_type.scalar_type)
shape = arrow_type.shape if isinstance(arrow_type, ArrowTensorType) else None
Expand Down Expand Up @@ -464,12 +479,21 @@ def _is_image_type(self) -> builtins.bool:
def _is_fixed_shape_image_type(self) -> builtins.bool:
return self._dtype.is_fixed_shape_image()

def _is_map(self) -> builtins.bool:
return self._dtype.is_map()

def _is_logical_type(self) -> builtins.bool:
return self._dtype.is_logical()

def _is_temporal_type(self) -> builtins.bool:
return self._dtype.is_temporal()

def _should_cast_to_python(self) -> builtins.bool:
# NOTE: This is used to determine if we should cast a column to a Python object type when converting to PyList.
# Map is a logical type, but we don't want to cast it to Python because the underlying physical type is a List,
# which we can handle without casting to Python.
return self._is_logical_type() and not self._is_map()

def __repr__(self) -> str:
return self._dtype.__repr__()

Expand Down
2 changes: 1 addition & 1 deletion daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def to_pylist(self) -> list:
"""
if self.datatype()._is_python_type():
return self._series.to_pylist()
elif self.datatype()._is_logical_type():
elif self.datatype()._should_cast_to_python():
return self._series.cast(DataType.python()._dtype).to_pylist()
else:
return self._series.to_arrow().to_pylist()
Expand Down
3 changes: 2 additions & 1 deletion src/daft-core/src/array/growable/logical_growable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
datatypes::{
logical::LogicalArray, DaftDataType, DaftLogicalType, DateType, Decimal128Type,
DurationType, EmbeddingType, Field, FixedShapeImageType, FixedShapeTensorType, ImageType,
TensorType, TimeType, TimestampType,
MapType, TensorType, TimeType, TimestampType,
},
DataType, IntoSeries, Series,
};
Expand Down Expand Up @@ -84,3 +84,4 @@ impl_logical_growable!(LogicalFixedShapeTensorGrowable, FixedShapeTensorType);
impl_logical_growable!(LogicalImageGrowable, ImageType);
impl_logical_growable!(LogicalDecimal128Growable, Decimal128Type);
impl_logical_growable!(LogicalTensorGrowable, TensorType);
impl_logical_growable!(LogicalMapGrowable, MapType);
3 changes: 2 additions & 1 deletion src/daft-core/src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, TensorArray, TimeArray, TimestampArray,
FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray,
},
BinaryArray, BooleanArray, ExtensionArray, Float32Array, Float64Array, Int128Array,
Int16Array, Int32Array, Int64Array, Int8Array, NullArray, UInt16Array, UInt32Array,
Expand Down Expand Up @@ -211,3 +211,4 @@ impl_growable_array!(
Decimal128Array,
logical_growable::LogicalDecimal128Growable<'a>
);
impl_growable_array!(MapArray, logical_growable::LogicalMapGrowable<'a>);
15 changes: 13 additions & 2 deletions src/daft-core/src/array/ops/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::{
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, LogicalArray, LogicalArrayImpl, TensorArray,
TimeArray, TimestampArray,
FixedShapeTensorArray, ImageArray, LogicalArray, LogicalArrayImpl, MapArray,
TensorArray, TimeArray, TimestampArray,
},
DaftArrowBackedType, DaftLogicalType, DataType, Field, ImageMode, Int64Array, TimeUnit,
UInt64Array, Utf8Array,
Expand Down Expand Up @@ -1727,11 +1727,22 @@ impl ListArray {
}
}
}
DataType::Map(..) => Ok(MapArray::new(
Field::new(self.name(), dtype.clone()),
self.clone(),
)
.into_series()),
_ => unimplemented!("List casting not implemented for dtype: {}", dtype),
}
}
}

impl MapArray {
    /// Casts this map array to `dtype`.
    ///
    /// The cast is not performed here; it is forwarded unchanged to the `cast`
    /// implementation of the underlying `physical` array, and that result is returned.
    pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
        let physical = &self.physical;
        physical.cast(dtype)
    }
}

impl StructArray {
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
match (self.data_type(), dtype) {
Expand Down
55 changes: 47 additions & 8 deletions src/daft-core/src/array/ops/from_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ where
{
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
let data_array_field = Arc::new(Field::new(field.name.clone(), field.dtype.to_physical()));
let physical_arrow_arr = arrow_arr.to_type(data_array_field.dtype.to_arrow()?);
let physical_arrow_arr = match field.dtype {
// TODO: Consolidate Map to use the same .to_type conversion as other logical types
// Currently, .to_type does not work for Map in Arrow2 because it requires physical types to be equivalent,
// but the physical type of MapArray in Arrow2 is a MapArray, not a ListArray
DataType::Map(..) => arrow_arr,
_ => arrow_arr.to_type(data_array_field.dtype.to_arrow()?),
};
let physical = <L::PhysicalType as DaftDataType>::ArrayType::from_arrow(
data_array_field,
physical_arrow_arr,
Expand Down Expand Up @@ -64,21 +70,54 @@ impl FromArrow for FixedSizeListArray {
impl FromArrow for ListArray {
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
match (&field.dtype, arrow_arr.data_type()) {
(DataType::List(daft_child_dtype), arrow2::datatypes::DataType::List(arrow_child_field)) |
(DataType::List(daft_child_dtype), arrow2::datatypes::DataType::LargeList(arrow_child_field))
=> {
let arrow_arr = arrow_arr.to_type(arrow2::datatypes::DataType::LargeList(arrow_child_field.clone()));
let arrow_arr = arrow_arr.as_any().downcast_ref::<arrow2::array::ListArray<i64>>().unwrap();
(
DataType::List(daft_child_dtype),
arrow2::datatypes::DataType::List(arrow_child_field),
)
| (
DataType::List(daft_child_dtype),
arrow2::datatypes::DataType::LargeList(arrow_child_field),
) => {
let arrow_arr = arrow_arr.to_type(arrow2::datatypes::DataType::LargeList(
arrow_child_field.clone(),
));
let arrow_arr = arrow_arr
.as_any()
.downcast_ref::<arrow2::array::ListArray<i64>>()
.unwrap();
let arrow_child_array = arrow_arr.values();
let child_series = Series::from_arrow(Arc::new(Field::new("list", daft_child_dtype.as_ref().clone())), arrow_child_array.clone())?;
let child_series = Series::from_arrow(
Arc::new(Field::new("list", daft_child_dtype.as_ref().clone())),
arrow_child_array.clone(),
)?;
Ok(ListArray::new(
field.clone(),
child_series,
arrow_arr.offsets().clone(),
arrow_arr.validity().cloned(),
))
}
(d, a) => Err(DaftError::TypeError(format!("Attempting to create Daft FixedSizeListArray with type {} from arrow array with type {:?}", d, a)))
(DataType::List(daft_child_dtype), arrow2::datatypes::DataType::Map(..)) => {
let map_arr = arrow_arr
.as_any()
.downcast_ref::<arrow2::array::MapArray>()
.unwrap();
let arrow_child_array = map_arr.field();
let child_series = Series::from_arrow(
Arc::new(Field::new("map", daft_child_dtype.as_ref().clone())),
arrow_child_array.clone(),
)?;
Ok(ListArray::new(
field.clone(),
child_series,
map_arr.offsets().try_into().unwrap(),
arrow_arr.validity().cloned(),
))
}
(d, a) => Err(DaftError::TypeError(format!(
"Attempting to create Daft ListArray with type {} from arrow array with type {:?}",
d, a
))),
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/daft-core/src/array/ops/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::{
array::{DataArray, FixedSizeListArray, ListArray},
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, LogicalArrayImpl, TimeArray, TimestampArray,
DateArray, Decimal128Array, DurationArray, LogicalArrayImpl, MapArray, TimeArray,
TimestampArray,
},
BinaryArray, BooleanArray, DaftLogicalType, DaftNumericType, ExtensionArray, NullArray,
Utf8Array,
Expand Down Expand Up @@ -159,6 +160,13 @@ impl ListArray {
}
}

impl MapArray {
    /// Returns the entry at `idx` by forwarding to `get` on the underlying `physical`
    /// array; `None` is whatever that lookup reports as absent.
    #[inline]
    pub fn get(&self, idx: usize) -> Option<Series> {
        let physical = &self.physical;
        physical.get(idx)
    }
}

#[cfg(test)]
mod tests {
use common_error::DaftResult;
Expand Down
13 changes: 12 additions & 1 deletion src/daft-core/src/array/ops/repr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, TensorArray, TimeArray, TimestampArray,
FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray,
},
BinaryArray, BooleanArray, DaftNumericType, ExtensionArray, ImageFormat, NullArray,
UInt64Array, Utf8Array,
Expand Down Expand Up @@ -227,6 +227,16 @@ impl FixedSizeListArray {
}
}

impl MapArray {
    /// Renders the entry at `idx` as a display string.
    ///
    /// When `get` yields no entry the literal `"None"` is returned; otherwise the
    /// entry's series is formatted with `series_as_list_str`.
    pub fn str_value(&self, idx: usize) -> DaftResult<String> {
        self.get(idx).map_or_else(
            || Ok("None".to_string()),
            |entry| series_as_list_str(&entry),
        )
    }
}

impl EmbeddingArray {
pub fn str_value(&self, idx: usize) -> DaftResult<String> {
if self.physical.is_valid(idx) {
Expand Down Expand Up @@ -338,6 +348,7 @@ impl_array_html_value!(NullArray);
impl_array_html_value!(BinaryArray);
impl_array_html_value!(ListArray);
impl_array_html_value!(FixedSizeListArray);
impl_array_html_value!(MapArray);
impl_array_html_value!(StructArray);
impl_array_html_value!(ExtensionArray);
impl_array_html_value!(Decimal128Array);
Expand Down
8 changes: 7 additions & 1 deletion src/daft-core/src/array/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, TensorArray, TimeArray, TimestampArray,
FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray,
},
BinaryArray, BooleanArray, DaftIntegerType, DaftNumericType, ExtensionArray, Float32Array,
Float64Array, NullArray, Utf8Array,
Expand Down Expand Up @@ -574,6 +574,12 @@ impl ListArray {
}
}

impl MapArray {
/// Placeholder for sorting a `MapArray`.
///
/// Sorting is not implemented yet: the body is a `todo!`, so calling this panics
/// instead of returning a `DaftResult`. `_descending` is currently unused.
pub fn sort(&self, _descending: bool) -> DaftResult<Self> {
todo!("impl sort for MapArray")
}
}

impl StructArray {
pub fn sort(&self, _descending: bool) -> DaftResult<Self> {
todo!("impl sort for StructArray")
Expand Down
3 changes: 2 additions & 1 deletion src/daft-core/src/array/ops/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, TensorArray, TimeArray, TimestampArray,
FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray,
},
BinaryArray, BooleanArray, DaftIntegerType, DaftNumericType, ExtensionArray, NullArray,
Utf8Array,
Expand Down Expand Up @@ -78,6 +78,7 @@ impl_logicalarray_take!(ImageArray);
impl_logicalarray_take!(FixedShapeImageArray);
impl_logicalarray_take!(TensorArray);
impl_logicalarray_take!(FixedShapeTensorArray);
impl_logicalarray_take!(MapArray);

#[cfg(feature = "python")]
impl crate::datatypes::PythonArray {
Expand Down
Loading

0 comments on commit 3e0e334

Please sign in to comment.