diff --git a/benchmarking/tpch/data_generation.py b/benchmarking/tpch/data_generation.py
index 84a7adca1f..e70170b9d2 100644
--- a/benchmarking/tpch/data_generation.py
+++ b/benchmarking/tpch/data_generation.py
@@ -315,20 +315,24 @@ def gen_parquet(csv_files_location: str) -> str:
         table_parquet_path = os.path.join(PARQUET_FILE_PATH, tab_name)
         if not os.path.exists(table_parquet_path):
             logger.info("Generating Parquet Files for %s", tab_name)
-            df = daft.read_csv(
-                os.path.join(csv_files_location, f"{tab_name}.tbl*"),
-                has_headers=False,
-                delimiter="|",
-            ).exclude("")
-            df = df.select(
-                *[
-                    df[autogen_col_name].alias(actual_col_name)
-                    for actual_col_name, autogen_col_name in zip(SCHEMA[tab_name], df.column_names)
-                ]
-            )
-            df = df.write_parquet(table_parquet_path)
-            df.collect()
-            assert os.path.exists(table_parquet_path), f"Parquet files not generated by Daft at {table_parquet_path}"
+            files = glob(os.path.join(csv_files_location, f"{tab_name}.tbl*"))
+            for file in files:
+                df = daft.read_csv(
+                    file,
+                    has_headers=False,
+                    delimiter="|",
+                ).exclude("")
+                df = df.select(
+                    *[
+                        df[autogen_col_name].alias(actual_col_name)
+                        for actual_col_name, autogen_col_name in zip(SCHEMA[tab_name], df.column_names)
+                    ]
+                )
+                df = df.write_parquet(table_parquet_path)
+                df.collect()
+            assert os.path.exists(
+                table_parquet_path
+            ), f"Parquet files not generated by Daft at {table_parquet_path}"
         else:
             logger.info(
                 "Cached Parquet files for table %s already exists at %s, skipping Parquet file generation",
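
For reference, here is a minimal standalone sketch of the per-file conversion loop this hunk introduces. It assumes `glob` is imported at the top of `data_generation.py` (e.g. `from glob import glob`), which the hunk itself does not show, and it substitutes a hypothetical `convert_table` helper and a truncated stand-in `SCHEMA` for the module's own globals:

```python
import os
from glob import glob  # assumed import; the hunk calls glob() but does not show it being added

import daft

# Hypothetical stand-in for the module-level TPC-H schema mapping (truncated).
SCHEMA = {"region": ["r_regionkey", "r_name", "r_comment"]}


def convert_table(csv_files_location: str, tab_name: str, table_parquet_path: str) -> None:
    """Hypothetical helper mirroring the new per-file loop in gen_parquet."""
    # Expand the glob eagerly and convert each .tbl chunk on its own, instead of
    # handing the whole glob pattern to a single read_csv call as before.
    for file in glob(os.path.join(csv_files_location, f"{tab_name}.tbl*")):
        # TPC-H .tbl rows end with a trailing "|", which yields an empty last
        # column; exclude("") drops it, as in the hunk above.
        df = daft.read_csv(file, has_headers=False, delimiter="|").exclude("")
        # Rename Daft's autogenerated column names to the TPC-H schema names.
        df = df.select(
            *[
                df[autogen].alias(actual)
                for actual, autogen in zip(SCHEMA[tab_name], df.column_names)
            ]
        )
        # As in the hunk, write_parquet returns a dataframe that must be
        # collected for the write to execute.
        df.write_parquet(table_parquet_path).collect()
    assert os.path.exists(table_parquet_path), f"Parquet files not generated by Daft at {table_parquet_path}"
```

Converting each `.tbl` chunk separately trades the single wide scan for bounded per-file memory, which is likely the motivation for the change; all chunks still land in the same `table_parquet_path` directory, so downstream readers of the table are unaffected.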