[CHORE] Write tpch parquet files one at a time (#3396)
When you specify a `num_parts` parameter while generating TPC-H files, the generator first creates `num_parts` CSVs, then reads those CSVs and writes them to Parquet using Daft.

However, `write_parquet` does not preserve the number of input files: even with 16 input files, there may be only 1 output file.

The fix here is to read and write one file at a time.
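The one-file-at-a-time pattern can be sketched without Daft itself. Below, `convert_one_at_a_time` is a hypothetical stand-in for the per-file `read_csv`/`write_parquet` round trip in the diff: it globs the part files and converts each one to its own output, so the output file count matches the input file count.

```python
import csv
import glob
import os


def convert_one_at_a_time(csv_dir: str, out_dir: str) -> int:
    """Convert each CSV part to its own output file, preserving file count.

    Mirrors the fix: instead of globbing all parts into a single read
    (whose write may coalesce everything into one output file), loop
    over the parts and convert them individually.
    """
    os.makedirs(out_dir, exist_ok=True)
    count = 0
    # Match TPC-H part files such as "lineitem.tbl.1", "lineitem.tbl.2", ...
    for path in sorted(glob.glob(os.path.join(csv_dir, "*.tbl*"))):
        with open(path, newline="") as f:
            rows = list(csv.reader(f, delimiter="|"))
        # Stand-in for df.write_parquet(...): one output per input part.
        out_path = os.path.join(out_dir, os.path.basename(path) + ".out")
        with open(out_path, "w", newline="") as f:
            csv.writer(f, delimiter="|").writerows(rows)
        count += 1
    return count
```

This is only a sketch of the control flow; the real code delegates the read and write to Daft, but the invariant is the same: N input parts in, N output files out.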

Co-authored-by: Colin Ho <[email protected]>
colin-ho and Colin Ho authored Dec 1, 2024
1 parent 8e85d44 commit 8652eba
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions benchmarking/tpch/data_generation.py
@@ -315,20 +315,24 @@ def gen_parquet(csv_files_location: str) -> str:
         table_parquet_path = os.path.join(PARQUET_FILE_PATH, tab_name)
         if not os.path.exists(table_parquet_path):
             logger.info("Generating Parquet Files for %s", tab_name)
-            df = daft.read_csv(
-                os.path.join(csv_files_location, f"{tab_name}.tbl*"),
-                has_headers=False,
-                delimiter="|",
-            ).exclude("")
-            df = df.select(
-                *[
-                    df[autogen_col_name].alias(actual_col_name)
-                    for actual_col_name, autogen_col_name in zip(SCHEMA[tab_name], df.column_names)
-                ]
-            )
-            df = df.write_parquet(table_parquet_path)
-            df.collect()
-            assert os.path.exists(table_parquet_path), f"Parquet files not generated by Daft at {table_parquet_path}"
+            files = glob(os.path.join(csv_files_location, f"{tab_name}.tbl*"))
+            for file in files:
+                df = daft.read_csv(
+                    file,
+                    has_headers=False,
+                    delimiter="|",
+                ).exclude("")
+                df = df.select(
+                    *[
+                        df[autogen_col_name].alias(actual_col_name)
+                        for actual_col_name, autogen_col_name in zip(SCHEMA[tab_name], df.column_names)
+                    ]
+                )
+                df = df.write_parquet(table_parquet_path)
+                df.collect()
+                assert os.path.exists(
+                    table_parquet_path
+                ), f"Parquet files not generated by Daft at {table_parquet_path}"
         else:
             logger.info(
                 "Cached Parquet files for table %s already exists at %s, skipping Parquet file generation",
