Skip to content

Commit

Permalink
add retries to pyarrow write_dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinzwang committed Jun 28, 2024
1 parent fa88023 commit 30e80b8
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 6 deletions.
3 changes: 3 additions & 0 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ def set_execution_config(
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
write_partition_num_retries: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
) -> DaftContext:
Expand Down Expand Up @@ -316,6 +317,7 @@ def set_execution_config(
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
write_partition_num_retries: Number of times to retry writing out each MicroPartition. Defaults to 3
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
"""
Expand All @@ -339,6 +341,7 @@ def set_execution_config(
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
write_partition_num_retries=write_partition_num_retries,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
)
Expand Down
3 changes: 3 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,7 @@ class PyDaftExecutionConfig:
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
write_partition_num_retries: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
) -> PyDaftExecutionConfig: ...
Expand Down Expand Up @@ -1635,6 +1636,8 @@ class PyDaftExecutionConfig:
@property
def read_sql_partition_size_bytes(self) -> int: ...
@property
def write_partition_num_retries(self) -> int: ...
@property
def enable_aqe(self) -> bool: ...
@property
def enable_native_executor(self) -> bool: ...
Expand Down
12 changes: 6 additions & 6 deletions daft/table/table_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,9 @@ def _write_tabular_arrow_table(
else:
basename_template = f"{uuid4()}-{{i}}.{format.default_extname}"

errors: list[None | Exception] = [None, None, None]
for i in range(3):
num_retries = get_context().daft_execution_config.write_partition_num_retries

for _ in range(num_retries):
try:
pads.write_dataset(
arrow_table,
Expand All @@ -807,7 +808,6 @@ def _write_tabular_arrow_table(
)
break
except Exception as e:
errors[i] = e

if any(errors):
raise Exception(f"Errors while writing: \n\t{errors[0]}\n\t{errors[1]}\n\t{errors[2]}")
error = e
else:
raise OSError(f"Failed to retry write to {full_path}") from error
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct DaftExecutionConfig {
pub csv_inflation_factor: f64,
pub shuffle_aggregation_default_partitions: usize,
pub read_sql_partition_size_bytes: usize,
pub write_partition_num_retries: usize,
pub enable_aqe: bool,
pub enable_native_executor: bool,
}
Expand All @@ -61,6 +62,7 @@ impl Default for DaftExecutionConfig {
csv_inflation_factor: 0.5,
shuffle_aggregation_default_partitions: 200,
read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
write_partition_num_retries: 3,
enable_aqe: false,
enable_native_executor: false,
}
Expand Down
10 changes: 10 additions & 0 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl PyDaftExecutionConfig {
csv_inflation_factor: Option<f64>,
shuffle_aggregation_default_partitions: Option<usize>,
read_sql_partition_size_bytes: Option<usize>,
write_partition_num_retries: Option<usize>,
enable_aqe: Option<bool>,
enable_native_executor: Option<bool>,
) -> PyResult<PyDaftExecutionConfig> {
Expand Down Expand Up @@ -150,6 +151,9 @@ impl PyDaftExecutionConfig {
if let Some(read_sql_partition_size_bytes) = read_sql_partition_size_bytes {
config.read_sql_partition_size_bytes = read_sql_partition_size_bytes;
}
if let Some(table_write_num_retires) = write_partition_num_retries {
config.write_partition_num_retries = table_write_num_retires;
}

if let Some(enable_aqe) = enable_aqe {
config.enable_aqe = enable_aqe;
Expand Down Expand Up @@ -227,6 +231,12 @@ impl PyDaftExecutionConfig {
fn get_read_sql_partition_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.read_sql_partition_size_bytes)
}

#[getter]
fn get_write_partition_num_retries(&self) -> PyResult<usize> {
Ok(self.config.write_partition_num_retries)
}

#[getter]
fn enable_aqe(&self) -> PyResult<bool> {
Ok(self.config.enable_aqe)
Expand Down

0 comments on commit 30e80b8

Please sign in to comment.