Commit

fix thingy lol
andrewgazelka committed Sep 26, 2024
1 parent 83e4bfc commit a4e1c7b
Showing 15 changed files with 1,041 additions and 163 deletions.
812 changes: 779 additions & 33 deletions .vscode/launch.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion .vscode/settings.json
@@ -1,6 +1,6 @@
{
"rust-analyzer.check.extraEnv": {
" pub fn to_arrow(&self) -> Box<dyn arrow2::array::Array> {\n let daft_type = self.data_type();\n let arrow_logical_type = daft_type.to_arrow().unwrap();\n let physical_arrow_array = self.physical.data();\nCARGO_TARGET_DIR": "target/analyzer"
"CARGO_TARGET_DIR": "target/analyzer"
},
"rust-analyzer.check.features": "all",
"rust-analyzer.cargo.features": "all",
113 changes: 56 additions & 57 deletions something.py
@@ -1,65 +1,65 @@
# from __future__ import annotations
# # from __future__ import annotations

import datetime

import time

# Sleep for 5 seconds
# # Sleep for 5 seconds
import pyarrow as pa

# import pytest
# from daft.expressions import col
# from daft.table import MicroPartition
# # import pytest
from daft.expressions import col
from daft.table import MicroPartition
import daft

# def test_map_get():
# data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64()))
# table = MicroPartition.from_arrow(pa.table({"map_col": data}))
# # def test_map_get():
# # data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64()))
# # table = MicroPartition.from_arrow(pa.table({"map_col": data}))

# result = table.eval_expression_list([col("map_col").map.get(1)])
# # result = table.eval_expression_list([col("map_col").map.get(1)])

# assert result.to_pydict() == {"value": [2, None, None]}
# # assert result.to_pydict() == {"value": [2, None, None]}


# def test_map_get_broadcasted():
# data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64()))
# keys = pa.array([1, 3, 2], type=pa.int64())
# table = MicroPartition.from_arrow(pa.table({"map_col": data, "key": keys}))
# # def test_map_get_broadcasted():
# # data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64()))
# # keys = pa.array([1, 3, 2], type=pa.int64())
# # table = MicroPartition.from_arrow(pa.table({"map_col": data, "key": keys}))

# result = table.eval_expression_list([col("map_col").map.get(col("key"))])
# # result = table.eval_expression_list([col("map_col").map.get(col("key"))])

# assert result.to_pydict() == {"value": [2, None, 1]}
# # assert result.to_pydict() == {"value": [2, None, 1]}


# def test_map_get_duplicate_keys():
# # Only the first value is returned
# data = pa.array([[(1, 2), (1, 3)]], type=pa.map_(pa.int64(), pa.int64()))
# table = MicroPartition.from_arrow(pa.table({"map_col": data}))
# # def test_map_get_duplicate_keys():
# # # Only the first value is returned
# # data = pa.array([[(1, 2), (1, 3)]], type=pa.map_(pa.int64(), pa.int64()))
# # table = MicroPartition.from_arrow(pa.table({"map_col": data}))

# result = table.eval_expression_list([col("map_col").map.get(1)])
# # result = table.eval_expression_list([col("map_col").map.get(1)])

# assert result.to_pydict() == {"value": [2]}
# # assert result.to_pydict() == {"value": [2]}


# def test_list_array():
# print("HIIIIIII")
# data = pa.array(
# [
# [datetime.date(2022, 1, 1)],
# [datetime.date(2022, 1, 2)],
# [],
# ],
# type=pa.list_(pa.date32()), # logical types
# )
# # def test_list_array():
# # print("HIIIIIII")
# # data = pa.array(
# # [
# # [datetime.date(2022, 1, 1)],
# # [datetime.date(2022, 1, 2)],
# # [],
# # ],
# # type=pa.list_(pa.date32()), # logical types
# # )

# table = MicroPartition.from_arrow(pa.table({"map_col": data}))
# # table = MicroPartition.from_arrow(pa.table({"map_col": data}))

# print("TABLE", table)
# print("oi")
# # print("TABLE", table)
# # print("oi")

# # result = table.eval_expression_list([col("map_col").map.get("foo")])
# # # result = table.eval_expression_list([col("map_col").map.get("foo")])

# # assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]}
# # # assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]}


def test_map_get_logical_type():
@@ -73,35 +73,34 @@ def test_map_get_logical_type():
type=pa.map_(pa.string(), pa.date32()), # logical types
)

assert isinstance(data, pa.Array)
assert data.type == pa.map_(pa.string(), pa.date32())
assert len(data) == 3
assert data[0].as_py() == [("foo", datetime.date(2022, 1, 1))]
assert data[1].as_py() == [("foo", datetime.date(2022, 1, 2))]
assert data[2].as_py() == []
# assert isinstance(data, pa.Array)
# assert data.type == pa.map_(pa.string(), pa.date32())
# assert len(data) == 3
# assert data[0].as_py() == [("foo", datetime.date(2022, 1, 1))]
# assert data[1].as_py() == [("foo", datetime.date(2022, 1, 2))]
# assert data[2].as_py() == []

# Assert physical types
assert str(data.type) == "map<string, date32[day]>"
# # Assert physical types
# assert str(data.type) == "map<string, date32[day]>"

# Convert types
# # Convert types

table = daft.table.MicroPartition.from_arrow(pa.table({"map_col": data}))


# result = table.eval_expression_list([col("map_col").map.get("foo")])
# # result = table.eval_expression_list([col("map_col").map.get("foo")])

# assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]}
# # assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]}


# def test_map_get_bad_field():
# data = pa.array([[(1, 2)], [(2, 3)]], type=pa.map_(pa.int64(), pa.int64()))
# table = MicroPartition.from_arrow(pa.table({"map_col": data}))
# # def test_map_get_bad_field():
# # data = pa.array([[(1, 2)], [(2, 3)]], type=pa.map_(pa.int64(), pa.int64()))
# # table = MicroPartition.from_arrow(pa.table({"map_col": data}))

# with pytest.raises(ValueError):
# table.eval_expression_list([col("map_col").map.get("foo")])
# # with pytest.raises(ValueError):
# # table.eval_expression_list([col("map_col").map.get("foo")])


if __name__ == "__main__":
time.sleep(5)

test_map_get_logical_type()
print("starting")
test_map_get_logical_type()
print("done")
21 changes: 12 additions & 9 deletions src/arrow2/src/array/map/mod.rs
@@ -1,17 +1,17 @@
use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray};
use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
error::Error,
offset::OffsetsBuffer,
};

use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray};

mod ffi;
pub(super) mod fmt;
mod iterator;
#[allow(unused)]
pub use iterator::*;

use crate::datatypes::PhysicalType;

/// An array representing a (key, value), both of arbitrary logical types.
@@ -52,7 +52,10 @@ impl MapArray {

let inner_len = inner.len();
if inner_len != 2 {
let msg = format!("MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields", inner_len);
let msg = format!(
"MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields",
inner_len
);
return Err(Error::InvalidArgumentError(msg));
}

@@ -227,15 +230,15 @@ impl Array for MapArray {
panic!("inner types are not equal");
}

let mut field = self.field.clone();
field.change_type(target_inner.data_type.clone());

let offsets = self.offsets().clone();
let offsets = offsets.map(|offset| offset as i64);

let list = ListArray::new(
target,
offsets,
self.field.clone(),
self.validity.clone(),
);
let debug = format!("{:#?}", field.data_type());
let target_debug = format!("{:#?}", target);
let list = ListArray::new(target, offsets, field, self.validity.clone());

Box::new(list)
}
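The substantive change in the hunk above: the old code rebuilt the ListArray around self.field.clone() without retargeting it, so the list's new logical type and its child field's type could disagree. A standalone model of the invariant the fix restores (hypothetical mini-types, not arrow2's API):

#[derive(Debug, PartialEq)]
enum Ty {
    Int32,
    Date32, // a logical date stored in an Int32 physical layout
    List(Box<Ty>),
}

struct ListNode {
    ty: Ty,       // the list's declared type, e.g. List(Date32)
    child_ty: Ty, // the type carried by the inner field
}

// The invariant: a list's declared inner type must equal its child's type.
fn is_consistent(node: &ListNode) -> bool {
    matches!(&node.ty, Ty::List(inner) if **inner == node.child_ty)
}

fn main() {
    // Old behaviour: retarget the list type but keep the stale child field.
    let broken = ListNode {
        ty: Ty::List(Box::new(Ty::Date32)),
        child_ty: Ty::Int32,
    };
    assert!(!is_consistent(&broken));

    // Fixed behaviour: change the child field's type first, then rebuild,
    // as the hunk does with field.change_type(target_inner.data_type.clone()).
    let fixed = ListNode {
        ty: Ty::List(Box::new(Ty::Date32)),
        child_ty: Ty::Date32,
    };
    assert!(is_consistent(&fixed));
}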
21 changes: 18 additions & 3 deletions src/arrow2/src/array/mod.rs
@@ -55,6 +55,11 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {
/// When the validity is [`None`], all slots are valid.
fn validity(&self) -> Option<&Bitmap>;

fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item=&'a mut dyn Array> + 'a> {
let dbg = format!("{:#?}", self.data_type());
Box::new(core::iter::empty())
}

/// The number of null slots on this [`Array`].
/// # Implementation
/// This is `O(1)` since the number of null elements is pre-computed.
@@ -185,7 +190,6 @@
/// Panics iff the `data_type`'s [`PhysicalType`] is not equal to array's `PhysicalType`.
#[tracing::instrument(level = "trace", name = "Array::convert_logical_type", skip_all)]
fn convert_logical_type(&self, data_type: DataType) -> Box<dyn Array> {
tracing::trace!("converting logical type to\n{data_type:#?}");
let mut new = self.to_boxed();
new.change_type(data_type);
new
@@ -643,8 +647,19 @@ macro_rules! impl_common_array {
data_type.to_physical_type(),
);
}
self.data_type = data_type;

self.data_type = data_type.clone();

let mut children = self.direct_children();

data_type.direct_children(|child| {
let Some(child_elem) = children.next() else {
return;
};
child_elem.change_type(child.clone());
})
}

};
}
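Taken together, direct_children on the type and direct_children on the array give change_type its recursive shape: store the new type, then walk the matching (child type, child array) pairs. A self-contained model of that flow (hypothetical mini-types, not the arrow2 API; type children with no matching array child are skipped, like the let-else guard above):

#[derive(Clone, Debug, PartialEq)]
enum Ty {
    Int,
    Utf8,
    Struct(Vec<Ty>),
    Extension(String, Box<Ty>), // logical wrapper, like DataType::Extension
}

impl Ty {
    // Mirrors DataType::direct_children: visit each immediate child type.
    fn direct_children(&self, mut f: impl FnMut(&Ty)) {
        match self {
            Ty::Int | Ty::Utf8 => {}
            Ty::Struct(children) => children.iter().for_each(|c| f(c)),
            Ty::Extension(_, inner) => f(inner),
        }
    }
}

struct Node {
    ty: Ty,
    children: Vec<Node>,
}

impl Node {
    // Mirrors the new impl_common_array! body: adopt the type, then zip the
    // type's direct children against this array's children and recurse.
    fn change_type(&mut self, ty: Ty) {
        self.ty = ty.clone();
        let mut kids = self.children.iter_mut();
        ty.direct_children(|child_ty| {
            let Some(kid) = kids.next() else { return };
            kid.change_type(child_ty.clone());
        });
    }
}

fn main() {
    // A struct<int, utf8> whose first child gains a logical wrapper.
    let mut arr = Node {
        ty: Ty::Struct(vec![Ty::Int, Ty::Utf8]),
        children: vec![
            Node { ty: Ty::Int, children: vec![] },
            Node { ty: Ty::Utf8, children: vec![] },
        ],
    };
    arr.change_type(Ty::Struct(vec![
        Ty::Extension("user-id".into(), Box::new(Ty::Int)),
        Ty::Utf8,
    ]));
    // The child now carries the logical type too, not just the parent.
    assert_eq!(
        arr.children[0].ty,
        Ty::Extension("user-id".into(), Box::new(Ty::Int))
    );
}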

@@ -739,7 +754,7 @@ pub(crate) use self::ffi::ToFfi;
/// This is similar to [`Extend`], but accepted the creation to error.
pub trait TryExtend<A> {
/// Fallible version of [`Extend::extend`].
fn try_extend<I: IntoIterator<Item = A>>(&mut self, iter: I) -> Result<()>;
fn try_extend<I: IntoIterator<Item=A>>(&mut self, iter: I) -> Result<()>;
}

/// A trait describing the ability of a struct to receive new items.
11 changes: 10 additions & 1 deletion src/arrow2/src/array/struct_/mod.rs
@@ -1,10 +1,11 @@
use std::ops::DerefMut;
use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
error::Error,
};

use super::{new_empty_array, new_null_array, Array};
use super::{new_empty_array, new_null_array, Array, ToFfi};

mod ffi;
pub(super) mod fmt;
@@ -246,6 +247,14 @@ impl StructArray {
impl Array for StructArray {
impl_common_array!();

fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item=&'a mut dyn Array> + 'a> {
let iter = self.values
.iter_mut()
.map(|x| x.deref_mut());

Box::new(iter)
}

fn validity(&self) -> Option<&Bitmap> {
self.validity.as_ref()
}
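One detail worth noting in StructArray's override above: values is a Vec of boxed arrays, and deref_mut is what reborrows each &mut Box<dyn Array> as the &mut dyn Array item the iterator promises. The same pattern in a self-contained toy (hypothetical Shape trait, not arrow2 code):

use std::ops::DerefMut;

trait Shape {
    fn scale(&mut self, factor: f64);
}

struct Circle {
    radius: f64,
}

impl Shape for Circle {
    fn scale(&mut self, factor: f64) {
        self.radius *= factor;
    }
}

struct Group {
    children: Vec<Box<dyn Shape>>,
}

impl Group {
    // Same shape as StructArray::direct_children: hand out each boxed
    // child as a bare trait object so callers can mutate it generically.
    fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item = &'a mut dyn Shape> + 'a> {
        Box::new(self.children.iter_mut().map(|c| c.deref_mut()))
    }
}

fn main() {
    let mut group = Group {
        children: vec![Box::new(Circle { radius: 1.0 })],
    };
    for child in group.direct_children() {
        child.scale(2.0);
    }
}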
21 changes: 21 additions & 0 deletions src/arrow2/src/datatypes/mod.rs
@@ -19,6 +19,10 @@ pub type Metadata = BTreeMap<String, String>;
/// typedef for [Option<(String, Option<String>)>] descr
pub(crate) type Extension = Option<(String, Option<String>)>;

pub type ArrowDataType = DataType;
pub type ArrowField = Field;


/// The set of supported logical types in this crate.
///
/// Each variant uniquely identifies a logical type, which define specific semantics to the data
@@ -159,6 +163,23 @@
Extension(String, Box<DataType>, Option<String>),
}

impl DataType {
pub fn direct_children(&self, mut processor: impl FnMut(&DataType)) {
match self {
DataType::Null | DataType::Boolean | DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Decimal(..) | DataType::Decimal256(..) |
DataType::Int64 | DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 |
DataType::Float16 | DataType::Float32 | DataType::Float64 | DataType::Timestamp(_, _) |
DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) |
DataType::Duration(_) | DataType::Interval(_) | DataType::Binary | DataType::FixedSizeBinary(_) |
DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {}
DataType::List(field) | DataType::FixedSizeList(field, _) | DataType::LargeList(field) | DataType::Map(field, ..) => processor(&field.data_type),
DataType::Struct(fields) | DataType::Union(fields, _, _) => fields.iter().for_each(|field| processor(&field.data_type)),
DataType::Dictionary(key_type, value_type, _) => todo!(),
DataType::Extension(_, inner, _) => processor(inner),
}
}
}
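A short usage sketch of the new visitor, assuming arrow2's usual Field::new(name, data_type, is_nullable) constructor; note it visits only one level (the Dictionary arm above is still a todo!(), so it is avoided here):

use arrow2::datatypes::{DataType, Field};

// Collect the immediate child types of a nested DataType. Only one level:
// the List's own child would be visited by a further call on the List.
fn child_types(dt: &DataType) -> Vec<DataType> {
    let mut out = Vec::new();
    dt.direct_children(|child| out.push(child.clone()));
    out
}

fn main() {
    let dt = DataType::Struct(vec![
        Field::new("id", DataType::Int64, false),
        Field::new(
            "tags",
            DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
            true,
        ),
    ]);
    // Two direct children: Int64 and List(item: Utf8).
    assert_eq!(child_types(&dt).len(), 2);
}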

/// Mode of [`DataType::Union`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum UnionMode {
30 changes: 16 additions & 14 deletions src/daft-core/src/array/mod.rs
Expand Up @@ -18,7 +18,7 @@ pub mod prelude;
use std::{marker::PhantomData, sync::Arc};

use common_error::{DaftError, DaftResult};

use daft_schema::field::DaftField;
use crate::datatypes::{DaftArrayType, DaftPhysicalType, DataType, Field};

#[derive(Debug)]
@@ -45,42 +45,44 @@
T: DaftPhysicalType,
{
pub fn new(
daft_field_with_physical: Arc<Field>,
arrow_data: Box<dyn arrow2::array::Array>,
physical_field: Arc<DaftField>,
arrow_array: Box<dyn arrow2::array::Array>,
) -> DaftResult<Self> {
assert!(
daft_field_with_physical.dtype.is_physical(),
physical_field.dtype.is_physical(),
"Can only construct DataArray for PhysicalTypes, got {}",
daft_field_with_physical.dtype
physical_field.dtype
);

if let Ok(expected_arrow_physical_type) =
daft_field_with_physical.dtype.to_physical().to_arrow()
physical_field.dtype.to_arrow() // For instance Int32 -> Int 32
{
let maybe_arrow_logical_type = arrow_data.data_type(); // logical type
if !expected_arrow_physical_type.eq(maybe_arrow_logical_type) {
let arrow_data_type = arrow_array.data_type(); // logical type

if !expected_arrow_physical_type.eq(arrow_data_type) {
panic!(
"Mismatch between expected and actual Arrow types for DataArray.\n\
Field name: {}\n\
Logical type: {}\n\
Physical type: {}\n\
Expected Arrow physical type: {:?}\n\
Actual Arrow Logical type: {:?}\n\
This error typically occurs when there's a discrepancy between the Daft DataType \
and the underlying Arrow representation. Please ensure that the physical type \
of the Daft DataType matches the Arrow type of the provided data.",
daft_field_with_physical.name,
daft_field_with_physical.dtype,
daft_field_with_physical.dtype.to_physical(),
physical_field.name,
physical_field.dtype,
physical_field.dtype.to_physical(),
expected_arrow_physical_type,
maybe_arrow_logical_type
arrow_data_type
);
}
}

Ok(Self {
field: daft_field_with_physical,
data: arrow_data,
field: physical_field,
data: arrow_array,
marker_: PhantomData,
})
}
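The guard in pub fn new above distills to: the field's dtype is already physical (the first assert), so the arrow type it implies must match the arrow type of the provided buffer exactly. A mini-model of that check (hypothetical enums, not Daft's types):

// Hypothetical stand-ins for Daft's logical/physical split.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PhysicalType {
    Int32,
    Utf8,
}

#[derive(Clone, Copy, Debug)]
enum DaftType {
    Date, // logical: days since epoch, stored as Int32
    Utf8, // already physical
}

impl DaftType {
    fn to_physical(self) -> PhysicalType {
        match self {
            DaftType::Date => PhysicalType::Int32,
            DaftType::Utf8 => PhysicalType::Utf8,
        }
    }
}

// Mirrors the constructor's guard: reject a buffer whose arrow type does
// not match what the (physical) field implies.
fn new_data_array(field: DaftType, buffer: PhysicalType) -> Result<(), String> {
    let expected = field.to_physical();
    if expected != buffer {
        return Err(format!(
            "Mismatch between expected and actual types: expected {expected:?}, got {buffer:?}"
        ));
    }
    Ok(())
}

fn main() {
    // A Date column arrives as its Int32 physical buffer: accepted.
    assert!(new_data_array(DaftType::Date, PhysicalType::Int32).is_ok());
    // Handing it a Utf8 buffer trips the same mismatch the panic reports.
    assert!(new_data_array(DaftType::Date, PhysicalType::Utf8).is_err());
}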
