[BUG] [New Query Planner] Revert file info partition column names. (#1333)

This PR reverts to the original column names for the file info
partition, since these names are user-facing and we don't want to break
backwards compatibility at the moment.
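
For context, these column names are part of the public schema returned by `daft.from_glob_path`. A minimal sketch of the behavior this revert preserves (the glob path below is illustrative, not from the PR):

```python
import daft

# Listing files yields a DataFrame whose file-info columns keep their
# original, user-facing names: "path", "size", and "num_rows".
df = daft.from_glob_path("/tmp/data/*.csv")  # hypothetical directory
assert set(df.column_names) == {"path", "size", "num_rows"}
```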
clarkzinzow authored Sep 1, 2023
1 parent e7d7e81 commit 6fc90da
Showing 7 changed files with 23 additions and 25 deletions.
2 changes: 1 addition & 1 deletion daft/execution/execution_step.py
@@ -351,7 +351,7 @@ def _handle_tabular_files_scan(
         filepaths_partition: Table,
     ) -> Table:
         data = filepaths_partition.to_pydict()
-        filepaths = data["file_paths"]
+        filepaths = data["path"]
 
         if self.index is not None:
             filepaths = [filepaths[self.index]]
2 changes: 1 addition & 1 deletion daft/execution/physical_plan.py
@@ -85,7 +85,7 @@ def file_read(

     vpartition = done_task.vpartition()
     file_infos = vpartition.to_pydict()
-    file_sizes_bytes = file_infos["file_sizes"]
+    file_sizes_bytes = file_infos["size"]
     file_rows = file_infos["num_rows"]
 
     # Emit one partition for each file (NOTE: hardcoded for now).
2 changes: 1 addition & 1 deletion daft/logical/logical_plan.py
@@ -484,7 +484,7 @@ def __repr__(self) -> str:
         )
 
     def required_columns(self) -> list[set[str]]:
-        return [{"file_paths"} | self._predicate.required_columns()]
+        return [{"path"} | self._predicate.required_columns()]
 
     def input_mapping(self) -> list[dict[str, str]]:
         return [dict()]
2 changes: 1 addition & 1 deletion src/daft-plan/src/builder.rs
@@ -228,7 +228,7 @@ impl LogicalPlanBuilder {
             part_cols,
             compression,
         ));
-        let fields = vec![Field::new("file_paths", DataType::Utf8)];
+        let fields = vec![Field::new("path", DataType::Utf8)];
         let logical_plan: LogicalPlan = ops::Sink::new(
             Schema::new(fields)?.into(),
             sink_info.into(),
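
The sink schema above is likewise user-visible: a write returns a DataFrame listing the files it produced, and this revert keeps that single Utf8 column named `path`. A hedged sketch (the output directory is hypothetical):

```python
import daft

df = daft.from_pydict({"a": [1, 2, 3]})
# write_parquet returns a DataFrame of the written files; per the Sink
# schema above, its one column is named "path" (not "file_paths").
written = df.write_parquet("/tmp/out")  # hypothetical output directory
print(written.column_names)  # expected: ["path"]
```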
8 changes: 4 additions & 4 deletions src/daft-plan/src/source_info.rs
@@ -281,7 +281,7 @@ impl FileInfos {

     pub fn from_table_internal(table: Table) -> DaftResult<Self> {
         let file_paths = table
-            .get_column("file_paths")?
+            .get_column("path")?
             .utf8()?
             .data()
             .as_any()
@@ -291,7 +291,7 @@
             .map(|s| s.unwrap().to_string())
             .collect::<Vec<_>>();
         let file_sizes = table
-            .get_column("file_sizes")?
+            .get_column("size")?
             .i64()?
             .data()
             .as_any()
@@ -324,12 +324,12 @@
     pub fn to_table_internal(&self) -> DaftResult<Table> {
         let columns = vec![
             Series::try_from((
-                "file_paths",
+                "path",
                 arrow2::array::Utf8Array::<i64>::from_iter_values(self.file_paths.iter())
                     .to_boxed(),
             ))?,
             Series::try_from((
-                "file_sizes",
+                "size",
                 arrow2::array::PrimitiveArray::<i64>::from(&self.file_sizes).to_boxed(),
             ))?,
             Series::try_from((
30 changes: 14 additions & 16 deletions tests/cookbook/test_dataloading.py
@@ -84,22 +84,21 @@ def test_glob_files(tmpdir):
     daft_df = daft.from_glob_path(f"{tmpdir}/*.foo")
     daft_pd_df = daft_df.to_pandas()
     pd_df = pd.DataFrame.from_records(
-        {"file_paths": str(path), "file_sizes": size, "num_rows": None}
-        for path, size in zip(filepaths, list(range(10)))
+        {"path": str(path), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10)))
     )
-    pd_df = pd_df[~pd_df["file_paths"].str.endswith(".bar")]
+    pd_df = pd_df[~pd_df["path"].str.endswith(".bar")]
     pd_df = pd_df.astype({"num_rows": float})
-    assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths")
+    assert_df_equals(daft_pd_df, pd_df, sort_key="path")


 def test_glob_files_single_file(tmpdir):
     filepath = pathlib.Path(tmpdir) / f"file.foo"
     filepath.write_text("b" * 10)
     daft_df = daft.from_glob_path(f"{tmpdir}/file.foo")
     daft_pd_df = daft_df.to_pandas()
-    pd_df = pd.DataFrame.from_records([{"file_paths": str(filepath), "file_sizes": 10, "num_rows": None}])
+    pd_df = pd.DataFrame.from_records([{"path": str(filepath), "size": 10, "num_rows": None}])
     pd_df = pd_df.astype({"num_rows": float})
-    assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths")
+    assert_df_equals(daft_pd_df, pd_df, sort_key="path")


 def test_glob_files_directory(tmpdir):
@@ -116,16 +115,16 @@ def test_glob_files_directory(tmpdir):
     daft_pd_df = daft_df.to_pandas()
 
     listing_records = [
-        {"file_paths": str(path), "file_sizes": size, "num_rows": None}
+        {"path": str(path), "size": size, "num_rows": None}
         for path, size in zip(filepaths, [i for i in range(10) for _ in range(2)])
     ]
     listing_records = listing_records + [
-        {"file_paths": str(extra_empty_dir), "file_sizes": extra_empty_dir.stat().st_size, "num_rows": None}
+        {"path": str(extra_empty_dir), "size": extra_empty_dir.stat().st_size, "num_rows": None}
     ]
     pd_df = pd.DataFrame.from_records(listing_records)
     pd_df = pd_df.astype({"num_rows": float})
 
-    assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths")
+    assert_df_equals(daft_pd_df, pd_df, sort_key="path")


 def test_glob_files_recursive(tmpdir):
@@ -142,16 +141,16 @@ def test_glob_files_recursive(tmpdir):
     daft_pd_df = daft_df.to_pandas()
 
     listing_records = [
-        {"file_paths": str(path), "file_sizes": size, "num_rows": None}
+        {"path": str(path), "size": size, "num_rows": None}
         for path, size in zip(paths, [i for i in range(10) for _ in range(2)])
     ]
     listing_records = listing_records + [
-        {"file_paths": str(nested_dir_path), "file_sizes": nested_dir_path.stat().st_size, "num_rows": None}
+        {"path": str(nested_dir_path), "size": nested_dir_path.stat().st_size, "num_rows": None}
     ]
     pd_df = pd.DataFrame.from_records(listing_records)
     pd_df = pd_df.astype({"num_rows": float})
 
-    assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths")
+    assert_df_equals(daft_pd_df, pd_df, sort_key="path")


 @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use")
@@ -176,9 +175,8 @@ def test_glob_files_custom_fs(tmpdir):

     daft_pd_df = daft_df.to_pandas()
     pd_df = pd.DataFrame.from_records(
-        {"file_paths": str(path), "file_sizes": size, "num_rows": None}
-        for path, size in zip(filepaths, list(range(10)))
+        {"path": str(path), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10)))
     )
-    pd_df = pd_df[~pd_df["file_paths"].str.endswith(".bar")]
+    pd_df = pd_df[~pd_df["path"].str.endswith(".bar")]
     pd_df = pd_df.astype({"num_rows": float})
-    assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths")
+    assert_df_equals(daft_pd_df, pd_df, sort_key="path")
2 changes: 1 addition & 1 deletion tests/cookbook/test_image.py
@@ -68,7 +68,7 @@ def test_image_decode() -> None:
     df = (
         daft.from_glob_path(f"{ASSET_FOLDER}/images/**")
         .into_partitions(2)
-        .with_column("image", col("file_paths").url.download().image.decode().image.resize(10, 10))
+        .with_column("image", col("path").url.download().image.decode().image.resize(10, 10))
     )
     target_dtype = DataType.image()
     assert df.schema()["image"].dtype == target_dtype
