
write parquet files one at a time
Colin Ho authored and committed on Nov 21, 2024
1 parent 3394a66 commit 85fd788
Showing 1 changed file with 18 additions and 14 deletions.
benchmarking/tpch/data_generation.py (32 changes: 18 additions & 14 deletions)
@@ -315,20 +315,24 @@ def gen_parquet(csv_files_location: str) -> str:
         table_parquet_path = os.path.join(PARQUET_FILE_PATH, tab_name)
         if not os.path.exists(table_parquet_path):
             logger.info("Generating Parquet Files for %s", tab_name)
-            df = daft.read_csv(
-                os.path.join(csv_files_location, f"{tab_name}.tbl*"),
-                has_headers=False,
-                delimiter="|",
-            ).exclude("")
-            df = df.select(
-                *[
-                    df[autogen_col_name].alias(actual_col_name)
-                    for actual_col_name, autogen_col_name in zip(SCHEMA[tab_name], df.column_names)
-                ]
-            )
-            df = df.write_parquet(table_parquet_path)
-            df.collect()
-            assert os.path.exists(table_parquet_path), f"Parquet files not generated by Daft at {table_parquet_path}"
+            files = glob(os.path.join(csv_files_location, f"{tab_name}.tbl*"))
+            for file in files:
+                df = daft.read_csv(
+                    file,
+                    has_headers=False,
+                    delimiter="|",
+                ).exclude("")
+                df = df.select(
+                    *[
+                        df[autogen_col_name].alias(actual_col_name)
+                        for actual_col_name, autogen_col_name in zip(SCHEMA[tab_name], df.column_names)
+                    ]
+                )
+                df = df.write_parquet(table_parquet_path)
+                df.collect()
+            assert os.path.exists(
+                table_parquet_path
+            ), f"Parquet files not generated by Daft at {table_parquet_path}"
         else:
             logger.info(
                 "Cached Parquet files for table %s already exists at %s, skipping Parquet file generation",
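For context on the change above: the old code handed the whole {tab_name}.tbl* glob to a single daft.read_csv plan, so every part file for a table was scanned and written in one pass; the new code globs the parts itself and runs one read-then-write_parquet pass per file. Below is a minimal, self-contained sketch of that per-file pattern, assuming hypothetical directory constants and a truncated SCHEMA entry; it omits the column-renaming select from the real script.

import os
from glob import glob

import daft

CSV_DIR = "data/csv"          # hypothetical location of the generated .tbl parts
PARQUET_DIR = "data/parquet"  # hypothetical output root
SCHEMA = {"lineitem": ["l_orderkey", "l_partkey"]}  # stand-in; truncated column list

for tab_name in SCHEMA:
    out_dir = os.path.join(PARQUET_DIR, tab_name)
    # One read -> write plan per part file, instead of one plan over the whole
    # glob, so only a single part is materialized at a time.
    for part in sorted(glob(os.path.join(CSV_DIR, f"{tab_name}.tbl*"))):
        # exclude("") drops the empty trailing column produced by the
        # '|'-terminated TPC-H lines, as in the committed code.
        df = daft.read_csv(part, has_headers=False, delimiter="|").exclude("")
        # write_parquet appends files into out_dir; collect() forces execution,
        # mirroring the df.write_parquet(...) / df.collect() pair in the diff.
        df.write_parquet(out_dir).collect()

The apparent trade-off, per the commit title: less scan parallelism per table, in exchange for peak memory bounded by a single part file.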
