diff --git a/benchmarking/arrow.py b/benchmarking/arrow.py
index e270200..80d68e2 100644
--- a/benchmarking/arrow.py
+++ b/benchmarking/arrow.py
@@ -1,9 +1,12 @@
 import os
 import sys
 import time
+import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pyarrow.feather as ft
+from pyarrow import RecordBatchFileReader as Reader
+from pyarrow import RecordBatchFileWriter as Writer
 
 PATH = os.path.expanduser("~")
 sys.path.append(PATH + "/fog_x_fork")
@@ -16,15 +19,15 @@ def convert_parquet(in_dir):
     table = pq.read_table(in_dir)
     file = in_dir.split("/")[-1].split("-")[0]
-
-    with pa.ipc.new_file(f"{out_dir}/IPC/{file}.arrow", table.schema) as w:
-        w.write_table(table)
 
     with pa.OSFile(f"{out_dir}/REG/{file}.arrow", "wb") as sink:
-        with pa.RecordBatchFileWriter(sink, table.schema) as w:
+        with Writer(sink, table.schema) as w:
             w.write_table(table)
 
-    ft.write_feather(table, f"{out_dir}/fth/{file}.arrow", "uncompressed")
+    with pa.ipc.new_file(f"{out_dir}/IPC/{file}.arrow", table.schema) as w:
+        w.write_table(table)
+
+    ft.write_feather(table, f"{out_dir}/FTH/{file}.feather", "uncompressed")
 
 N = 51
@@ -34,5 +37,53 @@ def convert_parquet(in_dir):
 for i in range(N):
     convert_parquet(f"{PATH}/{NAME}/{NAME}_{i}-0.parquet")
 
+
+def measure_traj(read_func, write_func, name):
+    read_time, write_time, data_size = 0, 0, 0
+
+    for i in range(N):
+        print(f"Measuring trajectory {i}")
+
+        extn = "arrow" if (name[0] == "A") else "feather"
+        path = f"{out_dir}/{name[-3:]}/{NAME}_{i}.{extn}"
+
+        stop = time.time()
+        traj = read_func(path)
+        read_time += time.time() - stop
+
+        data_size += os.path.getsize(path)
+        path = f"{PATH}/temp.{extn}"
+
+        stop = time.time()
+        write_func(path, traj)
+        write_time += time.time() - stop
+
+        os.remove(path)
+    return read_time, write_time, data_size / MB
+
+
 if __name__ == "__main__":
-    exit()
\ No newline at end of file
+
+    reg_dict = {"name": "Arrow_REG",
+                "read_func": lambda path: Reader(pa.OSFile(path, "rb")).read_all(),
+                "write_func": lambda path, data: Writer(pa.OSFile(path, "wb"), data.schema).write_table(data)
+                }
+    ipc_dict = {"name": "Arrow_IPC",
+                "read_func": lambda path: pa.ipc.open_file(path).read_all(),
+                "write_func": lambda path, data: pa.ipc.new_file(path, data.schema).write_table(data)
+                }
+    fth_dict = {"name": "Feather_FTH",
+                "read_func": lambda path: ft.read_table(path),
+                "write_func": lambda path, data: ft.write_feather(data, path)
+                }
+    pd_dict = {"name": "Pandas_FTH",
+               "read_func": lambda path: pd.read_feather(path),
+               "write_func": lambda path, data: data.to_feather(path)
+               }
+
+    for lib in [reg_dict, ipc_dict, fth_dict, pd_dict]:
+        rt, wt, mb = measure_traj(lib["read_func"], lib["write_func"], lib["name"])
+
+        print(f"\n{lib['name']}: \nData size = {mb:.4f} MB; Num. traj = {N}")
+        print(f"Read: latency = {rt:.4f} s; throughput = {mb / rt :.4f} MB/s, {N / rt :.4f} traj/s")
+        print(f"Write: latency = {wt:.4f} s; throughput = {mb / wt :.4f} MB/s, {N / wt :.4f} traj/s")
\ No newline at end of file
diff --git a/benchmarking/hdf5.py b/benchmarking/hdf5.py
new file mode 100644
index 0000000..a4c26db
--- /dev/null
+++ b/benchmarking/hdf5.py
@@ -0,0 +1,19 @@
+import h5py
+import pyarrow.parquet as pq
+
+def parquet_to_hdf5(parquet_file, hdf5_file):
+    # Read the Parquet file
+    table = pq.read_table(parquet_file)
+    columns = table.column_names
+
+    # Create HDF5 file
+    with h5py.File(hdf5_file, 'w') as h5_file:
+        for column in columns:
+            data = table[column].to_numpy()
+            h5_file.create_dataset(column, data=data)
+    print(f"Data successfully written to {hdf5_file}")
+
+# Example usage
+parquet_file = 'input_file.parquet'
+hdf5_file = 'output_file.h5'
+parquet_to_hdf5(parquet_file, hdf5_file)
\ No newline at end of file
diff --git a/benchmarking/plotting_notes.py b/benchmarking/plotting_notes.py
index 722f1a6..71a643a 100644
--- a/benchmarking/plotting_notes.py
+++ b/benchmarking/plotting_notes.py
@@ -18,8 +18,27 @@
 # Read: latency = 49.1056 s ; throughput = 136.5909 MB/s, 1.0386 traj/s
 # Write: latency = 103.6820 s; throughput = 64.6918 MB/s, 0.4919 traj/s
 
-# plotting + code
-# temporary plots:
+"""###"""
+
+# Arrow_REG:
+# Data size = 14651.9697 MB ; Num. traj = 51
+# Read: latency = 39.5354 s; throughput = 370.6041 MB/s, 1.2900 traj/s
+# Write: latency = 26.4765 s; throughput = 553.3952 MB/s, 1.9262 traj/s
+
+# Arrow_IPC:
+# Data size = 14651.9697 MB ; Num. traj = 51
+# Read: latency = 44.0609 s; throughput = 332.5391 MB/s, 1.1575 traj/s
+# Write: latency = 25.1574 s; throughput = 582.4108 MB/s, 2.0272 traj/s
+
+# Feather_FTH:
+# Data size = 14651.9697 MB; Num. traj = 51
+# Read: latency = 47.5646 s; throughput = 308.0433 MB/s, 1.0722 traj/s
+# Write: latency = 35.4595 s; throughput = 413.2030 MB/s, 1.4383 traj/s
+
+# Pandas_FTH:
+# Data size = 14651.9697 MB; Num. traj = 51
+# Read: latency = 26.6213 s; throughput = 550.3844 MB/s, 1.9158 traj/s
+# Write: latency = 45.9015 s; throughput = 319.2046 MB/s, 1.1111 traj/s
 
 import matplotlib.pyplot as plt
diff --git a/benchmarking/convert_tf_hdf5.py b/benchmarking/tf_to_hdf5.py
similarity index 96%
rename from benchmarking/convert_tf_hdf5.py
rename to benchmarking/tf_to_hdf5.py
index 747b757..b6755e8 100644
--- a/benchmarking/convert_tf_hdf5.py
+++ b/benchmarking/tf_to_hdf5.py
@@ -5,7 +5,7 @@ import pyarrow as pa
 import pyarrow.parquet as pq
 
-# NOTE: this code assumes you have the path: home/username/datasets/berkeley_autolab_ur5/TRAIN.tfrecord
+# NOTE: this code assumes you have the paths: home/username/datasets/berkeley_autolab_ur5/TRAIN.tfrecord
 
 def parse_tfrecord(raw_record):
     FLF = tf.io.FixedLenFeature