diff --git a/daft/__init__.py b/daft/__init__.py
index 9069810c42..71845465d8 100644
--- a/daft/__init__.py
+++ b/daft/__init__.py
@@ -73,6 +73,7 @@ def get_build_type() -> str:
     from_glob_path,
     read_csv,
     read_delta_lake,
+    read_hudi,
     read_iceberg,
     read_json,
     read_parquet,
@@ -93,6 +94,7 @@ def get_build_type() -> str:
     "read_csv",
     "read_json",
     "read_parquet",
+    "read_hudi",
    "read_iceberg",
     "read_delta_lake",
     "read_sql",
diff --git a/daft/hudi/__init__.py b/daft/hudi/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/daft/hudi/hudi_scan.py b/daft/hudi/hudi_scan.py
new file mode 100644
index 0000000000..a860113d07
--- /dev/null
+++ b/daft/hudi/hudi_scan.py
@@ -0,0 +1,139 @@
+from __future__ import annotations
+
+import logging
+import os
+from collections.abc import Iterator
+
+import daft
+from daft.daft import (
+    FileFormatConfig,
+    ParquetSourceConfig,
+    Pushdowns,
+    ScanTask,
+    StorageConfig,
+)
+from daft.filesystem import _resolve_paths_and_filesystem
+from daft.hudi.pyhudi.table import HudiTable, HudiTableMetadata
+from daft.io.scan import PartitionField, ScanOperator
+from daft.logical.schema import Schema
+
+logger = logging.getLogger(__name__)
+
+
+class HudiScanOperator(ScanOperator):
+    def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
+        super().__init__()
+        resolved_path, resolved_fs = _resolve_paths_and_filesystem(table_uri, storage_config.config.io_config)
+        self._table = HudiTable(table_uri, resolved_fs, resolved_path[0])
+        self._storage_config = storage_config
+        self._schema = Schema.from_pyarrow_schema(self._table.schema)
+        partition_fields = set(self._table.props.partition_fields)
+        self._partition_keys = [
+            PartitionField(field._field) for field in self._schema if field.name in partition_fields
+        ]
+
+    def schema(self) -> Schema:
+        return self._schema
+
+    def display_name(self) -> str:
+        return f"HudiScanOperator({self._table.props.name})"
+
+    def partitioning_keys(self) -> list[PartitionField]:
+        return self._partition_keys
+
+    def multiline_display(self) -> list[str]:
+        return [
+            self.display_name(),
+            f"Schema = {self._schema}",
+            f"Partitioning keys = {self.partitioning_keys()}",
+            f"Storage config = {self._storage_config}",
+        ]
+
+    def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
+        import pyarrow as pa
+
+        hudi_table_metadata: HudiTableMetadata = self._table.latest_table_metadata()
+        files_metadata = hudi_table_metadata.files_metadata
+
+        if len(self.partitioning_keys()) > 0 and pushdowns.partition_filters is None:
+            logger.warning(
+                f"{self.display_name()} has partitioning keys = {self.partitioning_keys()}, but no partition filter was specified. This will result in a full table scan."
+            )
+
+        limit_files = pushdowns.limit is not None and pushdowns.filters is None and pushdowns.partition_filters is None
+        rows_left = pushdowns.limit if pushdowns.limit is not None else 0
+        scan_tasks = []
+        for task_idx in range(files_metadata.num_rows):
+            if limit_files and rows_left <= 0:
+                break
+
+            path = os.path.join(self._table.table_uri, files_metadata["path"][task_idx].as_py())
+            record_count = files_metadata["num_records"][task_idx].as_py()
+            try:
+                size_bytes = files_metadata["size"][task_idx].as_py()
+            except KeyError:
+                size_bytes = None
+            file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
+
+            if self._table.is_partitioned:
+                dtype = files_metadata.schema.field("partition_values").type
+                part_values = files_metadata["partition_values"][task_idx]
+                arrays = {}
+                for field_idx in range(dtype.num_fields):
+                    field_name = dtype.field(field_idx).name
+                    try:
+                        arrow_arr = pa.array([part_values[field_name]], type=dtype.field(field_idx).type)
+                    except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError):
+                        # pyarrow < 13.0.0 doesn't accept pyarrow scalars in the array constructor.
+                        arrow_arr = pa.array([part_values[field_name].as_py()], type=dtype.field(field_idx).type)
+                    arrays[field_name] = daft.Series.from_arrow(arrow_arr, field_name)
+                partition_values = daft.table.Table.from_pydict(arrays)._table
+            else:
+                partition_values = None
+
+            # Populate scan task with column-wise stats.
+            schema = self._table.schema
+            min_values = hudi_table_metadata.colstats_min_values
+            max_values = hudi_table_metadata.colstats_max_values
+            arrays = {}
+            for field_idx in range(len(schema)):
+                field_name = schema.field(field_idx).name
+                field_type = schema.field(field_idx).type
+                try:
+                    arrow_arr = pa.array(
+                        [min_values[field_name][task_idx], max_values[field_name][task_idx]], type=field_type
+                    )
+                except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError):
+                    # pyarrow < 13.0.0 doesn't accept pyarrow scalars in the array constructor.
+                    arrow_arr = pa.array(
+                        [min_values[field_name][task_idx].as_py(), max_values[field_name][task_idx].as_py()],
+                        type=field_type,
+                    )
+                arrays[field_name] = daft.Series.from_arrow(arrow_arr, field_name)
+            stats = daft.table.Table.from_pydict(arrays)._table
+
+            st = ScanTask.catalog_scan_task(
+                file=path,
+                file_format=file_format_config,
+                schema=self._schema._schema,
+                num_rows=record_count,
+                storage_config=self._storage_config,
+                size_bytes=size_bytes,
+                pushdowns=pushdowns,
+                partition_values=partition_values,
+                stats=stats,
+            )
+            if st is None:
+                continue
+            rows_left -= record_count
+            scan_tasks.append(st)
+        return iter(scan_tasks)
+
+    def can_absorb_filter(self) -> bool:
+        return False
+
+    def can_absorb_limit(self) -> bool:
+        return False
+
+    def can_absorb_select(self) -> bool:
+        return True
diff --git a/daft/hudi/pyhudi/__init__.py b/daft/hudi/pyhudi/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/daft/hudi/pyhudi/filegroup.py b/daft/hudi/pyhudi/filegroup.py
new file mode 100644
index 0000000000..298726d2ea
--- /dev/null
+++ b/daft/hudi/pyhudi/filegroup.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+
+import pyarrow as pa
+from sortedcontainers import SortedDict
+
+from daft.hudi.pyhudi.utils import FsFileMetadata
+
+
+@dataclass(init=False)
+class BaseFile:
+    def __init__(self, fs_metadata: FsFileMetadata):
+        self.metadata = fs_metadata
+        file_name = fs_metadata.base_name
+        self.file_name = file_name
+        file_group_id, _, commit_time_ext = file_name.split("_")
+        self.file_group_id = file_group_id
+        self.commit_time = commit_time_ext.split(".")[0]
+
+    @property
+    def path(self) -> str:
+        return self.metadata.path
+
+    @property
+    def size(self) -> int:
+        return self.metadata.size
+
+    @property
+    def num_records(self) -> int:
+        return self.metadata.num_records
+
+    @property
+    def schema(self) -> pa.Schema:
+        return self.metadata.schema
+
+    @property
+    def min_values(self):
+        return self.metadata.min_values
+
+    @property
+    def max_values(self):
+        return self.metadata.max_values
+
+
+@dataclass
+class FileSlice:
+    FILES_METADATA_SCHEMA = pa.schema(
+        [
+            ("path", pa.string()),
+            ("size", pa.uint32()),
+            ("num_records", pa.uint32()),
+            ("partition_path", pa.string()),
+        ]
+    )
+
+    file_group_id: str
+    partition_path: str
+    base_instant_time: str
+    base_file: BaseFile
+
+    @property
+    def files_metadata(self):
+        return self.base_file.path, self.base_file.size, self.base_file.num_records, self.partition_path
+
+    @property
+    def colstats_min_max(self):
+        return self.base_file.min_values, self.base_file.max_values
+
+
+@dataclass
+class FileGroup:
+    file_group_id: str
+    partition_path: str
+    file_slices: SortedDict[str, FileSlice] = field(default_factory=SortedDict)
+
+    def add_base_file(self, base_file: BaseFile):
+        ct = base_file.commit_time
+        if ct in self.file_slices:
+            self.file_slices.get(ct).base_file = base_file
+        else:
+            self.file_slices[ct] = FileSlice(self.file_group_id, self.partition_path, ct, base_file)
+
+    def get_latest_file_slice(self) -> FileSlice | None:
+        if not self.file_slices:
+            return None
+
+        return self.file_slices.peekitem(-1)[1]
diff --git a/daft/hudi/pyhudi/table.py b/daft/hudi/pyhudi/table.py
new file mode 100644
index 0000000000..3f1dd893d4
--- /dev/null
+++ b/daft/hudi/pyhudi/table.py
@@ -0,0 +1,171 @@
+from __future__ import annotations
+
+import os
+from collections import defaultdict
+from dataclasses import dataclass
+
+import pyarrow as pa
+import pyarrow.fs as pafs
+
+from daft.hudi.pyhudi.filegroup import BaseFile, FileGroup, FileSlice
+from daft.hudi.pyhudi.timeline import Timeline
+from daft.hudi.pyhudi.utils import (
+    list_full_sub_dirs,
+    list_leaf_dirs,
+    list_relative_file_paths,
+)
+
+# TODO(Shiyan): support base file in .orc
+BASE_FILE_EXTENSIONS = [".parquet"]
+
+
+@dataclass
+class MetaClient:
+    fs: pafs.FileSystem
+    base_path: str
+    timeline: Timeline | None
+
+    def get_active_timeline(self) -> Timeline:
+        if not self.timeline:
+            self.timeline = Timeline(self.base_path, self.fs)
+        return self.timeline
+
+    def get_partition_paths(self, relative=True) -> list[str]:
+        first_level_full_partition_paths = list_full_sub_dirs(self.base_path, self.fs, excludes=[".hoodie"])
+        partition_paths = []
+        for p in first_level_full_partition_paths:
+            partition_paths.extend(list_leaf_dirs(p, self.fs))
+
+        common_prefix_len = len(self.base_path) + 1 if relative else 0
+        return [p[common_prefix_len:] for p in partition_paths]
+
+    def get_file_groups(self, partition_path: str) -> list[FileGroup]:
+        base_file_metadata = list_relative_file_paths(
+            self.base_path, partition_path, self.fs, includes=BASE_FILE_EXTENSIONS
+        )
+        fg_id_to_base_files = defaultdict(list)
+        for metadata in base_file_metadata:
+            base_file = BaseFile(metadata)
+            fg_id_to_base_files[base_file.file_group_id].append(base_file)
+        file_groups = []
+        for fg_id, base_files in fg_id_to_base_files.items():
+            file_group = FileGroup(fg_id, partition_path)
+            for base_file in base_files:
+                file_group.add_base_file(base_file)
+            file_groups.append(file_group)
+        return file_groups
+
+
+@dataclass(init=False)
+class FileSystemView:
+    def __init__(self, meta_client: MetaClient):
+        self.meta_client = meta_client
+        self.partition_to_file_groups: dict[str, list[FileGroup]] = {}
+        self._load_partitions()
+
+    def _load_partitions(self):
+        partition_paths = self.meta_client.get_partition_paths()
+        for partition_path in partition_paths:
+            self._load_partition(partition_path)
+
+    def _load_partition(self, partition_path: str):
+        file_groups = self.meta_client.get_file_groups(partition_path)
+        self.partition_to_file_groups[partition_path] = file_groups
+
+    def get_latest_file_slices(self) -> list[FileSlice]:
+        file_slices = []
+        for file_groups in self.partition_to_file_groups.values():
+            for file_group in file_groups:
+                file_slice = file_group.get_latest_file_slice()
+                if file_slice is not None:
+                    file_slices.append(file_slice)
+
+        return file_slices
+
+
+@dataclass(init=False)
+class HudiTableProps:
+    def __init__(self, fs: pafs.FileSystem, base_path: str):
+        self._props = {}
+        hoodie_properties_file = os.path.join(base_path, ".hoodie", "hoodie.properties")
+        with fs.open_input_file(hoodie_properties_file) as f:
+            lines = f.readall().decode("utf-8").splitlines()
+            for line in lines:
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+                key, value = line.split("=", 1)
+                self._props[key] = value
+
+    @property
+    def name(self) -> str:
+        return self._props["hoodie.table.name"]
+
+    @property
+    def partition_fields(self) -> list[str]:
+        return self._props["hoodie.table.partition.fields"]
+
+    def get_config(self, key: str) -> str:
+        return self._props[key]
+
+
+@dataclass
+class HudiTableMetadata:
+
+    files_metadata: pa.RecordBatch
+    colstats_min_values: pa.RecordBatch
+    colstats_max_values: pa.RecordBatch
+
+
+class UnsupportedException(Exception):
+    pass
+
+
+@dataclass(init=False)
+class HudiTable:
+    def __init__(self, table_uri: str, fs: pafs.FileSystem, base_path: str):
+        self.table_uri = table_uri
+        self._meta_client = MetaClient(fs, base_path, timeline=None)
+        self._props = HudiTableProps(fs, base_path)
+        self._validate_table_props()
+
+    def _validate_table_props(self):
+        if self._props.get_config("hoodie.table.type") != "COPY_ON_WRITE":
+            raise UnsupportedException("Only support COPY_ON_WRITE table")
+        if self._props.get_config("hoodie.table.keygenerator.class") != "org.apache.hudi.keygen.SimpleKeyGenerator":
+            raise UnsupportedException("Only support using Simple Key Generator")
+
+    def latest_table_metadata(self) -> HudiTableMetadata:
+        file_slices = FileSystemView(self._meta_client).get_latest_file_slices()
+        files_metadata = []
+        min_vals_arr = []
+        max_vals_arr = []
+        for file_slice in file_slices:
+            files_metadata.append(file_slice.files_metadata)
+            min_vals, max_vals = file_slice.colstats_min_max
+            min_vals_arr.append(min_vals)
+            max_vals_arr.append(max_vals)
+        metadata_arrays = [pa.array(column) for column in list(zip(*files_metadata))]
+        min_value_arrays = [pa.array(column) for column in list(zip(*min_vals_arr))]
+        max_value_arrays = [pa.array(column) for column in list(zip(*max_vals_arr))]
+        return HudiTableMetadata(
+            pa.RecordBatch.from_arrays(metadata_arrays, schema=FileSlice.FILES_METADATA_SCHEMA),
+            pa.RecordBatch.from_arrays(min_value_arrays, schema=self.schema),
+            pa.RecordBatch.from_arrays(max_value_arrays, schema=self.schema),
+        )
+
+    @property
+    def base_path(self) -> str:
+        return self._meta_client.base_path
+
+    @property
+    def schema(self) -> pa.Schema:
+        return self._meta_client.get_active_timeline().get_latest_commit_schema()
+
+    @property
+    def is_partitioned(self) -> bool:
+        return self._props.partition_fields == ""
+
+    @property
+    def props(self) -> HudiTableProps:
+        return self._props
diff --git a/daft/hudi/pyhudi/timeline.py b/daft/hudi/pyhudi/timeline.py
new file mode 100644
index 0000000000..57d88417d0
--- /dev/null
+++ b/daft/hudi/pyhudi/timeline.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+import json
+import os
+from dataclasses import dataclass
+from enum import Enum
+
+import pyarrow as pa
+import pyarrow.fs as pafs
+import pyarrow.parquet as pq
+
+
+class State(Enum):
+    REQUESTED = 0
+    INFLIGHT = 1
+    COMPLETED = 2
+
+
+@dataclass
+class Instant:
+    state: State
+    action: str
+    timestamp: str
+
+    @property
+    def file_name(self):
+        state = "" if self.state == State.COMPLETED else f".{self.state.name.lower()}"
+        return f"{self.timestamp}.{self.action}{state}"
+
+    def __lt__(self, other: Instant):
+        return [self.timestamp, self.state] < [other.timestamp, other.state]
+
+
+@dataclass(init=False)
+class Timeline:
+    base_path: str
+    fs: pafs.FileSystem
+    instants: list[Instant]
+
+    def __init__(self, base_path: str, fs: pafs.FileSystem):
+        self.base_path = base_path
+        self.fs = fs
+        self._load_completed_commit_instants()
+
+    def _load_completed_commit_instants(self):
+        timeline_path = os.path.join(self.base_path, ".hoodie")
+        action = "commit"
+        ext = ".commit"
+        instants = []
+        for file_info in self.fs.get_file_info(pafs.FileSelector(timeline_path)):
+            if file_info.base_name.endswith(ext):
+                timestamp = file_info.base_name[: -len(ext)]
+                instants.append(Instant(state=State.COMPLETED, action=action, timestamp=timestamp))
+        self.instants = sorted(instants)
+
+    def get_latest_commit_metadata(self) -> dict:
+        latest_instant_file_path = os.path.join(self.base_path, ".hoodie", self.instants[-1].file_name)
+        with self.fs.open_input_file(latest_instant_file_path) as f:
+            return json.load(f)
+
+    def get_latest_commit_schema(self) -> pa.Schema:
+        latest_commit_metadata = self.get_latest_commit_metadata()
+        _, write_stats = next(iter(latest_commit_metadata["partitionToWriteStats"].items()))
+        base_file_path = os.path.join(self.base_path, write_stats[0]["path"])
+        with self.fs.open_input_file(base_file_path) as f:
+            return pq.read_schema(f)
diff --git a/daft/hudi/pyhudi/utils.py b/daft/hudi/pyhudi/utils.py
new file mode 100644
index 0000000000..6f60df4698
--- /dev/null
+++ b/daft/hudi/pyhudi/utils.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+
+import pyarrow as pa
+import pyarrow.fs as pafs
+import pyarrow.parquet as pq
+
+
+@dataclass(init=False)
+class FsFileMetadata:
+    def __init__(self, fs: pafs.FileSystem, base_path: str, path: str, base_name: str):
+        self.base_path = base_path
+        self.path = path
+        self.base_name = base_name
+        with fs.open_input_file(os.path.join(base_path, path)) as f:
+            metadata = pq.read_metadata(f)
+            self.size = f.size()
+        self.num_records = metadata.num_rows
+        self.schema, self.min_values, self.max_values = FsFileMetadata._extract_min_max(metadata)
+
+    @staticmethod
+    def _extract_min_max(metadata: pq.FileMetaData):
+        arrow_schema = pa.schema(metadata.schema.to_arrow_schema())
+        n_columns = len(arrow_schema)
+        min_vals = [None] * n_columns
+        max_vals = [None] * n_columns
+        num_rg = metadata.num_row_groups
+        for rg in range(num_rg):
+            row_group = metadata.row_group(rg)
+            for col in range(n_columns):
+                column = row_group.column(col)
+                if column.is_stats_set and column.statistics.has_min_max:
+                    if min_vals[col] is None or column.statistics.min < min_vals[col]:
+                        min_vals[col] = column.statistics.min
+                    if max_vals[col] is None or column.statistics.max > max_vals[col]:
+                        max_vals[col] = column.statistics.max
+        return arrow_schema, min_vals, max_vals
+
+
+def list_relative_file_paths(
+    base_path: str, sub_path: str, fs: pafs.FileSystem, includes: list[str] | None
+) -> list[FsFileMetadata]:
+    listed_paths: list[pafs.FileInfo] = fs.get_file_info(pafs.FileSelector(os.path.join(base_path, sub_path)))
+    file_paths = []
+    common_prefix_len = len(base_path) + 1
+    for listed_path in listed_paths:
+        if listed_path.type == pafs.FileType.File:
+            if includes and os.path.splitext(listed_path.base_name)[-1] in includes:
+                file_paths.append(
+                    FsFileMetadata(fs, base_path, listed_path.path[common_prefix_len:], listed_path.base_name)
+                )
+
+    return file_paths
+
+
+def list_full_sub_dirs(path: str, fs: pafs.FileSystem, excludes: list[str] | None) -> list[str]:
+    sub_paths: list[pafs.FileInfo] = fs.get_file_info(pafs.FileSelector(path))
+    sub_dirs = []
+    for sub_path in sub_paths:
+        if sub_path.type == pafs.FileType.Directory:
+            if not excludes or (excludes and sub_path.base_name not in excludes):
+                sub_dirs.append(sub_path.path)
+
+    return sub_dirs
+
+
+def list_leaf_dirs(path: str, fs: pafs.FileSystem) -> list[str]:
+    sub_paths: list[pafs.FileInfo] = fs.get_file_info(pafs.FileSelector(path))
+    leaf_dirs = []
+
+    for sub_path in sub_paths:
+        if sub_path.type == pafs.FileType.Directory:
+            leaf_dirs.extend(list_leaf_dirs(sub_path.path, fs))
+
+    # leaf directory
+    if len(leaf_dirs) == 0:
+        leaf_dirs.append(path)
+
+    return leaf_dirs
diff --git a/daft/io/__init__.py b/daft/io/__init__.py
index ce21b86cf5..77b30d7142 100644
--- a/daft/io/__init__.py
+++ b/daft/io/__init__.py
@@ -11,6 +11,7 @@
 )
 from daft.io._csv import read_csv
 from daft.io._delta_lake import read_delta_lake
+from daft.io._hudi import read_hudi
 from daft.io._iceberg import read_iceberg
 from daft.io._json import read_json
 from daft.io._parquet import read_parquet
@@ -38,6 +39,7 @@ def _set_linux_cert_paths():
     "read_json",
     "from_glob_path",
     "read_parquet",
+    "read_hudi",
     "read_iceberg",
     "read_delta_lake",
     "read_sql",
diff --git a/daft/io/_hudi.py b/daft/io/_hudi.py
new file mode 100644
index 0000000000..f6ea93947c
--- /dev/null
+++ b/daft/io/_hudi.py
@@ -0,0 +1,42 @@
+# isort: dont-add-import: from __future__ import annotations
+
+from typing import Optional
+
+from daft import context
+from daft.api_annotations import PublicAPI
+from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig
+from daft.dataframe import DataFrame
+from daft.logical.builder import LogicalPlanBuilder
+
+
+@PublicAPI
+def read_hudi(
+    table_uri: str,
+    io_config: Optional["IOConfig"] = None,
+) -> DataFrame:
+    """Create a DataFrame from a Hudi table.
+
+    Example:
+        >>> df = daft.read_hudi("some-table-uri")
+        >>> df = df.where(df["foo"] > 5)
+        >>> df.show()
+
+    Args:
+        table_uri: URI to the Hudi table.
+        io_config: A custom IOConfig to use when accessing Hudi table object storage data. Defaults to None.
+
+    Returns:
+        DataFrame: A DataFrame with the schema converted from the specified Hudi table.
+    """
+    from daft.hudi.hudi_scan import HudiScanOperator
+
+    io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
+
+    multithreaded_io = not context.get_context().is_ray_runner
+    storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
+
+    hudi_operator = HudiScanOperator(table_uri, storage_config=storage_config)
+
+    handle = ScanOperatorHandle.from_python_scan_operator(hudi_operator)
+    builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)
+    return DataFrame(builder)
diff --git a/tests/io/hudi/__init__.py b/tests/io/hudi/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/io/hudi/conftest.py b/tests/io/hudi/conftest.py
new file mode 100644
index 0000000000..6bf3803d4e
--- /dev/null
+++ b/tests/io/hudi/conftest.py
@@ -0,0 +1,15 @@
+from __future__ import annotations
+
+import os
+import zipfile
+from pathlib import Path
+
+import pytest
+
+
+@pytest.fixture
+def unzip_table_0_x_cow_partitioned(tmp_path):
+    zip_file_path = Path(__file__).parent.joinpath("data", "0.x_cow_partitioned.zip")
+    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
+        zip_ref.extractall(tmp_path)
+    return os.path.join(tmp_path, "trips_table")
diff --git a/tests/io/hudi/data/0.x_cow_partitioned.zip b/tests/io/hudi/data/0.x_cow_partitioned.zip
new file mode 100644
index 0000000000..8e5482601a
Binary files /dev/null and b/tests/io/hudi/data/0.x_cow_partitioned.zip differ
diff --git a/tests/io/hudi/test_table_read.py b/tests/io/hudi/test_table_read.py
new file mode 100644
index 0000000000..6d8cc639fe
--- /dev/null
+++ b/tests/io/hudi/test_table_read.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+import daft
+
+
+def test_hudi_read_table(unzip_table_0_x_cow_partitioned):
+    path = unzip_table_0_x_cow_partitioned
+    df = daft.read_hudi(path)
+    assert df.schema().column_names() == [
+        "_hoodie_commit_time",
+        "_hoodie_commit_seqno",
+        "_hoodie_record_key",
+        "_hoodie_partition_path",
+        "_hoodie_file_name",
+        "ts",
+        "uuid",
+        "rider",
+        "driver",
+        "fare",
+        "city",
+    ]
+    assert df.select("rider").sort("rider").to_pydict() == {
+        "rider": ["rider-A", "rider-C", "rider-D", "rider-F", "rider-J"]
+    }