From 15fbb6f024a302a51ebbade7cce5e30ab8df92f9 Mon Sep 17 00:00:00 2001
From: Kevin Wang
Date: Fri, 28 Jun 2024 13:48:11 -0700
Subject: [PATCH] use exponential backoff retry

---
 daft/context.py                      |  3 ---
 daft/daft.pyi                        |  3 ---
 daft/table/table_io.py               | 22 ++++++++++++++++------
 src/common/daft-config/src/lib.rs    |  2 --
 src/common/daft-config/src/python.rs |  9 ---------
 5 files changed, 16 insertions(+), 23 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index b94457a788..aa98ff23fc 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -284,7 +284,6 @@ def set_execution_config(
     csv_inflation_factor: float | None = None,
     shuffle_aggregation_default_partitions: int | None = None,
     read_sql_partition_size_bytes: int | None = None,
-    write_partition_num_retries: int | None = None,
     enable_aqe: bool | None = None,
     enable_native_executor: bool | None = None,
 ) -> DaftContext:
@@ -317,7 +316,6 @@ def set_execution_config(
         csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
         shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
         read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
-        write_partition_num_retries: Number of times to retry writing out each MicroPartition. Defaults to 3
         enable_aqe: Enables Adaptive Query Execution, Defaults to False
         enable_native_executor: Enables new local executor. Defaults to False
     """
@@ -341,7 +339,6 @@ def set_execution_config(
             csv_inflation_factor=csv_inflation_factor,
             shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
             read_sql_partition_size_bytes=read_sql_partition_size_bytes,
-            write_partition_num_retries=write_partition_num_retries,
             enable_aqe=enable_aqe,
             enable_native_executor=enable_native_executor,
         )
diff --git a/daft/daft.pyi b/daft/daft.pyi
index 92fb580dfc..29b62fdafe 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -1605,7 +1605,6 @@ class PyDaftExecutionConfig:
         csv_inflation_factor: float | None = None,
         shuffle_aggregation_default_partitions: int | None = None,
         read_sql_partition_size_bytes: int | None = None,
-        write_partition_num_retries: int | None = None,
         enable_aqe: bool | None = None,
         enable_native_executor: bool | None = None,
     ) -> PyDaftExecutionConfig: ...
@@ -1636,8 +1635,6 @@ class PyDaftExecutionConfig:
     @property
    def read_sql_partition_size_bytes(self) -> int: ...
    @property
-    def write_partition_num_retries(self) -> int: ...
-    @property
    def enable_aqe(self) -> bool: ...
    @property
    def enable_native_executor(self) -> bool: ...
diff --git a/daft/table/table_io.py b/daft/table/table_io.py
index 12ff0e3ede..39feec30b2 100644
--- a/daft/table/table_io.py
+++ b/daft/table/table_io.py
@@ -3,6 +3,8 @@
 import contextlib
 import math
 import pathlib
+import random
+import time
 from collections.abc import Callable, Generator
 from typing import IO, TYPE_CHECKING, Any, Union
 from uuid import uuid4
@@ -788,9 +790,11 @@ def _write_tabular_arrow_table(
     else:
         basename_template = f"{uuid4()}-{{i}}.{format.default_extname}"
 
-    num_retries = get_context().daft_execution_config.write_partition_num_retries
+    NUM_TRIES = 3
+    JITTER_MS = 2_500
+    MAX_BACKOFF_MS = 20_000
 
-    for _ in range(num_retries):
+    for attempt in range(NUM_TRIES):
         try:
             pads.write_dataset(
                 arrow_table,
@@ -807,7 +811,13 @@
                 **kwargs,
             )
             break
-        except Exception as e:
-            error = e
-    else:
-        raise OSError(f"Failed to retry write to {full_path}") from error
+        except OSError as e:
+            if "InvalidPart" not in str(e):
+                raise
+
+            if attempt == NUM_TRIES - 1:
+                raise OSError(f"Failed to retry write to {full_path}") from e
+            else:
+                jitter = random.randint(0, (2**attempt) * JITTER_MS)
+                backoff = min(MAX_BACKOFF_MS, jitter)
+                time.sleep(backoff / 1000)
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index 1f020cff1c..ee87b38e7d 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -40,7 +40,6 @@ pub struct DaftExecutionConfig {
     pub csv_inflation_factor: f64,
     pub shuffle_aggregation_default_partitions: usize,
     pub read_sql_partition_size_bytes: usize,
-    pub write_partition_num_retries: usize,
     pub enable_aqe: bool,
     pub enable_native_executor: bool,
 }
@@ -62,7 +61,6 @@ impl Default for DaftExecutionConfig {
             csv_inflation_factor: 0.5,
             shuffle_aggregation_default_partitions: 200,
             read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
-            write_partition_num_retries: 3,
             enable_aqe: false,
             enable_native_executor: false,
         }
diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index 34965a847a..31a7f83803 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -99,7 +99,6 @@ impl PyDaftExecutionConfig {
         csv_inflation_factor: Option<f64>,
         shuffle_aggregation_default_partitions: Option<usize>,
         read_sql_partition_size_bytes: Option<usize>,
-        write_partition_num_retries: Option<usize>,
         enable_aqe: Option<bool>,
         enable_native_executor: Option<bool>,
     ) -> PyResult<PyDaftExecutionConfig> {
@@ -151,9 +150,6 @@ impl PyDaftExecutionConfig {
         if let Some(read_sql_partition_size_bytes) = read_sql_partition_size_bytes {
             config.read_sql_partition_size_bytes = read_sql_partition_size_bytes;
         }
-        if let Some(table_write_num_retires) = write_partition_num_retries {
-            config.write_partition_num_retries = table_write_num_retires;
-        }
 
         if let Some(enable_aqe) = enable_aqe {
             config.enable_aqe = enable_aqe;
@@ -232,11 +228,6 @@ impl PyDaftExecutionConfig {
         Ok(self.config.read_sql_partition_size_bytes)
     }
 
-    #[getter]
-    fn get_write_partition_num_retries(&self) -> PyResult<usize> {
-        Ok(self.config.write_partition_num_retries)
-    }
-
     #[getter]
     fn enable_aqe(&self) -> PyResult<bool> {
         Ok(self.config.enable_aqe)
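
Editor's note: below is a minimal standalone sketch of the retry schedule this patch hardcodes into _write_tabular_arrow_table: capped exponential backoff with full jitter. The constants mirror NUM_TRIES, JITTER_MS, and MAX_BACKOFF_MS from the diff; write_fn is a hypothetical stand-in for the pads.write_dataset call, and reading "InvalidPart" as an S3 multipart-upload error is an assumption, not something the patch states.

import random
import time

NUM_TRIES = 3            # total attempts (1 initial + up to 2 retries), as in the patch
JITTER_MS = 2_500        # jitter window doubles each attempt: up to 2.5s, then 5s, ...
MAX_BACKOFF_MS = 20_000  # hard cap on any single sleep

def write_with_backoff(write_fn) -> None:
    """Sketch of the patch's retry loop; write_fn is a hypothetical callable."""
    for attempt in range(NUM_TRIES):
        try:
            write_fn()
            return
        except OSError as e:
            # Only "InvalidPart" errors are treated as retryable (assumed to be
            # S3 multipart races); any other OSError propagates immediately.
            if "InvalidPart" not in str(e):
                raise
            if attempt == NUM_TRIES - 1:
                raise OSError("failed after retries") from e
            # Full jitter: sleep a uniform amount in [0, 2**attempt * JITTER_MS],
            # never longer than MAX_BACKOFF_MS.
            backoff_ms = min(MAX_BACKOFF_MS, random.randint(0, (2**attempt) * JITTER_MS))
            time.sleep(backoff_ms / 1000)

With these constants the cap never binds: the two sleeps are drawn from windows of at most 2.5s and 5s, so the worst case adds about 7.5 seconds before the final attempt. The 20s cap only starts to matter around a fifth attempt (2**4 * 2.5s = 40s), i.e. if NUM_TRIES is ever raised.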
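
One caller-facing consequence worth flagging: write_partition_num_retries is removed from set_execution_config outright rather than deprecated, so existing code that passes it will start raising a TypeError for an unexpected keyword argument. A small before/after sketch (the surviving keyword is taken from the diff):

import daft

# Before this patch the retry count was tunable:
#   daft.context.set_execution_config(write_partition_num_retries=5)
# After it, that call raises TypeError; retries are fixed at 3 with backoff.
daft.context.set_execution_config(enable_aqe=False)  # other knobs are unchanged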