[FEAT] Support Hive-Style Partitioned Writes for Tabular Writes (#1794)
closes: #1790 
closes: #1766 
* Enables users to write data partitioned by partition keys
* The returned DataFrame contains the written file paths (Hive-style
paths) as well as the supplied partition keys
* The Parquet / CSV writers now perform chunked writes up to a configurable
target output file size
* Exposes Iceberg transform functions as Expressions (see the usage sketch below)


![image](https://github.com/Eventual-Inc/Daft/assets/2550285/26c22a02-dddf-49cd-8a96-3017f6cbc124)
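A minimal usage sketch of the new partitioned write path (the data, column names, and output path below are illustrative, not taken from this PR):

```python
import daft
from daft import col

df = daft.from_pydict(
    {
        "country": ["US", "CA", "US"],
        "value": [1, 2, 3],
    }
)

# Hive-style partitioned write: one subdirectory per distinct "country" value.
# The returned dataframe should list the written file paths alongside the
# partition keys, as shown in the screenshot above.
result = df.write_parquet("output_dir/", partition_cols=[col("country")])
result.show()
```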
samster25 authored Jan 23, 2024
1 parent e843bc0 commit 966145b
Showing 26 changed files with 651 additions and 161 deletions.
23 changes: 23 additions & 0 deletions daft/context.py
@@ -206,6 +206,13 @@ def set_execution_config(
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
parquet_target_row_group_size: int | None = None,
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -221,6 +228,15 @@ def set_execution_config(
fewer partitions. (Defaults to 512 MiB)
broadcast_join_size_bytes_threshold: If one side of a join is smaller than this threshold, a broadcast join will be used.
Default is 10 MiB.
sample_size_for_sort: number of elements to sample from each partition when running a sort.
Default is 20.
num_preview_rows: number of rows to display when showing a dataframe preview.
Default is 8.
parquet_target_filesize: Target File Size when writing out Parquet Files. Defaults to 512MB
parquet_target_row_group_size: Target Row Group Size when writing out Parquet Files. Defaults to 128MB
parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
@@ -229,6 +245,13 @@
merge_scan_tasks_min_size_bytes=merge_scan_tasks_min_size_bytes,
merge_scan_tasks_max_size_bytes=merge_scan_tasks_max_size_bytes,
broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
sample_size_for_sort=sample_size_for_sort,
num_preview_rows=num_preview_rows,
parquet_target_filesize=parquet_target_filesize,
parquet_target_row_group_size=parquet_target_row_group_size,
parquet_inflation_factor=parquet_inflation_factor,
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
)

ctx.daft_execution_config = new_daft_execution_config
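For context, a hedged sketch of setting the new write-related knobs globally (the values are arbitrary examples, not recommended defaults):

```python
import daft

daft.context.set_execution_config(
    parquet_target_filesize=512 * 1024 * 1024,        # target ~512 MB per Parquet file
    parquet_target_row_group_size=128 * 1024 * 1024,  # target ~128 MB per row group
    parquet_inflation_factor=3.0,  # assumed in-memory-size / file-size ratio
    csv_target_filesize=512 * 1024 * 1024,
    csv_inflation_factor=0.5,
)
```

The inflation factor is presumably used to translate the on-disk target size into an in-memory chunk size when deciding where to split output files.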
34 changes: 30 additions & 4 deletions daft/daft.pyi
@@ -867,6 +867,12 @@ class PyExpr:
def url_download(
self, max_connections: int, raise_error_on_failure: bool, multi_thread: bool, config: IOConfig
) -> PyExpr: ...
def partitioning_days(self) -> PyExpr: ...
def partitioning_hours(self) -> PyExpr: ...
def partitioning_months(self) -> PyExpr: ...
def partitioning_years(self) -> PyExpr: ...
def partitioning_iceberg_bucket(self, n: int) -> PyExpr: ...
def partitioning_iceberg_truncate(self, w: int) -> PyExpr: ...

def eq(expr1: PyExpr, expr2: PyExpr) -> bool: ...
def col(name: str) -> PyExpr: ...
@@ -945,6 +951,7 @@ class PySeries:
def is_null(self) -> PySeries: ...
def not_null(self) -> PySeries: ...
def murmur3_32(self) -> PySeries: ...
def to_str_values(self) -> PySeries: ...
def _debug_bincode_serialize(self) -> bytes: ...
@staticmethod
def _debug_bincode_deserialize(b: bytes) -> PySeries: ...
@@ -969,6 +976,7 @@ class PyTable:
def partition_by_range(
self, partition_keys: list[PyExpr], boundaries: PyTable, descending: list[bool]
) -> list[PyTable]: ...
def partition_by_value(self, partition_keys: list[PyExpr]) -> tuple[list[PyTable], PyTable]: ...
def __repr__(self) -> str: ...
def _repr_html_(self) -> str: ...
def __len__(self) -> int: ...
@@ -1023,6 +1031,7 @@ class PyMicroPartition:
def partition_by_range(
self, partition_keys: list[PyExpr], boundaries: PyTable, descending: list[bool]
) -> list[PyMicroPartition]: ...
def partition_by_value(self, exprs: list[PyExpr]) -> tuple[list[PyMicroPartition], PyMicroPartition]: ...
def __repr__(self) -> str: ...
def __len__(self) -> int: ...
@classmethod
@@ -1140,17 +1149,34 @@ class PyDaftExecutionConfig:
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
parquet_target_row_group_size: int | None = None,
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
) -> PyDaftExecutionConfig: ...
@property
def merge_scan_tasks_min_size_bytes(self) -> int: ...
@property
def merge_scan_tasks_max_size_bytes(self): ...
def merge_scan_tasks_max_size_bytes(self) -> int: ...
@property
def broadcast_join_size_bytes_threshold(self): ...
def broadcast_join_size_bytes_threshold(self) -> int: ...
@property
def sample_size_for_sort(self): ...
def sample_size_for_sort(self) -> int: ...
@property
def num_preview_rows(self): ...
def num_preview_rows(self) -> int: ...
@property
def parquet_target_filesize(self) -> int: ...
@property
def parquet_target_row_group_size(self) -> int: ...
@property
def parquet_inflation_factor(self) -> float: ...
@property
def csv_target_filesize(self) -> int: ...
@property
def csv_inflation_factor(self) -> float: ...

class PyDaftPlanningConfig:
def with_config_values(
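The new properties suggest the resolved values can be read back from the active config; a small sketch, assuming the context object exposes `daft_execution_config` as shown in `daft/context.py` above:

```python
import daft

cfg = daft.context.get_context().daft_execution_config
print(cfg.parquet_target_filesize, cfg.parquet_inflation_factor, cfg.csv_target_filesize)
```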
15 changes: 3 additions & 12 deletions daft/dataframe/dataframe.py
@@ -340,7 +340,7 @@ def write_parquet(
Args:
root_dir (str): root file path to write parquet files to.
compression (str, optional): compression algorithm. Defaults to "snappy".
partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Currently only supports Column Expressions with any calls. Defaults to None.
partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.
Returns:
@@ -354,11 +354,7 @@ def write_parquet(
cols: Optional[List[Expression]] = None
if partition_cols is not None:
cols = self.__column_input_to_expression(tuple(partition_cols))
for c in cols:
assert c._is_column(), "we cant support non Column Expressions for partition writing"
self.repartition(None, *cols)
else:
pass

builder = self._builder.write_tabular(
root_dir=root_dir,
partition_cols=cols,
@@ -396,7 +392,7 @@ def write_csv(
Args:
root_dir (str): root file path to write parquet files to.
compression (str, optional): compression algorithm. Defaults to "snappy".
partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Currently only supports Column Expressions with any calls. Defaults to None.
partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.
Returns:
@@ -407,11 +403,6 @@
cols: Optional[List[Expression]] = None
if partition_cols is not None:
cols = self.__column_input_to_expression(tuple(partition_cols))
for c in cols:
assert c._is_column(), "we cant support non Column Expressions for partition writing"
self.repartition(None, *cols)
else:
pass
builder = self._builder.write_tabular(
root_dir=root_dir,
partition_cols=cols,
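Since the `_is_column()` assertion is removed, `partition_cols` no longer appears limited to bare columns; a hedged sketch of a CSV write partitioned on a transformed column (data and paths are hypothetical):

```python
import datetime

import daft
from daft import col

df = daft.from_pydict(
    {
        "ts": [datetime.date(2023, 1, 1), datetime.date(2024, 6, 1)],
        "sales": [10, 20],
    }
)

# Partition the CSV output by the year transform of the "ts" column.
df.write_csv("csv_output/", partition_cols=[col("ts").partitioning.years()])
```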
34 changes: 8 additions & 26 deletions daft/execution/execution_step.py
@@ -438,32 +438,14 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
]

def _handle_file_write(self, input: MicroPartition) -> MicroPartition:
if self.file_format == FileFormat.Parquet:
file_names = table_io.write_parquet(
input,
path=self.root_dir,
compression=self.compression,
partition_cols=self.partition_cols,
io_config=self.io_config,
)
elif self.file_format == FileFormat.Csv:
file_names = table_io.write_csv(
input,
path=self.root_dir,
compression=self.compression,
partition_cols=self.partition_cols,
io_config=self.io_config,
)
else:
raise ValueError(
f"Only Parquet and CSV file formats are supported for writing, but got: {self.file_format}"
)

assert len(self.schema) == 1
return MicroPartition.from_pydict(
{
self.schema.column_names()[0]: file_names,
}
return table_io.write_tabular(
input,
path=self.root_dir,
schema=self.schema,
file_format=self.file_format,
compression=self.compression,
partition_cols=self.partition_cols,
io_config=self.io_config,
)


65 changes: 65 additions & 0 deletions daft/expressions/expressions.py
@@ -112,6 +112,11 @@ def image(self) -> ExpressionImageNamespace:
"""Access methods that work on columns of images"""
return ExpressionImageNamespace.from_expression(self)

@property
def partitioning(self) -> ExpressionPartitioningNamespace:
"""Access methods that support partitioning operators"""
return ExpressionPartitioningNamespace.from_expression(self)

@staticmethod
def _from_pyexpr(pyexpr: _PyExpr) -> Expression:
expr = Expression.__new__(Expression)
@@ -897,3 +902,63 @@ def crop(self, bbox: tuple[int, int, int, int] | Expression) -> Expression:
bbox = Expression._to_expression(bbox).cast(DataType.fixed_size_list(DataType.uint64(), 4))
assert isinstance(bbox, Expression)
return Expression._from_pyexpr(self._expr.image_crop(bbox._expr))


class ExpressionPartitioningNamespace(ExpressionNamespace):
def days(self) -> Expression:
"""Partitioning Transform that returns the number of days since epoch (1970-01-01)
Returns:
Expression: Date Type Expression
"""
return Expression._from_pyexpr(self._expr.partitioning_days())

def hours(self) -> Expression:
"""Partitioning Transform that returns the number of hours since epoch (1970-01-01)
Returns:
Expression: Int32 Expression in hours
"""
return Expression._from_pyexpr(self._expr.partitioning_hours())

def months(self) -> Expression:
"""Partitioning Transform that returns the number of months since epoch (1970-01-01)
Returns:
Expression: Int32 Expression in months
"""

return Expression._from_pyexpr(self._expr.partitioning_months())

def years(self) -> Expression:
"""Partitioning Transform that returns the number of years since epoch (1970-01-01)
Returns:
Expression: Int32 Expression in years
"""

return Expression._from_pyexpr(self._expr.partitioning_years())

def iceberg_bucket(self, n: int) -> Expression:
"""Partitioning Transform that returns the Hash Bucket following the Iceberg Specification of murmur3_32_x86
https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
Args:
n (int): Number of buckets
Returns:
Expression: Int32 Expression with the Hash Bucket
"""
return Expression._from_pyexpr(self._expr.partitioning_iceberg_bucket(n))

def iceberg_truncate(self, w: int) -> Expression:
"""Partitioning Transform that truncates the input to a standard width `w` following the Iceberg Specification.
https://iceberg.apache.org/spec/#truncate-transform-details
Args:
w (int): width of the truncation
Returns:
Expression: Expression of the same type as the input
"""
return Expression._from_pyexpr(self._expr.partitioning_iceberg_truncate(w))
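A short sketch of the new partitioning namespace in action (column names and data are hypothetical):

```python
import datetime

import daft
from daft import col

df = daft.from_pydict(
    {
        "event_time": [
            datetime.datetime(2024, 1, 23, 12, 0),
            datetime.datetime(2024, 1, 24, 8, 30),
        ],
        "user_id": [101, 202],
        "path": ["a/b/c", "d/e/f"],
    }
)

df = df.select(
    col("event_time").partitioning.days().alias("day"),              # days since epoch
    col("event_time").partitioning.hours().alias("hour"),            # hours since epoch
    col("user_id").partitioning.iceberg_bucket(16).alias("bucket"),  # murmur3_32 bucket in [0, 16)
    col("path").partitioning.iceberg_truncate(3).alias("prefix"),    # truncate strings to width 3
)
df.show()
```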
3 changes: 3 additions & 0 deletions daft/series.py
@@ -474,6 +474,9 @@ def not_null(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.not_null())

def _to_str_values(self) -> Series:
return Series._from_pyseries(self._series.to_str_values())

@property
def float(self) -> SeriesFloatNamespace:
return SeriesFloatNamespace.from_series(self)
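A tiny, hedged illustration of `_to_str_values`, which presumably stringifies a Series' values (e.g. for rendering partition values into Hive-style path segments):

```python
from daft.series import Series

s = Series.from_pylist([1, 2, None])
# Presumed behavior: a Utf8 series containing the string form of each value.
str_values = s._to_str_values()
```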
7 changes: 7 additions & 0 deletions daft/table/micropartition.py
@@ -288,6 +288,13 @@ def partition_by_random(self, num_partitions: int, seed: int) -> list[MicroParti
for t in self._micropartition.partition_by_random(num_partitions, seed)
]

def partition_by_value(self, partition_keys: ExpressionsProjection) -> tuple[list[MicroPartition], MicroPartition]:
exprs = [e._expr for e in partition_keys]
pytables, values = self._micropartition.partition_by_value(exprs)
return [MicroPartition._from_pymicropartition(t) for t in pytables], MicroPartition._from_pymicropartition(
values
)

###
# Compute methods (MicroPartition -> Series)
###
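A hedged sketch of the new `partition_by_value`, which appears to return one partition per distinct key value plus a table of those key values (one row per partition):

```python
from daft.expressions import ExpressionsProjection, col
from daft.table import MicroPartition

mp = MicroPartition.from_pydict({"country": ["US", "CA", "US"], "value": [1, 2, 3]})
keys = ExpressionsProjection([col("country")])

# partitions: list of MicroPartitions, one per distinct "country" value
# values: a MicroPartition holding the distinct key values, aligned with `partitions`
partitions, values = mp.partition_by_value(partition_keys=keys)
```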
6 changes: 6 additions & 0 deletions daft/table/table.py
@@ -372,6 +372,12 @@ def partition_by_random(self, num_partitions: int, seed: int) -> list[Table]:

return [Table._from_pytable(t) for t in self._table.partition_by_random(num_partitions, seed)]

def partition_by_value(self, partition_keys: ExpressionsProjection) -> tuple[list[Table], Table]:
exprs = [e._expr for e in partition_keys]
pytables, values = self._table.partition_by_value(exprs)

return [Table._from_pytable(t) for t in pytables], Table._from_pytable(values)

###
# Compute methods (Table -> Series)
###
(Diffs for the remaining changed files are not shown.)
