arrow done

itsjoshzhang committed Jun 21, 2024
1 parent 64f4098 commit cc26b5b
Showing 4 changed files with 98 additions and 9 deletions.
63 changes: 57 additions & 6 deletions benchmarking/arrow.py
@@ -1,9 +1,12 @@
import os
import sys
import time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.feather as ft
from pyarrow import RecordBatchFileReader as Reader
from pyarrow import RecordBatchFileWriter as Writer

PATH = os.path.expanduser("~")
sys.path.append(PATH + "/fog_x_fork")
@@ -16,15 +19,15 @@
def convert_parquet(in_dir):
    table = pq.read_table(in_dir)
    file = in_dir.split("/")[-1].split("-")[0]

    with pa.ipc.new_file(f"{out_dir}/IPC/{file}.arrow", table.schema) as w:
        w.write_table(table)

    with pa.OSFile(f"{out_dir}/REG/{file}.arrow", "wb") as sink:
        with pa.RecordBatchFileWriter(sink, table.schema) as w:
        with Writer(sink, table.schema) as w:
            w.write_table(table)

    ft.write_feather(table, f"{out_dir}/fth/{file}.arrow", "uncompressed")
    with pa.ipc.new_file(f"{out_dir}/IPC/{file}.arrow", table.schema) as w:
        w.write_table(table)

    ft.write_feather(table, f"{out_dir}/FTH/{file}.feather", "uncompressed")


N = 51
@@ -34,5 +37,53 @@ def convert_parquet(in_dir):
for i in range(N):
    convert_parquet(f"{PATH}/{NAME}/{NAME}_{i}-0.parquet")


def measure_traj(read_func, write_func, name):
    read_time, write_time, data_size = 0, 0, 0

    for i in range(N):
        print(f"Measuring trajectory {i}")

        extn = "arrow" if (name[0] == "A") else "feather"
        path = f"{out_dir}/{name[-3:]}/{NAME}_{i}.{extn}"

        start = time.time()
        traj = read_func(path)
        read_time += time.time() - start

        data_size += os.path.getsize(path)
        path = f"{PATH}/temp.{extn}"  # throwaway copy written under the home directory

        start = time.time()
        write_func(path, traj)
        write_time += time.time() - start

        os.remove(path)
    return read_time, write_time, data_size / MB


if __name__ == "__main__":
    exit()

    reg_dict = {"name": "Arrow_REG",
                "read_func": lambda path: Reader(pa.OSFile(path, "rb")).read_all(),
                "write_func": lambda path, data: Writer(pa.OSFile(path, "wb"), data.schema).write_table(data)
                }
    ipc_dict = {"name": "Arrow_IPC",
                "read_func": lambda path: pa.ipc.open_file(path).read_all(),
                "write_func": lambda path, data: pa.ipc.new_file(path, data.schema).write_table(data)
                }
    fth_dict = {"name": "Feather_FTH",
                "read_func": lambda path: ft.read_table(path),
                "write_func": lambda path, data: ft.write_feather(data, path)
                }
    pd_dict = {"name": "Pandas_FTH",
               "read_func": lambda path: pd.read_feather(path),
               "write_func": lambda path, data: data.to_feather(path)
               }

    for lib in [reg_dict, ipc_dict, fth_dict, pd_dict]:
        rt, wt, mb = measure_traj(lib["read_func"], lib["write_func"], lib["name"])

        print(f"\n{lib['name']}: \nData size = {mb:.4f} MB; Num. traj = {N}")
        print(f"Read: latency = {rt:.4f} s; throughput = {mb / rt :.4f} MB/s, {N / rt :.4f} traj/s")
        print(f"Write: latency = {wt:.4f} s; throughput = {mb / wt :.4f} MB/s, {N / wt :.4f} traj/s")
19 changes: 19 additions & 0 deletions benchmarking/hdf5.py
@@ -0,0 +1,19 @@
import h5py
import pyarrow.parquet as pq

def parquet_to_hdf5(parquet_file, hdf5_file):
    # Read the Parquet file
    table = pq.read_table(parquet_file)
    columns = table.column_names

    # Create HDF5 file
    with h5py.File(hdf5_file, 'w') as h5_file:
        for column in columns:
            data = table[column].to_numpy()
            h5_file.create_dataset(column, data=data)
    print(f"Data successfully written to {hdf5_file}")

# Example usage
parquet_file = 'input_file.parquet'
hdf5_file = 'output_file.h5'
parquet_to_hdf5(parquet_file, hdf5_file)
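
parquet_to_hdf5 stores one HDF5 dataset per Parquet column. Below is a small read-back sketch, not from the commit, assuming the output_file.h5 written by the example usage above:

import h5py

with h5py.File('output_file.h5', 'r') as h5_file:
    for name in h5_file.keys():
        data = h5_file[name][:]  # load the dataset back as a NumPy array
        print(f"{name}: shape={data.shape}, dtype={data.dtype}")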
23 changes: 21 additions & 2 deletions benchmarking/plotting_notes.py
@@ -18,8 +18,27 @@
# Read: latency = 49.1056 s ; throughput = 136.5909 MB/s, 1.0386 traj/s
# Write: latency = 103.6820 s; throughput = 64.6918 MB/s, 0.4919 traj/s

# plotting + code
# temporary plots:
"""###"""

# Arrow_REG:
# Data size = 14651.9697 MB ; Num. traj = 51
# Read: latency = 39.5354 s; throughput = 370.6041 MB/s, 1.2900 traj/s
# Write: latency = 26.4765 s; throughput = 553.3952 MB/s, 1.9262 traj/s

# Arrow_IPC:
# Data size = 14651.9697 MB ; Num. traj = 51
# Read: latency = 44.0609 s; throughput = 332.5391 MB/s, 1.1575 traj/s
# Write: latency = 25.1574 s; throughput = 582.4108 MB/s, 2.0272 traj/s

# Feather_FTH:
# Data size = 14651.9697 MB; Num. traj = 51
# Read: latency = 47.5646 s; throughput = 308.0433 MB/s, 1.0722 traj/s
# Write: latency = 35.4595 s; throughput = 413.2030 MB/s, 1.4383 traj/s

# Pandas_FTH:
# Data size = 14651.9697 MB; Num. traj = 51
# Read: latency = 26.6213 s; throughput = 550.3844 MB/s, 1.9158 traj/s
# Write: latency = 45.9015 s; throughput = 319.2046 MB/s, 1.1111 traj/s

import matplotlib.pyplot as plt
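
The rest of the plotting code is not shown in this hunk. Below is a minimal sketch, not the commit's actual plot, of one way to chart the read/write throughputs recorded in the comments above:

import matplotlib.pyplot as plt
import numpy as np

# Throughput numbers (MB/s) copied from the benchmark comments above.
labels = ["Arrow_REG", "Arrow_IPC", "Feather_FTH", "Pandas_FTH"]
read_tput = [370.6041, 332.5391, 308.0433, 550.3844]
write_tput = [553.3952, 582.4108, 413.2030, 319.2046]

x = np.arange(len(labels))
plt.bar(x - 0.2, read_tput, width=0.4, label="Read")
plt.bar(x + 0.2, write_tput, width=0.4, label="Write")
plt.xticks(x, labels)
plt.ylabel("Throughput (MB/s)")
plt.legend()
plt.show()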

@@ -5,7 +5,7 @@
import pyarrow as pa
import pyarrow.parquet as pq

# NOTE: this code assumes you have the path: home/username/datasets/berkeley_autolab_ur5/TRAIN.tfrecord
# NOTE: this code assumes you have the paths: home/username/datasets/berkeley_autolab_ur5/TRAIN.tfrecord

def parse_tfrecord(raw_record):
    FLF = tf.io.FixedLenFeature
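
The body of parse_tfrecord is truncated above. As a generic illustration, not the commit's actual feature spec, tf.io.FixedLenFeature is typically paired with tf.io.parse_single_example like this (the feature names and dtypes below are hypothetical):

import tensorflow as tf

def parse_example(raw_record):
    FLF = tf.io.FixedLenFeature
    features = {  # hypothetical keys; the real ones come from the dataset schema
        "observation": FLF([], tf.string),
        "action": FLF([7], tf.float32),
    }
    return tf.io.parse_single_example(raw_record, features)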
