[BUG] Fix writes for empty dataframes if target directory does not exist (#3278)

Writes for empty dataframes currently error out if the target directory
does not exist. This happens in the `overwrite_files` method when
looking for files to delete, and also in `write_empty_tabular` when
trying to write a file in a non-existent directory.
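
A minimal sketch of the failure mode described above (the dataframe contents and target path are illustrative): before this change, the write below could raise FileNotFoundError, either while `overwrite_files` listed files to delete or while `write_empty_tabular` wrote its single empty output file.

import daft

# Filtering everything out yields an empty dataframe that still carries a schema.
df = daft.from_pydict({"a": [1, 2, 3]}).where(daft.lit(False))

# The target directory does not exist yet; with this fix the write succeeds anyway.
df.write_parquet("/tmp/not-yet-created", write_mode="overwrite")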

---------

Co-authored-by: Colin Ho <[email protected]>
colin-ho and Colin Ho authored Nov 13, 2024
1 parent 0720ffc commit c4e1ab2
Showing 4 changed files with 98 additions and 8 deletions.
daft/filesystem.py (7 changes: 6 additions & 1 deletion)
@@ -367,7 +367,12 @@ def overwrite_files(
 ) -> None:
     [resolved_path], fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
     file_selector = pafs.FileSelector(resolved_path, recursive=True)
-    paths = [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
+    try:
+        paths = [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
+    except FileNotFoundError:
+        # The root directory does not exist, so there are no files to delete.
+        return
+
     all_file_paths_df = from_pydict({"path": paths})
 
     assert manifest._result is not None
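
For context, pyarrow filesystems raise FileNotFoundError when a FileSelector's base directory is missing; a standalone sketch of the guarded listing, assuming a local filesystem and a hypothetical path:

import pyarrow.fs as pafs

fs = pafs.LocalFileSystem()
selector = pafs.FileSelector("/tmp/missing-dir", recursive=True)  # hypothetical, nonexistent directory
try:
    paths = [info.path for info in fs.get_file_info(selector) if info.type == pafs.FileType.File]
except FileNotFoundError:
    paths = []  # nothing to overwrite if the root directory does not exist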
daft/table/table_io.py (7 changes: 6 additions & 1 deletion)
@@ -779,6 +779,10 @@ def write_empty_tabular(
     table = pa.Table.from_pylist([], schema=schema.to_pyarrow_schema())
 
     [resolved_path], fs = _resolve_paths_and_filesystem(path, io_config=io_config)
+    is_local_fs = canonicalize_protocol(get_protocol_from_path(path if isinstance(path, str) else str(path))) == "file"
+    if is_local_fs:
+        fs.create_dir(resolved_path, recursive=True)
+
     basename_template = _generate_basename_template(file_format.ext())
     file_path = f"{resolved_path}/{basename_template.format(i=0)}"
 
@@ -792,7 +796,8 @@ def write_table():
                 filesystem=fs,
             )
         elif file_format == FileFormat.Csv:
-            pacsv.write_csv(table, file_path)
+            output_file = fs.open_output_stream(file_path)
+            pacsv.write_csv(table, output_file)
         else:
             raise ValueError(f"Unsupported file format {file_format}")
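
The same idea outside Daft, as a hedged sketch using pyarrow directly (directory and schema are made up): create the local target directory first, then hand pacsv.write_csv a stream opened through the filesystem rather than a bare path.

import pyarrow as pa
import pyarrow.csv as pacsv
import pyarrow.fs as pafs

fs = pafs.LocalFileSystem()
target_dir = "/tmp/empty-write-demo"  # hypothetical directory that may not exist yet
fs.create_dir(target_dir, recursive=True)

# An empty table still has a schema, so the CSV ends up with just a header row.
table = pa.Table.from_pylist([], schema=pa.schema([("a", pa.string())]))
with fs.open_output_stream(f"{target_dir}/0.csv") as out:
    pacsv.write_csv(table, out)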
tests/cookbook/test_write.py (20 changes: 14 additions & 6 deletions)
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+import os
+import uuid
 from datetime import datetime
 
 import pyarrow as pa
@@ -40,24 +42,30 @@ def test_parquet_write_with_partitioning(tmp_path, with_morsel_size):
     assert len(pd_df._preview.preview_partition) == 5
 
 
-def test_empty_parquet_write_without_partitioning(tmp_path, with_morsel_size):
+@pytest.mark.parametrize("write_mode", ["append", "overwrite"])
+def test_empty_parquet_write_without_partitioning(tmp_path, write_mode, with_morsel_size):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
     df = df.where(daft.lit(False))
 
-    pd_df = df.write_parquet(tmp_path)
-    read_back_pd_df = daft.read_parquet(tmp_path.as_posix() + "/*.parquet").to_pandas()
+    # Create a unique path to make sure that the writer is comfortable with nonexistent directories
+    path = os.path.join(tmp_path, str(uuid.uuid4()))
+    pd_df = df.write_parquet(path, write_mode=write_mode)
+    read_back_pd_df = daft.read_parquet(path).to_pandas()
     assert_df_equals(df.to_pandas(), read_back_pd_df)
 
     assert len(pd_df) == 1
     assert len(pd_df._preview.preview_partition) == 1
 
 
-def test_empty_parquet_write_with_partitioning(tmp_path, with_morsel_size):
+@pytest.mark.parametrize("write_mode", ["append", "overwrite"])
+def test_empty_parquet_write_with_partitioning(tmp_path, write_mode, with_morsel_size):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
     df = df.where(daft.lit(False))
 
-    output_files = df.write_parquet(tmp_path, partition_cols=["Borough"])
-    read_back_pd_df = daft.read_parquet(tmp_path.as_posix() + "/**/*.parquet").to_pandas()
+    # Create a unique path to make sure that the writer is comfortable with nonexistent directories
+    path = os.path.join(tmp_path, str(uuid.uuid4()))
+    output_files = df.write_parquet(path, partition_cols=["Borough"], write_mode=write_mode)
+    read_back_pd_df = daft.read_parquet(os.path.join(path, "**/*.parquet")).to_pandas()
     assert_df_equals(df.to_pandas(), read_back_pd_df)
 
     assert len(output_files) == 1
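
A trimmed-down sketch of what these tests exercise, with a hypothetical input CSV and a fresh directory that does not exist before the write:

import os
import uuid

import daft

df = daft.read_csv("data.csv").where(daft.lit(False))  # hypothetical file, filtered down to zero rows
path = os.path.join("/tmp", str(uuid.uuid4()))  # unique, nonexistent target directory
df.write_parquet(path, write_mode="overwrite")
assert len(daft.read_parquet(path).to_pandas()) == 0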
tests/io/test_write_modes.py (72 changes: 72 additions & 0 deletions)
@@ -89,6 +89,39 @@ def test_write_modes_local(tmp_path, write_mode, format, num_partitions, partiti
         raise ValueError(f"Unsupported write_mode: {write_mode}")
 
 
+@pytest.mark.parametrize("write_mode", ["append", "overwrite"])
+@pytest.mark.parametrize("format", ["csv", "parquet"])
+def test_write_modes_local_empty_data(tmp_path, write_mode, format):
+    path = str(tmp_path)
+    existing_data = {"a": ["a", "a", "b", "b"], "b": ["c", "d", "e", "f"]}
+    new_data = {
+        "a": ["a", "a", "b", "b"],
+        "b": ["g", "h", "i", "j"],
+    }
+
+    read_back = arrange_write_mode_test(
+        daft.from_pydict(existing_data),
+        daft.from_pydict(new_data).where(daft.lit(False)),  # Empty data
+        path,
+        format,
+        write_mode,
+        None,
+        None,
+    )
+
+    # Check the data
+    if write_mode == "append":
+        # The data should be the same as the existing data
+        assert read_back["a"] == ["a", "a", "b", "b"]
+        assert read_back["b"] == ["c", "d", "e", "f"]
+    elif write_mode == "overwrite":
+        # The data should be empty because we are overwriting the existing data
+        assert read_back["a"] == []
+        assert read_back["b"] == []
+    else:
+        raise ValueError(f"Unsupported write_mode: {write_mode}")
+
+
 @pytest.fixture(scope="function")
 def bucket(minio_io_config):
     BUCKET = "write-modes-bucket"
@@ -142,3 +175,42 @@ def test_write_modes_s3_minio(
         assert read_back["b"] == [5, 6, 7, 8]
     else:
         raise ValueError(f"Unsupported write_mode: {write_mode}")
+
+
+@pytest.mark.integration()
+@pytest.mark.parametrize("write_mode", ["append", "overwrite"])
+@pytest.mark.parametrize("format", ["csv", "parquet"])
+def test_write_modes_s3_minio_empty_data(
+    minio_io_config,
+    bucket,
+    write_mode,
+    format,
+):
+    path = f"s3://{bucket}/{str(uuid.uuid4())}"
+    existing_data = {"a": ["a", "a", "b", "b"], "b": ["c", "d", "e", "f"]}
+    new_data = {
+        "a": ["a", "a", "b", "b"],
+        "b": ["g", "h", "i", "j"],
+    }
+
+    read_back = arrange_write_mode_test(
+        daft.from_pydict(existing_data),
+        daft.from_pydict(new_data).where(daft.lit(False)),  # Empty data
+        path,
+        format,
+        write_mode,
+        None,
+        minio_io_config,
+    )
+
+    # Check the data
+    if write_mode == "append":
+        # The data should be the same as the existing data
+        assert read_back["a"] == ["a", "a", "b", "b"]
+        assert read_back["b"] == ["c", "d", "e", "f"]
+    elif write_mode == "overwrite":
+        # The data should be empty because we are overwriting the existing data
+        assert read_back["a"] == []
+        assert read_back["b"] == []
+    else:
+        raise ValueError(f"Unsupported write_mode: {write_mode}")
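
The assertions above encode the intended semantics when the incoming batch is empty; a compact sketch of the same behavior against a hypothetical local directory:

import daft

target = "/tmp/write-modes-demo"  # hypothetical path
existing = daft.from_pydict({"a": ["a", "a", "b", "b"], "b": ["c", "d", "e", "f"]})
empty = daft.from_pydict({"a": ["x"], "b": ["y"]}).where(daft.lit(False))  # same schema, zero rows

existing.write_parquet(target)                    # seed the table
empty.write_parquet(target, write_mode="append")     # appending nothing: existing rows remain
empty.write_parquet(target, write_mode="overwrite")  # overwriting with nothing: no rows remain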