use exponential backoff retry
kevinzwang committed Jun 28, 2024
1 parent 30e80b8 commit 15fbb6f
Showing 5 changed files with 16 additions and 23 deletions.
daft/context.py (3 changes: 0 additions & 3 deletions)

@@ -284,7 +284,6 @@ def set_execution_config(
     csv_inflation_factor: float | None = None,
     shuffle_aggregation_default_partitions: int | None = None,
     read_sql_partition_size_bytes: int | None = None,
-    write_partition_num_retries: int | None = None,
     enable_aqe: bool | None = None,
     enable_native_executor: bool | None = None,
 ) -> DaftContext:
@@ -317,7 +316,6 @@ def set_execution_config(
         csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
         shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
         read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
-        write_partition_num_retries: Number of times to retry writing out each MicroPartition. Defaults to 3
         enable_aqe: Enables Adaptive Query Execution, Defaults to False
         enable_native_executor: Enables new local executor. Defaults to False
     """
@@ -341,7 +339,6 @@ def set_execution_config(
         csv_inflation_factor=csv_inflation_factor,
         shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
         read_sql_partition_size_bytes=read_sql_partition_size_bytes,
-        write_partition_num_retries=write_partition_num_retries,
         enable_aqe=enable_aqe,
         enable_native_executor=enable_native_executor,
     )
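
With the knob gone from the config plumbing, set_execution_config no longer
accepts write_partition_num_retries, and passing it raises a TypeError. A
minimal caller-side sketch (assuming the top-level daft.set_execution_config
re-export; the option values are illustrative):

    import daft

    # Other execution options are unaffected by this commit.
    daft.set_execution_config(
        shuffle_aggregation_default_partitions=100,
        enable_aqe=True,
    )

    # No longer accepted: the write retry count is now fixed at 3 internally.
    # daft.set_execution_config(write_partition_num_retries=5)  # TypeError
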
daft/daft.pyi (3 changes: 0 additions & 3 deletions)

@@ -1605,7 +1605,6 @@ class PyDaftExecutionConfig:
         csv_inflation_factor: float | None = None,
         shuffle_aggregation_default_partitions: int | None = None,
         read_sql_partition_size_bytes: int | None = None,
-        write_partition_num_retries: int | None = None,
         enable_aqe: bool | None = None,
         enable_native_executor: bool | None = None,
     ) -> PyDaftExecutionConfig: ...
@@ -1636,8 +1635,6 @@ class PyDaftExecutionConfig:
     @property
     def read_sql_partition_size_bytes(self) -> int: ...
-    @property
-    def write_partition_num_retries(self) -> int: ...
     @property
     def enable_aqe(self) -> bool: ...
     @property
     def enable_native_executor(self) -> bool: ...
daft/table/table_io.py (22 changes: 16 additions & 6 deletions)

@@ -3,6 +3,8 @@
 import contextlib
 import math
 import pathlib
+import random
+import time
 from collections.abc import Callable, Generator
 from typing import IO, TYPE_CHECKING, Any, Union
 from uuid import uuid4
@@ -788,9 +790,11 @@ def _write_tabular_arrow_table(
     else:
         basename_template = f"{uuid4()}-{{i}}.{format.default_extname}"
 
-    num_retries = get_context().daft_execution_config.write_partition_num_retries
+    NUM_TRIES = 3
+    JITTER_MS = 2_500
+    MAX_BACKOFF_MS = 20_000
 
-    for _ in range(num_retries):
+    for attempt in range(NUM_TRIES):
         try:
             pads.write_dataset(
                 arrow_table,
@@ -807,7 +811,13 @@
                 **kwargs,
             )
             break
-        except Exception as e:
-            error = e
-    else:
-        raise OSError(f"Failed to retry write to {full_path}") from error
+        except OSError as e:
+            if "InvalidPart" not in str(e):
+                raise
+
+            if attempt == NUM_TRIES - 1:
+                raise OSError(f"Failed to retry write to {full_path}") from e
+            else:
+                jitter = random.randint(0, (2**attempt) * JITTER_MS)
+                backoff = min(MAX_BACKOFF_MS, jitter)
+                time.sleep(backoff / 1000)
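
The new loop retries only OSErrors whose message contains "InvalidPart" (the
S3 multipart-upload failure this commit targets) and sleeps a full-jitter
exponential backoff between attempts: a uniform delay in
[0, 2**attempt * 2.5 s], capped at 20 s. Attempt 0 waits up to 2.5 s and
attempt 1 up to 5 s, with no sleep after the final failure; the cap only
engages if NUM_TRIES is ever raised, since 2**3 * 2.5 s = 20 s. A
self-contained sketch of the same pattern as a reusable helper (the helper
name and the flaky_write stand-in are hypothetical, not part of the commit):

    import random
    import time

    NUM_TRIES = 3
    JITTER_MS = 2_500
    MAX_BACKOFF_MS = 20_000

    def retry_full_jitter(fn, is_retryable=lambda e: "InvalidPart" in str(e)):
        """Call fn(), retrying retryable OSErrors with capped full-jitter backoff."""
        for attempt in range(NUM_TRIES):
            try:
                return fn()
            except OSError as e:
                if not is_retryable(e):
                    raise  # non-retryable errors propagate immediately
                if attempt == NUM_TRIES - 1:
                    raise OSError("write failed after all retries") from e
                # Uniform delay in [0, 2**attempt * JITTER_MS] milliseconds, capped.
                jitter_ms = random.randint(0, (2**attempt) * JITTER_MS)
                time.sleep(min(MAX_BACKOFF_MS, jitter_ms) / 1000)

    # Hypothetical usage: wrap a flaky object-store write.
    # retry_full_jitter(lambda: flaky_write(arrow_table))

Randomizing the whole delay (full jitter) rather than adding noise on top of a
fixed exponential schedule decorrelates concurrent writers that would
otherwise retry in lockstep against the same object store.
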
src/common/daft-config/src/lib.rs (2 changes: 0 additions & 2 deletions)

@@ -40,7 +40,6 @@ pub struct DaftExecutionConfig {
     pub csv_inflation_factor: f64,
     pub shuffle_aggregation_default_partitions: usize,
     pub read_sql_partition_size_bytes: usize,
-    pub write_partition_num_retries: usize,
     pub enable_aqe: bool,
     pub enable_native_executor: bool,
 }
@@ -62,7 +61,6 @@ impl Default for DaftExecutionConfig {
             csv_inflation_factor: 0.5,
             shuffle_aggregation_default_partitions: 200,
             read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
-            write_partition_num_retries: 3,
             enable_aqe: false,
             enable_native_executor: false,
         }
src/common/daft-config/src/python.rs (9 changes: 0 additions & 9 deletions)

@@ -99,7 +99,6 @@ impl PyDaftExecutionConfig {
         csv_inflation_factor: Option<f64>,
         shuffle_aggregation_default_partitions: Option<usize>,
         read_sql_partition_size_bytes: Option<usize>,
-        write_partition_num_retries: Option<usize>,
         enable_aqe: Option<bool>,
         enable_native_executor: Option<bool>,
     ) -> PyResult<PyDaftExecutionConfig> {
@@ -151,9 +150,6 @@ impl PyDaftExecutionConfig {
         if let Some(read_sql_partition_size_bytes) = read_sql_partition_size_bytes {
             config.read_sql_partition_size_bytes = read_sql_partition_size_bytes;
         }
-        if let Some(table_write_num_retires) = write_partition_num_retries {
-            config.write_partition_num_retries = table_write_num_retires;
-        }
 
         if let Some(enable_aqe) = enable_aqe {
             config.enable_aqe = enable_aqe;
@@ -232,11 +228,6 @@ impl PyDaftExecutionConfig {
         Ok(self.config.read_sql_partition_size_bytes)
     }
 
-    #[getter]
-    fn get_write_partition_num_retries(&self) -> PyResult<usize> {
-        Ok(self.config.write_partition_num_retries)
-    }
-
     #[getter]
     fn enable_aqe(&self) -> PyResult<bool> {
         Ok(self.config.enable_aqe)
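
After these removals, the config object surfaced to Python carries no trace of
the old knob. A quick check, assuming the PyDaftExecutionConfig getters mirror
the Rust struct as the stub above shows (the printed default comes from
lib.rs):

    import daft

    cfg = daft.context.get_context().daft_execution_config
    print(cfg.read_sql_partition_size_bytes)            # 536870912 (512 MB)
    print(hasattr(cfg, "write_partition_num_retries"))  # False after this commit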
