diff --git a/.vscode/launch.json b/.vscode/launch.json index 5b6207a880..a097a8959f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,35 +1,781 @@ { - "configurations": [ - { - "name": "Debug Rust/Python", - "type": "debugpy", - "request": "launch", - "program": "${workspaceFolder}/tools/attach_debugger.py", - "args": [ - "${workspaceFolder}/something.py" - ], - // "console": "internalConsole", - "console": "integratedTerminal", - "justMyCode": true, - "serverReadyAction": { - "pattern": "pID = ([0-9]+)", - "action": "startDebugging", - "name": "Rust LLDB" - } - }, - { - "name": "Rust LLDB", - "pid": "71088", - "type": "lldb", - "request": "attach", - "program": "${command:python.interpreterPath}", - "stopOnEntry": false, - "sourceLanguages": [ - "rust" - ], - "presentation": { - "hidden": true - } - } - ] + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "something", + "program": "${workspaceFolder}/.new-venv/bin/python", + "args": ["/Users/andrewgazelka/Projects/Daft/something.py"], + "cwd": "${workspaceFolder}", + "sourceLanguages": ["rust"], + "terminal": "integrated", + "preRunCommands": [ + "settings set target.load-cwd-lldbinit true" + ] + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'arrow2'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=arrow2" + ], + "filter": { + "name": "arrow2", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'parquet2'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=parquet2" + ], + "filter": { + "name": "parquet2", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_display'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-display" + ], + "filter": { + "name": "common_display", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_error'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-error" + ], + "filter": { + "name": "common_error", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_io_config'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-io-config" + ], + "filter": { + "name": "common_io_config", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_py_serde'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-py-serde" + ], + "filter": { + "name": "common_py_serde", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_treenode'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-treenode" + ], + "filter": { + "name": "common_treenode", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_daft_config'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-daft-config" + ], + "filter": { + "name": "common_daft_config", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_system_info'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-system-info" + ], + "filter": { + "name": "common_system_info", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_core'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-core" + ], + "filter": { + "name": "daft_core", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_arrow_ffi'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-arrow-ffi" + ], + "filter": { + "name": "common_arrow_ffi", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_hashable_float_wrapper'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-hashable-float-wrapper" + ], + "filter": { + "name": "common_hashable_float_wrapper", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_minhash'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-minhash" + ], + "filter": { + "name": "daft_minhash", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug benchmark 'minhash'", + "cargo": { + "args": [ + "test", + "--no-run", + "--bench=minhash", + "--package=daft-minhash" + ], + "filter": { + "name": "minhash", + "kind": "bench" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_schema'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-schema" + ], + "filter": { + "name": "daft_schema", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_version'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-version" + ], + "filter": { + "name": "common_version", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_sketch'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-sketch" + ], + "filter": { + "name": "daft_sketch", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'hyperloglog'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=hyperloglog" + ], + "filter": { + "name": "hyperloglog", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_local_execution'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-local-execution" + ], + "filter": { + "name": "daft_local_execution", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_file_formats'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-file-formats" + ], + "filter": { + "name": "common_file_formats", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_tracing'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-tracing" + ], + "filter": { + "name": "common_tracing", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_csv'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-csv" + ], + "filter": { + "name": "daft_csv", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_compression'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-compression" + ], + "filter": { + "name": "daft_compression", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_decoding'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-decoding" + ], + "filter": { + "name": "daft_decoding", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_dsl'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-dsl" + ], + "filter": { + "name": "daft_dsl", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'common_resource_request'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=common-resource-request" + ], + "filter": { + "name": "common_resource_request", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_io'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-io" + ], + "filter": { + "name": "daft_io", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_table'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-table" + ], + "filter": { + "name": "daft_table", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_image'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-image" + ], + "filter": { + "name": "daft_image", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_json'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-json" + ], + "filter": { + "name": "daft_json", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_micropartition'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-micropartition" + ], + "filter": { + "name": "daft_micropartition", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_parquet'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-parquet" + ], + "filter": { + "name": "daft_parquet", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_stats'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-stats" + ], + "filter": { + "name": "daft_stats", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_scan'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-scan" + ], + "filter": { + "name": "daft_scan", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_physical_plan'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-physical-plan" + ], + "filter": { + "name": "daft_physical_plan", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_plan'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-plan" + ], + "filter": { + "name": "daft_plan", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_functions'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-functions" + ], + "filter": { + "name": "daft_functions", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_scheduler'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-scheduler" + ], + "filter": { + "name": "daft_scheduler", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_functions_json'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-functions-json" + ], + "filter": { + "name": "daft_functions_json", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'daft_sql'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=daft-sql" + ], + "filter": { + "name": "daft_sql", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] } \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index a4ee30520c..1038ded445 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,6 @@ { "rust-analyzer.check.extraEnv": { - " pub fn to_arrow(&self) -> Box {\n let daft_type = self.data_type();\n let arrow_logical_type = daft_type.to_arrow().unwrap();\n let physical_arrow_array = self.physical.data();\nCARGO_TARGET_DIR": "target/analyzer" + "CARGO_TARGET_DIR": "target/analyzer" }, "rust-analyzer.check.features": "all", "rust-analyzer.cargo.features": "all", diff --git a/something.py b/something.py index 4e8a46950f..8660f9a13a 100644 --- a/something.py +++ b/something.py @@ -1,65 +1,65 @@ -# from __future__ import annotations +# # from __future__ import annotations import datetime import time -# Sleep for 5 seconds +# # Sleep for 5 seconds import pyarrow as pa -# import pytest -# from daft.expressions import col -# from daft.table import MicroPartition +# # import pytest +from daft.expressions import col +from daft.table import MicroPartition import daft -# def test_map_get(): -# data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64())) -# table = MicroPartition.from_arrow(pa.table({"map_col": data})) +# # def test_map_get(): +# # data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64())) +# # table = MicroPartition.from_arrow(pa.table({"map_col": data})) -# result = table.eval_expression_list([col("map_col").map.get(1)]) +# # result = table.eval_expression_list([col("map_col").map.get(1)]) -# assert result.to_pydict() == {"value": [2, None, None]} +# # assert result.to_pydict() == {"value": [2, None, None]} -# def test_map_get_broadcasted(): -# data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64())) -# keys = pa.array([1, 3, 2], type=pa.int64()) -# table = MicroPartition.from_arrow(pa.table({"map_col": data, "key": keys})) +# # def test_map_get_broadcasted(): +# # data = pa.array([[(1, 2)], [], [(2, 1)]], type=pa.map_(pa.int64(), pa.int64())) +# # keys = pa.array([1, 3, 2], type=pa.int64()) +# # table = MicroPartition.from_arrow(pa.table({"map_col": data, "key": keys})) -# result = table.eval_expression_list([col("map_col").map.get(col("key"))]) +# # result = table.eval_expression_list([col("map_col").map.get(col("key"))]) -# assert result.to_pydict() == {"value": [2, None, 1]} +# # assert result.to_pydict() == {"value": [2, None, 1]} -# def test_map_get_duplicate_keys(): -# # Only the first value is returned -# data = pa.array([[(1, 2), (1, 3)]], type=pa.map_(pa.int64(), pa.int64())) -# table = MicroPartition.from_arrow(pa.table({"map_col": data})) +# # def test_map_get_duplicate_keys(): +# # # Only the first value is returned +# # data = pa.array([[(1, 2), (1, 3)]], type=pa.map_(pa.int64(), pa.int64())) +# # table = MicroPartition.from_arrow(pa.table({"map_col": data})) -# result = table.eval_expression_list([col("map_col").map.get(1)]) +# # result = table.eval_expression_list([col("map_col").map.get(1)]) -# assert result.to_pydict() == {"value": [2]} +# # assert result.to_pydict() == {"value": [2]} -# def test_list_array(): -# print("HIIIIIII") -# data = pa.array( -# [ -# [datetime.date(2022, 1, 1)], -# [datetime.date(2022, 1, 2)], -# [], -# ], -# type=pa.list_(pa.date32()), # logical types -# ) +# # def test_list_array(): +# # print("HIIIIIII") +# # data = pa.array( +# # [ +# # [datetime.date(2022, 1, 1)], +# # [datetime.date(2022, 1, 2)], +# # [], +# # ], +# # type=pa.list_(pa.date32()), # logical types +# # ) -# table = MicroPartition.from_arrow(pa.table({"map_col": data})) +# # table = MicroPartition.from_arrow(pa.table({"map_col": data})) -# print("TABLE", table) -# print("oi") +# # print("TABLE", table) +# # print("oi") -# # result = table.eval_expression_list([col("map_col").map.get("foo")]) +# # # result = table.eval_expression_list([col("map_col").map.get("foo")]) -# # assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]} +# # # assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]} def test_map_get_logical_type(): @@ -73,35 +73,34 @@ def test_map_get_logical_type(): type=pa.map_(pa.string(), pa.date32()), # logical types ) - assert isinstance(data, pa.Array) - assert data.type == pa.map_(pa.string(), pa.date32()) - assert len(data) == 3 - assert data[0].as_py() == [("foo", datetime.date(2022, 1, 1))] - assert data[1].as_py() == [("foo", datetime.date(2022, 1, 2))] - assert data[2].as_py() == [] +# assert isinstance(data, pa.Array) +# assert data.type == pa.map_(pa.string(), pa.date32()) +# assert len(data) == 3 +# assert data[0].as_py() == [("foo", datetime.date(2022, 1, 1))] +# assert data[1].as_py() == [("foo", datetime.date(2022, 1, 2))] +# assert data[2].as_py() == [] - # Assert physical types - assert str(data.type) == "map" +# # Assert physical types +# assert str(data.type) == "map" - # Convert types +# # Convert types table = daft.table.MicroPartition.from_arrow(pa.table({"map_col": data})) -# result = table.eval_expression_list([col("map_col").map.get("foo")]) +# # result = table.eval_expression_list([col("map_col").map.get("foo")]) -# assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]} +# # assert result.to_pydict() == {"value": [datetime.date(2022, 1, 1), datetime.date(2022, 1, 2), None]} -# def test_map_get_bad_field(): -# data = pa.array([[(1, 2)], [(2, 3)]], type=pa.map_(pa.int64(), pa.int64())) -# table = MicroPartition.from_arrow(pa.table({"map_col": data})) +# # def test_map_get_bad_field(): +# # data = pa.array([[(1, 2)], [(2, 3)]], type=pa.map_(pa.int64(), pa.int64())) +# # table = MicroPartition.from_arrow(pa.table({"map_col": data})) -# with pytest.raises(ValueError): -# table.eval_expression_list([col("map_col").map.get("foo")]) +# # with pytest.raises(ValueError): +# # table.eval_expression_list([col("map_col").map.get("foo")]) -if __name__ == "__main__": - time.sleep(5) - - test_map_get_logical_type() +print("starting") +test_map_get_logical_type() +print("done") diff --git a/src/arrow2/src/array/map/mod.rs b/src/arrow2/src/array/map/mod.rs index acd04f1af3..eabf7e529e 100644 --- a/src/arrow2/src/array/map/mod.rs +++ b/src/arrow2/src/array/map/mod.rs @@ -1,3 +1,4 @@ +use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray}; use crate::{ bitmap::Bitmap, datatypes::{DataType, Field}, @@ -5,13 +6,12 @@ use crate::{ offset::OffsetsBuffer, }; -use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray}; - mod ffi; pub(super) mod fmt; mod iterator; #[allow(unused)] pub use iterator::*; + use crate::datatypes::PhysicalType; /// An array representing a (key, value), both of arbitrary logical types. @@ -52,7 +52,10 @@ impl MapArray { let inner_len = inner.len(); if inner_len != 2 { - let msg = format!("MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields", inner_len); + let msg = format!( + "MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields", + inner_len + ); return Err(Error::InvalidArgumentError(msg)); } @@ -227,15 +230,15 @@ impl Array for MapArray { panic!("inner types are not equal"); } + let mut field = self.field.clone(); + field.change_type(target_inner.data_type.clone()); + let offsets = self.offsets().clone(); let offsets = offsets.map(|offset| offset as i64); - let list = ListArray::new( - target, - offsets, - self.field.clone(), - self.validity.clone(), - ); + let debug = format!("{:#?}", field.data_type()); + let target_debug = format!("{:#?}", target); + let list = ListArray::new(target, offsets, field, self.validity.clone()); Box::new(list) } diff --git a/src/arrow2/src/array/mod.rs b/src/arrow2/src/array/mod.rs index 86910c4d54..caee7d9cf3 100644 --- a/src/arrow2/src/array/mod.rs +++ b/src/arrow2/src/array/mod.rs @@ -55,6 +55,11 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { /// When the validity is [`None`], all slots are valid. fn validity(&self) -> Option<&Bitmap>; + fn direct_children<'a>(&'a mut self) -> Box + 'a> { + let dbg = format!("{:#?}", self.data_type()); + Box::new(core::iter::empty()) + } + /// The number of null slots on this [`Array`]. /// # Implementation /// This is `O(1)` since the number of null elements is pre-computed. @@ -185,7 +190,6 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { /// Panics iff the `data_type`'s [`PhysicalType`] is not equal to array's `PhysicalType`. #[tracing::instrument(level = "trace", name = "Array::convert_logical_type", skip_all)] fn convert_logical_type(&self, data_type: DataType) -> Box { - tracing::trace!("converting logical type to\n{data_type:#?}"); let mut new = self.to_boxed(); new.change_type(data_type); new @@ -643,8 +647,19 @@ macro_rules! impl_common_array { data_type.to_physical_type(), ); } - self.data_type = data_type; + + self.data_type = data_type.clone(); + + let mut children = self.direct_children(); + + data_type.direct_children(|child| { + let Some(child_elem) = children.next() else { + return; + }; + child_elem.change_type(child.clone()); + }) } + }; } @@ -739,7 +754,7 @@ pub(crate) use self::ffi::ToFfi; /// This is similar to [`Extend`], but accepted the creation to error. pub trait TryExtend { /// Fallible version of [`Extend::extend`]. - fn try_extend>(&mut self, iter: I) -> Result<()>; + fn try_extend>(&mut self, iter: I) -> Result<()>; } /// A trait describing the ability of a struct to receive new items. diff --git a/src/arrow2/src/array/struct_/mod.rs b/src/arrow2/src/array/struct_/mod.rs index fb2812375c..87d6f1991f 100644 --- a/src/arrow2/src/array/struct_/mod.rs +++ b/src/arrow2/src/array/struct_/mod.rs @@ -1,10 +1,11 @@ +use std::ops::DerefMut; use crate::{ bitmap::Bitmap, datatypes::{DataType, Field}, error::Error, }; -use super::{new_empty_array, new_null_array, Array}; +use super::{new_empty_array, new_null_array, Array, ToFfi}; mod ffi; pub(super) mod fmt; @@ -246,6 +247,14 @@ impl StructArray { impl Array for StructArray { impl_common_array!(); + fn direct_children<'a>(&'a mut self) -> Box + 'a> { + let iter = self.values + .iter_mut() + .map(|x| x.deref_mut()); + + Box::new(iter) + } + fn validity(&self) -> Option<&Bitmap> { self.validity.as_ref() } diff --git a/src/arrow2/src/datatypes/mod.rs b/src/arrow2/src/datatypes/mod.rs index 2debc5a4f2..4bfd1d7d3b 100644 --- a/src/arrow2/src/datatypes/mod.rs +++ b/src/arrow2/src/datatypes/mod.rs @@ -19,6 +19,10 @@ pub type Metadata = BTreeMap; /// typedef for [Option<(String, Option)>] descr pub(crate) type Extension = Option<(String, Option)>; +pub type ArrowDataType = DataType; +pub type ArrowField = Field; + + /// The set of supported logical types in this crate. /// /// Each variant uniquely identifies a logical type, which define specific semantics to the data @@ -159,6 +163,23 @@ pub enum DataType { Extension(String, Box, Option), } +impl DataType { + pub fn direct_children(&self, mut processor: impl FnMut(&DataType)) { + match self { + DataType::Null | DataType::Boolean | DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Decimal(..) | DataType::Decimal256(..) | + DataType::Int64 | DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | + DataType::Float16 | DataType::Float32 | DataType::Float64 | DataType::Timestamp(_, _) | + DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) | + DataType::Duration(_) | DataType::Interval(_) | DataType::Binary | DataType::FixedSizeBinary(_) | + DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {} + DataType::List(field) | DataType::FixedSizeList(field, _) | DataType::LargeList(field) | DataType::Map(field, ..) => processor(&field.data_type), + DataType::Struct(fields) | DataType::Union(fields, _, _) => fields.iter().for_each(|field| processor(&field.data_type)), + DataType::Dictionary(key_type, value_type, _) => todo!(), + DataType::Extension(_, inner, _) => processor(inner), + } + } +} + /// Mode of [`DataType::Union`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum UnionMode { diff --git a/src/daft-core/src/array/mod.rs b/src/daft-core/src/array/mod.rs index 2aed55f5e5..045bf0bc0f 100644 --- a/src/daft-core/src/array/mod.rs +++ b/src/daft-core/src/array/mod.rs @@ -18,7 +18,7 @@ pub mod prelude; use std::{marker::PhantomData, sync::Arc}; use common_error::{DaftError, DaftResult}; - +use daft_schema::field::DaftField; use crate::datatypes::{DaftArrayType, DaftPhysicalType, DataType, Field}; #[derive(Debug)] @@ -45,20 +45,21 @@ where T: DaftPhysicalType, { pub fn new( - daft_field_with_physical: Arc, - arrow_data: Box, + physical_field: Arc, + arrow_array: Box, ) -> DaftResult { assert!( - daft_field_with_physical.dtype.is_physical(), + physical_field.dtype.is_physical(), "Can only construct DataArray for PhysicalTypes, got {}", - daft_field_with_physical.dtype + physical_field.dtype ); if let Ok(expected_arrow_physical_type) = - daft_field_with_physical.dtype.to_physical().to_arrow() + physical_field.dtype.to_arrow() // For instance Int32 -> Int 32 { - let maybe_arrow_logical_type = arrow_data.data_type(); // logical type - if !expected_arrow_physical_type.eq(maybe_arrow_logical_type) { + let arrow_data_type = arrow_array.data_type(); // logical type + + if !expected_arrow_physical_type.eq(arrow_data_type) { panic!( "Mismatch between expected and actual Arrow types for DataArray.\n\ Field name: {}\n\ @@ -66,21 +67,22 @@ where Physical type: {}\n\ Expected Arrow physical type: {:?}\n\ Actual Arrow Logical type: {:?}\n\ + This error typically occurs when there's a discrepancy between the Daft DataType \ and the underlying Arrow representation. Please ensure that the physical type \ of the Daft DataType matches the Arrow type of the provided data.", - daft_field_with_physical.name, - daft_field_with_physical.dtype, - daft_field_with_physical.dtype.to_physical(), + physical_field.name, + physical_field.dtype, + physical_field.dtype.to_physical(), expected_arrow_physical_type, - maybe_arrow_logical_type + arrow_data_type ); } } Ok(Self { - field: daft_field_with_physical, - data: arrow_data, + field: physical_field, + data: arrow_array, marker_: PhantomData, }) } diff --git a/src/daft-core/src/array/ops/from_arrow.rs b/src/daft-core/src/array/ops/from_arrow.rs index 57f49f4740..812d3a3c48 100644 --- a/src/daft-core/src/array/ops/from_arrow.rs +++ b/src/daft-core/src/array/ops/from_arrow.rs @@ -32,12 +32,18 @@ where { #[tracing::instrument(level = "trace", name = "LogicalArray::from_arrow", skip_all)] fn from_arrow(field: FieldRef, arrow_arr: Box) -> DaftResult { - info!("Converting to {field:?}"); - let data_array_field = Arc::new(Field::new(field.name.clone(), field.dtype.to_physical())); - let physical_arrow_arr = arrow_arr.convert_logical_type(data_array_field.dtype.to_arrow()?); + let target_convert = field.to_physical(); + let target_convert_arrow = target_convert.dtype.to_arrow()?; + + + let physical_arrow_array = arrow_arr.convert_logical_type(target_convert_arrow.clone()); + + let dbg = format!("Target Convert: {target_convert:#?}\nTarget Convert Arrow: {target_convert_arrow:#?}\nPhysical Arrow: {:#?}", physical_arrow_array.data_type()); + + let physical = ::ArrayType::from_arrow( - data_array_field, - physical_arrow_arr, + Arc::new(target_convert), + physical_arrow_array, )?; Ok(Self::new(field.clone(), physical)) } @@ -125,9 +131,11 @@ impl FromArrow for StructArray { match (&field.dtype, arrow_arr.data_type()) { (DataType::Struct(fields), arrow2::datatypes::DataType::Struct(arrow_fields)) => { if fields.len() != arrow_fields.len() { - return Err(DaftError::ValueError(format!("Attempting to create Daft StructArray with {} fields from Arrow array with {} fields: {} vs {:?}", fields.len(), arrow_fields.len(), &field.dtype, arrow_arr.data_type()))) + return Err(DaftError::ValueError(format!("Attempting to create Daft StructArray with {} fields from Arrow array with {} fields: {} vs {:?}", fields.len(), arrow_fields.len(), &field.dtype, arrow_arr.data_type()))); } + let debug = format!("{field:#?}"); + let arrow_arr = arrow_arr.as_ref().as_any().downcast_ref::().unwrap(); let arrow_child_arrays = arrow_arr.values(); @@ -140,7 +148,7 @@ impl FromArrow for StructArray { child_series, arrow_arr.validity().cloned(), )) - }, + } (d, a) => Err(DaftError::TypeError(format!("Attempting to create Daft StructArray with type {} from arrow array with type {:?}", d, a))) } } diff --git a/src/daft-core/src/datatypes/logical.rs b/src/daft-core/src/datatypes/logical.rs index e31784dcc5..86d84535e1 100644 --- a/src/daft-core/src/datatypes/logical.rs +++ b/src/daft-core/src/datatypes/logical.rs @@ -1,7 +1,6 @@ use std::{marker::PhantomData, sync::Arc}; use common_error::DaftResult; -use pyo3::append_to_inittab; use super::{ DaftArrayType, DaftDataType, DataArray, DataType, Decimal128Type, DurationType, EmbeddingType, diff --git a/src/daft-core/src/datatypes/matching.rs b/src/daft-core/src/datatypes/matching.rs index bae597393c..c275bb4a2d 100644 --- a/src/daft-core/src/datatypes/matching.rs +++ b/src/daft-core/src/datatypes/matching.rs @@ -8,43 +8,43 @@ macro_rules! with_match_daft_types {( use $crate::datatypes::*; match $key_type { - Null => __with_ty__! { NullType }, + // Float16 => unimplemented!("Array for Float16 DataType not implemented"), + Binary => __with_ty__! { BinaryType }, Boolean => __with_ty__! { BooleanType }, - Int8 => __with_ty__! { Int8Type }, + Date => __with_ty__! { DateType }, + Decimal128(..) => __with_ty__! { Decimal128Type }, + Duration(_) => __with_ty__! { DurationType }, + Embedding(..) => __with_ty__! { EmbeddingType }, + Extension(_, _, _) => __with_ty__! { ExtensionType }, + FixedShapeImage(..) => __with_ty__! { FixedShapeImageType }, + FixedShapeSparseTensor(..) => __with_ty__! { FixedShapeSparseTensorType }, + FixedShapeTensor(..) => __with_ty__! { FixedShapeTensorType }, + FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType }, + FixedSizeList(_, _) => __with_ty__! { FixedSizeListType }, + Float32 => __with_ty__! { Float32Type }, + Float64 => __with_ty__! { Float64Type }, + Image(..) => __with_ty__! { ImageType }, + Int128 => __with_ty__! { Int128Type }, Int16 => __with_ty__! { Int16Type }, Int32 => __with_ty__! { Int32Type }, Int64 => __with_ty__! { Int64Type }, - Int128 => __with_ty__! { Int128Type }, - UInt8 => __with_ty__! { UInt8Type }, + Int8 => __with_ty__! { Int8Type }, + List(_) => __with_ty__! { ListType }, + Map{..} => __with_ty__! { MapType }, + Null => __with_ty__! { NullType }, + SparseTensor(..) => __with_ty__! { SparseTensorType }, + Struct(_) => __with_ty__! { StructType }, + Tensor(..) => __with_ty__! { TensorType }, + Time(_) => __with_ty__! { TimeType }, + Timestamp(_, _) => __with_ty__! { TimestampType }, UInt16 => __with_ty__! { UInt16Type }, UInt32 => __with_ty__! { UInt32Type }, UInt64 => __with_ty__! { UInt64Type }, - Float32 => __with_ty__! { Float32Type }, - Float64 => __with_ty__! { Float64Type }, - Timestamp(_, _) => __with_ty__! { TimestampType }, - Date => __with_ty__! { DateType }, - Time(_) => __with_ty__! { TimeType }, - Duration(_) => __with_ty__! { DurationType }, - Binary => __with_ty__! { BinaryType }, - FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType }, + UInt8 => __with_ty__! { UInt8Type }, + Unknown => unimplemented!("Array for Unknown DataType not implemented"), Utf8 => __with_ty__! { Utf8Type }, - FixedSizeList(_, _) => __with_ty__! { FixedSizeListType }, - List(_) => __with_ty__! { ListType }, - Struct(_) => __with_ty__! { StructType }, - Map{..} => __with_ty__! { MapType }, - Extension(_, _, _) => __with_ty__! { ExtensionType }, #[cfg(feature = "python")] Python => __with_ty__! { PythonType }, - Embedding(..) => __with_ty__! { EmbeddingType }, - Image(..) => __with_ty__! { ImageType }, - FixedShapeImage(..) => __with_ty__! { FixedShapeImageType }, - Tensor(..) => __with_ty__! { TensorType }, - FixedShapeTensor(..) => __with_ty__! { FixedShapeTensorType }, - SparseTensor(..) => __with_ty__! { SparseTensorType }, - FixedShapeSparseTensor(..) => __with_ty__! { FixedShapeSparseTensorType }, - Decimal128(..) => __with_ty__! { Decimal128Type }, - // Float16 => unimplemented!("Array for Float16 DataType not implemented"), - Unknown => unimplemented!("Array for Unknown DataType not implemented"), // NOTE: We should not implement a default for match here, because this is meant to be // an exhaustive match across **all** Daft types. diff --git a/src/daft-core/src/series/from.rs b/src/daft-core/src/series/from.rs index 3cf0891bad..fcf9084445 100644 --- a/src/daft-core/src/series/from.rs +++ b/src/daft-core/src/series/from.rs @@ -1,6 +1,4 @@ use std::sync::Arc; -use tracing::instrument; -use common_error::{DaftError, DaftResult}; use super::Series; use crate::{ @@ -9,10 +7,15 @@ use crate::{ series::array_impl::IntoSeries, with_match_daft_types, }; +use arrow2::datatypes::ArrowDataType; +use common_error::{DaftError, DaftResult}; +use tracing::instrument; +use daft_schema::dtype::DaftDataType; +use daft_schema::field::DaftField; impl Series { pub fn try_from_field_and_arrow_array( - field: impl Into>, + field: impl Into>, array: Box, ) -> DaftResult { let field = field.into(); @@ -48,11 +51,81 @@ impl TryFrom<(&str, Box)> for Series { type Error = DaftError; #[instrument(level = "trace", name = "Series::try_from", skip_all)] - fn try_from(item: (&str, Box)) -> DaftResult { - let (name, array) = item; - let source_arrow_type = array.data_type(); - let dtype: DataType = source_arrow_type.into(); + fn try_from((name, array): (&str, Box)) -> DaftResult { + let source_arrow_type: &ArrowDataType = array.data_type(); + let dtype = DaftDataType::from(source_arrow_type); let field = Arc::new(Field::new(name, dtype.clone())); Self::try_from_field_and_arrow_array(field, array) } } + +#[cfg(test)] +mod tests { + use arrow2::datatypes::{ArrowDataType, ArrowField}; + use daft_schema::dtype::DataType; + use std::sync::LazyLock; + use arrow2::array::Array; + use common_error::DaftResult; + + static ARROW_DATA_TYPE: LazyLock = LazyLock::new(|| { + ArrowDataType::Map( + Box::new(ArrowField::new( + "entries", + ArrowDataType::Struct(vec![ + ArrowField::new("key", ArrowDataType::LargeUtf8, false), + ArrowField::new("value", ArrowDataType::Date32, true), + ]), + false, + )), + false, + ) + }); + + #[test] + fn test_map_type_conversion() { + let arrow_data_type = ARROW_DATA_TYPE.clone(); + let dtype = DataType::from(&arrow_data_type); + assert_eq!( + dtype, + DataType::Map { + key: Box::new(DataType::Utf8), + value: Box::new(DataType::Date), + }, + ) + } + + #[test] + fn test_() -> DaftResult<()> { + use super::*; + use arrow2::array::MapArray; + + let arrow_array = MapArray::new( + ARROW_DATA_TYPE.clone(), + vec![0, 1].try_into().unwrap(), + Box::new(arrow2::array::StructArray::new( + ArrowDataType::Struct(vec![ + ArrowField::new("key", ArrowDataType::LargeUtf8, false), + ArrowField::new("value", ArrowDataType::Date32, true), + ]), + vec![ + Box::new(arrow2::array::Utf8Array::::from_slice(["key1"])), + Box::new(arrow2::array::Int32Array::from_slice([1])).convert_logical_type(ArrowDataType::Date32), + ], + None, + )), + None, + ); + + let series = Series::try_from(("test_map", Box::new(arrow_array) as Box))?; + + assert_eq!( + series.data_type(), + &DataType::Map { + key: Box::new(DataType::Utf8), + value: Box::new(DataType::Date), + } + ); + + Ok(()) + } +} diff --git a/src/daft-schema/src/dtype.rs b/src/daft-schema/src/dtype.rs index eed13486c2..e655e1ade2 100644 --- a/src/daft-schema/src/dtype.rs +++ b/src/daft-schema/src/dtype.rs @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize}; use crate::{field::Field, image_mode::ImageMode, time_unit::TimeUnit}; +pub type DaftDataType = DataType; + #[derive(Clone, Debug, Display, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum DataType { // ArrowTypes: diff --git a/src/daft-schema/src/field.rs b/src/daft-schema/src/field.rs index 7fa4fdeb6e..f4cd6ecb16 100644 --- a/src/daft-schema/src/field.rs +++ b/src/daft-schema/src/field.rs @@ -18,6 +18,7 @@ pub struct Field { } pub type FieldRef = Arc; +pub type DaftField = Field; #[derive(Clone, Display, Debug, PartialEq, Eq, Deserialize, Serialize, Hash)] #[display("{id}")] diff --git a/src/daft-table/src/ffi.rs b/src/daft-table/src/ffi.rs index 37a118c50e..424ea516f2 100644 --- a/src/daft-table/src/ffi.rs +++ b/src/daft-table/src/ffi.rs @@ -42,9 +42,9 @@ pub fn record_batches_to_table( let columns = cols .into_iter() .enumerate() - .map(|(i, c)| { - let c = cast_array_for_daft_if_needed(c); - Series::try_from((names.get(i).unwrap().as_str(), c)) + .map(|(i, array)| { + let cast_array = cast_array_for_daft_if_needed(array); + Series::try_from((names.get(i).unwrap().as_str(), cast_array)) }) .collect::>>()?; tables.push(Table::new_with_size(schema.clone(), columns, num_rows)?)