Skip to content

Commit

Permalink
[FEAT] Handle Hudi empty timeline (#2268)
Browse files Browse the repository at this point in the history
Handle cases where Hudi timeline is empty or has no write stats.
  • Loading branch information
xushiyan authored May 10, 2024
1 parent 984631c commit feee6a3
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 13 deletions.
3 changes: 2 additions & 1 deletion daft/hudi/pyhudi/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def latest_table_metadata(self) -> HudiTableMetadata:
files_metadata = []
min_vals_arr = []
max_vals_arr = []
files_metadata_schema = FileSlice.FILES_METADATA_SCHEMA if file_slices else pa.schema([])
colstats_schema = file_slices[0].colstats_schema if file_slices else pa.schema([])
for file_slice in file_slices:
files_metadata.append(file_slice.files_metadata)
Expand All @@ -181,7 +182,7 @@ def latest_table_metadata(self) -> HudiTableMetadata:
min_value_arrays = [pa.array(column) for column in list(zip(*min_vals_arr))]
max_value_arrays = [pa.array(column) for column in list(zip(*max_vals_arr))]
return HudiTableMetadata(
pa.RecordBatch.from_arrays(metadata_arrays, schema=FileSlice.FILES_METADATA_SCHEMA),
pa.RecordBatch.from_arrays(metadata_arrays, schema=files_metadata_schema),
pa.RecordBatch.from_arrays(min_value_arrays, schema=colstats_schema),
pa.RecordBatch.from_arrays(max_value_arrays, schema=colstats_schema),
)
Expand Down
26 changes: 18 additions & 8 deletions daft/hudi/pyhudi/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,41 @@ def __lt__(self, other: Instant):
class Timeline:
base_path: str
fs: pafs.FileSystem
instants: list[Instant]
completed_commit_instants: list[Instant]

def __init__(self, base_path: str, fs: pafs.FileSystem):
self.base_path = base_path
self.fs = fs
self._load_completed_commit_instants()

@property
def has_completed_commit(self) -> bool:
return len(self.completed_commit_instants) > 0

def _load_completed_commit_instants(self):
timeline_path = os.path.join(self.base_path, ".hoodie")
action = "commit"
ext = ".commit"
instants = []
write_action_exts = {".commit"}
commit_instants = []
for file_info in self.fs.get_file_info(pafs.FileSelector(timeline_path)):
if file_info.base_name.endswith(ext):
ext = os.path.splitext(file_info.base_name)[1]
if ext in write_action_exts:
timestamp = file_info.base_name[: -len(ext)]
instants.append(Instant(state=State.COMPLETED, action=action, timestamp=timestamp))
self.instants = sorted(instants)
instant = Instant(state=State.COMPLETED, action=ext[1:], timestamp=timestamp)
commit_instants.append(instant)
self.completed_commit_instants = sorted(commit_instants)

def get_latest_commit_metadata(self) -> dict:
latest_instant_file_path = os.path.join(self.base_path, ".hoodie", self.instants[-1].file_name)
if not self.has_completed_commit:
return {}
latest_instant_file_path = os.path.join(self.base_path, ".hoodie", self.completed_commit_instants[-1].file_name)
with self.fs.open_input_file(latest_instant_file_path) as f:
return json.load(f)

def get_latest_commit_schema(self) -> pa.Schema:
latest_commit_metadata = self.get_latest_commit_metadata()
if not latest_commit_metadata.get("partitionToWriteStats"):
return pa.schema([])

_, write_stats = next(iter(latest_commit_metadata["partitionToWriteStats"].items()))
base_file_path = os.path.join(self.base_path, write_stats[0]["path"])
with self.fs.open_input_file(base_file_path) as f:
Expand Down
21 changes: 18 additions & 3 deletions tests/io/hudi/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import pytest


def _extract_testing_table(zip_file_path, target_path, table_name) -> str:
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(target_path)
return os.path.join(target_path, table_name)


@pytest.fixture(
params=[
"v6_complexkeygen_hivestyle",
Expand All @@ -19,6 +25,15 @@
def get_testing_table_for_supported_cases(request, tmp_path) -> str:
table_name = request.param
zip_file_path = Path(__file__).parent.joinpath("data", f"{table_name}.zip")
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(tmp_path)
return os.path.join(tmp_path, table_name)
return _extract_testing_table(zip_file_path, tmp_path, table_name)


@pytest.fixture(
params=[
"v6_empty",
]
)
def get_empty_table(request, tmp_path) -> str:
table_name = request.param
zip_file_path = Path(__file__).parent.joinpath("data", f"{table_name}.zip")
return _extract_testing_table(zip_file_path, tmp_path, table_name)
11 changes: 11 additions & 0 deletions tests/io/hudi/data/v6_empty.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
create table v6_empty (
id INT,
name STRING,
isActive BOOLEAN
)
USING HUDI
TBLPROPERTIES (
type = 'cow',
primaryKey = 'id',
'hoodie.metadata.enable' = 'false'
);
Binary file added tests/io/hudi/data/v6_empty.zip
Binary file not shown.
8 changes: 7 additions & 1 deletion tests/io/hudi/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0")


def test_hudi_read_table(get_testing_table_for_supported_cases):
def test_read_table(get_testing_table_for_supported_cases):
table_path = get_testing_table_for_supported_cases
df = daft.read_hudi(table_path)

Expand Down Expand Up @@ -59,3 +59,9 @@ def test_hudi_read_table(get_testing_table_for_supported_cases):
},
],
}


def test_read_empty_table(get_empty_table):
table_path = get_empty_table
df = daft.read_hudi(table_path)
assert len(df.collect()) == 0

0 comments on commit feee6a3

Please sign in to comment.