fix bug with scan task num rows
samster25 committed Dec 21, 2023
1 parent 7043715 commit ae7bcac
Showing 5 changed files with 43 additions and 8 deletions.
2 changes: 1 addition & 1 deletion daft/execution/physical_plan.py
@@ -453,7 +453,7 @@ def global_limit(
     # since we will never take more than the remaining limit anyway.
     child_plan = local_limit(child_plan=child_plan, limit=remaining_rows)
     started = False
-
+    print(f"starting with remaining rows {remaining_rows}")
     while True:
         # Check if any inputs finished executing.
         # Apply and deduct the rolling global limit.
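For context on why this hunk matters: `global_limit` deducts each materialized partition's row count from a rolling remaining-rows counter, so an overstated scan-task row count (the bug this commit fixes) can make a limit stop reading too early. A minimal sketch of that rolling deduction, using hypothetical list-of-rows partitions rather than Daft's actual generator protocol:

```python
def global_limit_sketch(partitions, limit: int):
    """Yield row-limited partitions until `limit` rows have been emitted."""
    remaining = limit
    for part in partitions:
        if remaining <= 0:
            break
        # Truncate the partition that crosses the limit boundary.
        take = min(len(part), remaining)
        yield part[:take]
        remaining -= take


# Partitions of 3, 4, and 5 rows with limit=6 yield 3 + 3 rows.
out = list(global_limit_sketch([["a"] * 3, ["b"] * 4, ["c"] * 5], limit=6))
assert [len(p) for p in out] == [3, 3]
```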
3 changes: 2 additions & 1 deletion daft/execution/rust_physical_plan_shim.py
@@ -62,7 +62,8 @@ def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:

     def _scan(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
         assert len(inputs) == 0
-        return [MicroPartition._from_scan_task(self.scan_task)]
+        table = MicroPartition._from_scan_task(self.scan_task)
+        return [table]
 
     def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
         assert len(input_metadatas) == 0
3 changes: 0 additions & 3 deletions daft/iceberg/iceberg_scan.py
@@ -76,7 +76,6 @@ def __init__(self, iceberg_table: Table, storage_config: StorageConfig) -> None:
         arrow_schema = schema_to_pyarrow(iceberg_table.schema())
         self._schema = Schema.from_pyarrow_schema(arrow_schema)
         self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec())
-        print(self._partition_keys)
 
     def schema(self) -> Schema:
         return self._schema
@@ -107,7 +106,6 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
         iceberg_tasks = self._table.scan(limit=limit).plan_files()
 
         limit_files = limit is not None and pushdowns.filters is None and pushdowns.partition_filters is None
-
         scan_tasks = []
 
         if limit is not None:
@@ -146,7 +144,6 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
                     continue
                 rows_left -= record_count
             scan_tasks.append(st)
-        print()
         return iter(scan_tasks)
 
     def can_absorb_filter(self) -> bool:
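The `limit_files` flag in the hunk above only permits limit-based file pruning when no filters were pushed down: with a filter attached, a file's manifest `record_count` is an upper bound on the rows it will actually produce, not an exact count. A hedged sketch of that accounting, with simplified `(path, record_count)` pairs standing in for Iceberg plan files:

```python
def plan_files_with_limit(files, limit, has_filters):
    """Pick files to scan; prune by row counts only when counts are exact."""
    limit_files = limit is not None and not has_filters
    if not limit_files:
        return [path for path, _ in files]
    selected, rows_left = [], limit
    for path, record_count in files:
        if rows_left <= 0:
            break  # already have enough rows to satisfy the limit
        selected.append(path)
        rows_left -= record_count
    return selected


files = [("a.parquet", 10), ("b.parquet", 10)]
assert plan_files_with_limit(files, limit=5, has_filters=False) == ["a.parquet"]
# With a filter pushed down, every file must still be scanned.
assert plan_files_with_limit(files, limit=5, has_filters=True) == ["a.parquet", "b.parquet"]
```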
6 changes: 5 additions & 1 deletion src/daft-scan/src/lib.rs
@@ -270,7 +270,11 @@ impl ScanTask
     }
 
     pub fn num_rows(&self) -> Option<usize> {
-        self.metadata.as_ref().map(|m| m.length)
+        if self.pushdowns.filters.is_some() {
+            None
+        } else {
+            self.metadata.as_ref().map(|m| m.length)
+        }
     }
 
     pub fn size_bytes(&self) -> Option<usize> {
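This hunk is the actual fix: `ScanTask::num_rows` previously reported the metadata row count unconditionally, but once a filter pushdown is attached the task may emit fewer rows than the file contains, so the only honest answer is `None` (unknown). A Python mirror of the corrected logic, with hypothetical stand-ins for Daft's Rust types:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Metadata:
    length: int  # total rows in the underlying file


@dataclass
class Pushdowns:
    filters: Optional[str] = None  # predicate pushed into the scan, if any


@dataclass
class ScanTask:
    metadata: Optional[Metadata]
    pushdowns: Pushdowns

    def num_rows(self) -> Optional[int]:
        # A filter drops an unknown number of rows, so the file-level
        # count is no longer exact; report "unknown" instead.
        if self.pushdowns.filters is not None:
            return None
        return self.metadata.length if self.metadata is not None else None


# Before the fix, this task claimed exactly 1000 rows, letting a downstream
# limit stop reading too early; now it correctly reports an unknown count.
task = ScanTask(Metadata(1000), Pushdowns(filters="letter == 'd'"))
assert task.num_rows() is None
assert ScanTask(Metadata(1000), Pushdowns()).num_rows() == 1000
```

The new integration test added below exercises exactly this combination of a pushed-down predicate plus a limit.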
37 changes: 35 additions & 2 deletions tests/integration/iceberg/test_table_load.py
@@ -38,7 +38,7 @@ def test_daft_iceberg_table_open(local_iceberg_tables):
# "test_positional_mor_deletes", # Need Merge on Read
# "test_positional_mor_double_deletes", # Need Merge on Read
# "test_table_sanitized_character", # Bug in scan().to_arrow().to_arrow()
"test_table_version", # we have bugs when loading no files
"test_table_version", # we have bugs when loading no files
"test_uuid_and_fixed_unpartitioned",
]

@@ -111,7 +111,6 @@ def test_daft_iceberg_table_predicate_pushdown_on_date_column(predicate, table,
     assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
 
 
-#
 @pytest.mark.integration()
 @pytest.mark.parametrize(
     "predicate, table, limit",
@@ -147,6 +146,40 @@ def test_daft_iceberg_table_predicate_pushdown_on_timestamp_column(predicate, ta
     assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
 
 
+@pytest.mark.integration()
+@pytest.mark.parametrize(
+    "predicate, table, limit",
+    itertools.product(
+        [
+            lambda x: x < "d",
+            lambda x: x == "d",
+            lambda x: x > "d",
+            lambda x: x != "d",
+            lambda x: x == "z",
+        ],
+        [
+            "test_partitioned_by_truncate",
+        ],
+        [None, 1, 2, 1000],
+    ),
+)
+def test_daft_iceberg_table_predicate_pushdown_on_letter(predicate, table, limit, local_iceberg_catalog):
+    tab = local_iceberg_catalog.load_table(f"default.{table}")
+    df = daft.read_iceberg(tab)
+    df = df.where(predicate(df["letter"]))
+    if limit:
+        df = df.limit(limit)
+    df.collect()
+
+    daft_pandas = df.to_pandas()
+    iceberg_pandas = tab.scan().to_arrow().to_pandas()
+    iceberg_pandas = iceberg_pandas[predicate(iceberg_pandas["letter"])]
+    if limit:
+        iceberg_pandas = iceberg_pandas[:limit]
+
+    assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
+
+
 @pytest.mark.integration()
 def test_daft_iceberg_table_predicate_pushdown_empty_scan(local_iceberg_catalog):
     tab = local_iceberg_catalog.load_table("default.test_partitioned_by_months")
