From 9be424aeff32ff6d46b256caa636e635383ab01f Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Wed, 21 Feb 2024 15:06:04 -0800 Subject: [PATCH] Fixes. --- daft/delta_lake/delta_lake_scan.py | 3 ++- tests/io/delta_lake/conftest.py | 3 +++ tests/io/delta_lake/test_table_read.py | 10 +++++++--- tests/io/delta_lake/test_table_read_pushdowns.py | 11 +++++------ 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/daft/delta_lake/delta_lake_scan.py b/daft/delta_lake/delta_lake_scan.py index 2803ead9ef..740497b1bb 100644 --- a/daft/delta_lake/delta_lake_scan.py +++ b/daft/delta_lake/delta_lake_scan.py @@ -6,7 +6,6 @@ from typing import Any from urllib.parse import urlparse -import pyarrow as pa from deltalake.table import DeltaTable import daft @@ -58,6 +57,8 @@ def multiline_display(self) -> list[str]: ] def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]: + import pyarrow as pa + # TODO(Clark): Push limit and filter expressions into deltalake action fetch, to prune the files returned. add_actions: pa.RecordBatch = self._table.get_add_actions() diff --git a/tests/io/delta_lake/conftest.py b/tests/io/delta_lake/conftest.py index 849d7b0ac3..ff47bd9db0 100644 --- a/tests/io/delta_lake/conftest.py +++ b/tests/io/delta_lake/conftest.py @@ -25,6 +25,9 @@ def local_deltalake_table(request, tmp_path, partition_by) -> deltalake.DeltaTab "e": [datetime.datetime(2024, 2, 10), datetime.datetime(2024, 2, 11), datetime.datetime(2024, 2, 12)], } ) + # Delta Lake casts timestamps to microsecond resolution on ingress, so we preemptively cast the Pandas DataFrame here + # to make equality assertions easier later. + base_df["e"] = base_df["e"].astype("datetime64[us]") dfs = [] for part_idx in range(request.param): part_df = base_df.copy() diff --git a/tests/io/delta_lake/test_table_read.py b/tests/io/delta_lake/test_table_read.py index 9451b6b9e3..b3715fe3b6 100644 --- a/tests/io/delta_lake/test_table_read.py +++ b/tests/io/delta_lake/test_table_read.py @@ -7,11 +7,15 @@ deltalake = pytest.importorskip("deltalake") +import pyarrow as pa + import daft from daft.logical.schema import Schema +PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) +pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="deltalake only supported if pyarrow >= 8.0.0") + -@pytest.mark.integration() def test_deltalake_read_basic(tmp_path): pd_df = pd.DataFrame( { @@ -24,10 +28,11 @@ def test_deltalake_read_basic(tmp_path): deltalake.write_deltalake(path, pd_df) df = daft.read_delta_lake(str(path)) assert df.schema() == Schema.from_pyarrow_schema(deltalake.DeltaTable(path).schema().to_pyarrow()) + # Delta Lake casts timestamps to microsecond resolution on ingress. + pd_df["c"] = pd_df["c"].astype("datetime64[us]") pd.testing.assert_frame_equal(df.to_pandas(), pd_df) -@pytest.mark.integration() def test_deltalake_read_full(local_deltalake_table): path, dfs = local_deltalake_table df = daft.read_delta_lake(str(path)) @@ -35,7 +40,6 @@ def test_deltalake_read_full(local_deltalake_table): pd.testing.assert_frame_equal(df.to_pandas(), pd.concat(dfs).reset_index(drop=True)) -@pytest.mark.integration() def test_deltalake_read_show(local_deltalake_table): path, _ = local_deltalake_table df = daft.read_delta_lake(str(path)) diff --git a/tests/io/delta_lake/test_table_read_pushdowns.py b/tests/io/delta_lake/test_table_read_pushdowns.py index 2df649caea..b70d136805 100644 --- a/tests/io/delta_lake/test_table_read_pushdowns.py +++ b/tests/io/delta_lake/test_table_read_pushdowns.py @@ -7,11 +7,15 @@ deltalake = pytest.importorskip("deltalake") +import pyarrow as pa + import daft from daft.logical.schema import Schema +PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) +pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="deltalake only supported if pyarrow >= 8.0.0") + -@pytest.mark.integration() def test_deltalake_read_predicate_pushdown_on_data(local_deltalake_table): path, dfs = local_deltalake_table df = daft.read_delta_lake(str(path)) @@ -22,7 +26,6 @@ def test_deltalake_read_predicate_pushdown_on_data(local_deltalake_table): ) -@pytest.mark.integration() def test_deltalake_read_predicate_pushdown_on_part(local_deltalake_table): path, dfs = local_deltalake_table df = daft.read_delta_lake(str(path)) @@ -33,7 +36,6 @@ def test_deltalake_read_predicate_pushdown_on_part(local_deltalake_table): ) -@pytest.mark.integration() def test_deltalake_read_predicate_pushdown_on_part_non_eq(local_deltalake_table): path, dfs = local_deltalake_table df = daft.read_delta_lake(str(path)) @@ -44,7 +46,6 @@ def test_deltalake_read_predicate_pushdown_on_part_non_eq(local_deltalake_table) ) -@pytest.mark.integration() def test_deltalake_read_predicate_pushdown_on_part_and_data(local_deltalake_table): path, dfs = local_deltalake_table df = daft.read_delta_lake(str(path)) @@ -58,7 +59,6 @@ def test_deltalake_read_predicate_pushdown_on_part_and_data(local_deltalake_tabl ) -@pytest.mark.integration() def test_deltalake_read_predicate_pushdown_on_part_and_data_same_clause(local_deltalake_table): path, dfs = local_deltalake_table df = daft.read_delta_lake(str(path)) @@ -70,7 +70,6 @@ def test_deltalake_read_predicate_pushdown_on_part_and_data_same_clause(local_de ) -@pytest.mark.integration() def test_deltalake_read_predicate_pushdown_on_part_empty(local_deltalake_table): path, dfs = local_deltalake_table df = daft.read_delta_lake(str(path))