diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index 40868673ec..1fb9b6d0b2 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -453,7 +453,7 @@ def global_limit( # since we will never take more than the remaining limit anyway. child_plan = local_limit(child_plan=child_plan, limit=remaining_rows) started = False - + print(f"starting with remaining rows {remaining_rows}") while True: # Check if any inputs finished executing. # Apply and deduct the rolling global limit. diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py index e29e94c609..5d772bf49e 100644 --- a/daft/execution/rust_physical_plan_shim.py +++ b/daft/execution/rust_physical_plan_shim.py @@ -62,7 +62,8 @@ def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]: def _scan(self, inputs: list[MicroPartition]) -> list[MicroPartition]: assert len(inputs) == 0 - return [MicroPartition._from_scan_task(self.scan_task)] + table = MicroPartition._from_scan_task(self.scan_task) + return [table] def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]: assert len(input_metadatas) == 0 diff --git a/daft/iceberg/iceberg_scan.py b/daft/iceberg/iceberg_scan.py index 307b435701..a06119a036 100644 --- a/daft/iceberg/iceberg_scan.py +++ b/daft/iceberg/iceberg_scan.py @@ -76,7 +76,6 @@ def __init__(self, iceberg_table: Table, storage_config: StorageConfig) -> None: arrow_schema = schema_to_pyarrow(iceberg_table.schema()) self._schema = Schema.from_pyarrow_schema(arrow_schema) self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec()) - print(self._partition_keys) def schema(self) -> Schema: return self._schema @@ -107,7 +106,6 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]: iceberg_tasks = self._table.scan(limit=limit).plan_files() limit_files = limit is not None and pushdowns.filters is None and pushdowns.partition_filters is None - scan_tasks = [] if limit is not None: @@ -146,7 +144,6 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]: continue rows_left -= record_count scan_tasks.append(st) - print() return iter(scan_tasks) def can_absorb_filter(self) -> bool: diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 570c9fec5a..b4b38f8be1 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -270,7 +270,11 @@ impl ScanTask { } pub fn num_rows(&self) -> Option { - self.metadata.as_ref().map(|m| m.length) + if self.pushdowns.filters.is_some() { + None + } else { + self.metadata.as_ref().map(|m| m.length) + } } pub fn size_bytes(&self) -> Option { diff --git a/tests/integration/iceberg/test_table_load.py b/tests/integration/iceberg/test_table_load.py index 8648e17e1a..16579b3adc 100644 --- a/tests/integration/iceberg/test_table_load.py +++ b/tests/integration/iceberg/test_table_load.py @@ -38,7 +38,7 @@ def test_daft_iceberg_table_open(local_iceberg_tables): # "test_positional_mor_deletes", # Need Merge on Read # "test_positional_mor_double_deletes", # Need Merge on Read # "test_table_sanitized_character", # Bug in scan().to_arrow().to_arrow() - "test_table_version", # we have bugs when loading no files + "test_table_version", # we have bugs when loading no files "test_uuid_and_fixed_unpartitioned", ] @@ -111,7 +111,6 @@ def test_daft_iceberg_table_predicate_pushdown_on_date_column(predicate, table, assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[]) -# @pytest.mark.integration() @pytest.mark.parametrize( "predicate, table, limit", @@ -147,6 +146,40 @@ def test_daft_iceberg_table_predicate_pushdown_on_timestamp_column(predicate, ta assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[]) +@pytest.mark.integration() +@pytest.mark.parametrize( + "predicate, table, limit", + itertools.product( + [ + lambda x: x < "d", + lambda x: x == "d", + lambda x: x > "d", + lambda x: x != "d", + lambda x: x == "z", + ], + [ + "test_partitioned_by_truncate", + ], + [None, 1, 2, 1000], + ), +) +def test_daft_iceberg_table_predicate_pushdown_on_letter(predicate, table, limit, local_iceberg_catalog): + tab = local_iceberg_catalog.load_table(f"default.{table}") + df = daft.read_iceberg(tab) + df = df.where(predicate(df["letter"])) + if limit: + df = df.limit(limit) + df.collect() + + daft_pandas = df.to_pandas() + iceberg_pandas = tab.scan().to_arrow().to_pandas() + iceberg_pandas = iceberg_pandas[predicate(iceberg_pandas["letter"])] + if limit: + iceberg_pandas = iceberg_pandas[:limit] + + assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[]) + + @pytest.mark.integration() def test_daft_iceberg_table_predicate_pushdown_empty_scan(local_iceberg_catalog): tab = local_iceberg_catalog.load_table("default.test_partitioned_by_months")