Skip to content

Commit

Permalink
add anon mode
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Dec 6, 2023
1 parent 003d35b commit ddbb050
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 46 deletions.
65 changes: 65 additions & 0 deletions tests/integration/io/parquet/test_read_pushdowns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

from itertools import product

import pytest

import daft
from daft.table import MicroPartition

# Same TPC-H lineitem sample (written with column statistics) in two locations:
# one on public S3, one vendored in the repo — so pushdowns are exercised over
# both remote and local reads.
PRED_PUSHDOWN_FILES = [
    "s3://daft-public-data/test_fixtures/parquet-dev/sampled-tpch-with-stats.parquet",
    "tests/assets/parquet-data/sampled-tpch-with-stats.parquet",
]


@pytest.mark.integration()
@pytest.mark.parametrize(
    "path, pred, limit",
    product(
        PRED_PUSHDOWN_FILES,
        [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)],
        [None, 1, 1000],
    ),
)
def test_parquet_filter_pushdowns(path, pred, limit, aws_public_s3_config):
    """Predicate/limit pushed into the parquet reader must yield the same rows
    as reading everything and then filtering/limiting in memory."""
    pushed = MicroPartition.read_parquet(path, predicate=pred, num_rows=limit, io_config=aws_public_s3_config)
    # Reference path: full read, then filter, then (optionally) limit.
    expected = MicroPartition.read_parquet(path, io_config=aws_public_s3_config).filter([pred])
    if limit is not None:
        expected = expected.head(limit)
    assert pushed.to_arrow() == expected.to_arrow()


@pytest.mark.integration()
@pytest.mark.parametrize(
    "path, pred",
    product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]),
)
def test_parquet_filter_pushdowns_disjoint_predicate(path, pred, aws_public_s3_config):
    """Pushdown must still work when the predicate column (L_ORDERKEY) is NOT
    among the projected columns (L_QUANTITY)."""
    pushed = MicroPartition.read_parquet(
        path, predicate=pred, columns=["L_QUANTITY"], io_config=aws_public_s3_config
    )
    # Reference path: full read, filter on the predicate column, then project.
    full = MicroPartition.read_parquet(path, io_config=aws_public_s3_config)
    expected = full.filter([pred]).eval_expression_list([daft.col("L_QUANTITY")])
    assert pushed.to_arrow() == expected.to_arrow()


@pytest.mark.integration()
@pytest.mark.parametrize(
    "path, pred",
    product(
        ["tests/assets/parquet-data/mvp.parquet", "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"],
        [daft.col("a") == 1, daft.col("a") == 10000, daft.lit(True)],
    ),
)
def test_parquet_filter_pushdowns_disjoint_predicate_no_stats(path, pred, aws_public_s3_config):
    """Same disjoint-column pushdown check, but against a file without
    row-group statistics (mvp.parquet), so the reader cannot prune via stats."""
    pushed = MicroPartition.read_parquet(path, predicate=pred, columns=["b"], io_config=aws_public_s3_config)
    # Reference path: full read, filter on column "a", then project column "b".
    full = MicroPartition.read_parquet(path, io_config=aws_public_s3_config)
    expected = full.filter([pred]).eval_expression_list([daft.col("b")])
    assert pushed.to_arrow() == expected.to_arrow()
46 changes: 0 additions & 46 deletions tests/table/table_io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os
import pathlib
import tempfile
from itertools import product

import pyarrow as pa
import pyarrow.parquet as papq
Expand Down Expand Up @@ -366,48 +365,3 @@ def test_read_empty_parquet_file_with_pyarrow_bulk(tmpdir):
read_back = read_parquet_into_pyarrow_bulk([file_path.as_posix()])
assert len(read_back) == 1
assert tab == read_back[0]


PRED_PUSHDOWN_FILES = [
"s3://daft-public-data/test_fixtures/parquet-dev/sampled-tpch-with-stats.parquet",
"tests/assets/parquet-data/sampled-tpch-with-stats.parquet",
]


@pytest.mark.parametrize(
    "path, pred, limit",
    product(
        PRED_PUSHDOWN_FILES,
        [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)],
        [None, 1, 1000],
    ),
)
def test_parquet_filter_pushdowns(path, pred, limit):
    """Predicate/limit pushed into the parquet reader must match a full read
    followed by in-memory filter and (optional) head.

    NOTE(review): reads from public S3 with no io_config — presumably relies on
    ambient/anonymous credentials; the replacement test passes an explicit
    io_config instead.
    """
    with_pushdown = MicroPartition.read_parquet(path, predicate=pred, num_rows=limit)
    # Reference: unpushed read, then filter, then limit if one was given.
    after = MicroPartition.read_parquet(path).filter([pred])
    if limit is not None:
        after = after.head(limit)
    assert with_pushdown.to_arrow() == after.to_arrow()


@pytest.mark.parametrize(
    "path, pred",
    product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]),
)
def test_parquet_filter_pushdowns_disjoint_predicate(path, pred):
    """Pushdown with the predicate column (L_ORDERKEY) absent from the
    projected columns (L_QUANTITY) must match filter-then-project on a full read."""
    with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["L_QUANTITY"])
    after = MicroPartition.read_parquet(path).filter([pred]).eval_expression_list([daft.col("L_QUANTITY")])
    assert with_pushdown.to_arrow() == after.to_arrow()


@pytest.mark.parametrize(
    "path, pred",
    product(
        ["tests/assets/parquet-data/mvp.parquet", "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"],
        [daft.col("a") == 1, daft.col("a") == 10000, daft.lit(True)],
    ),
)
def test_parquet_filter_pushdowns_disjoint_predicate_no_stats(path, pred):
    """Same disjoint-column pushdown check against mvp.parquet — a fixture that
    (per the test name) lacks row-group statistics, so stats-based pruning
    cannot help the reader."""
    with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["b"])
    after = MicroPartition.read_parquet(path).filter([pred]).eval_expression_list([daft.col("b")])
    assert with_pushdown.to_arrow() == after.to_arrow()

0 comments on commit ddbb050

Please sign in to comment.