diff --git a/daft/io/_delta_lake.py b/daft/io/_delta_lake.py index 6d81dc434b..6bf8e764f4 100644 --- a/daft/io/_delta_lake.py +++ b/daft/io/_delta_lake.py @@ -15,6 +15,7 @@ def read_delta_lake( table: Union[str, DataCatalogTable], io_config: Optional["IOConfig"] = None, + _multithreaded_io: Optional[bool] = None, ) -> DataFrame: """Create a DataFrame from a Delta Lake table. @@ -34,6 +35,9 @@ def read_delta_lake( table: Either a URI for the Delta Lake table or a :class:`~daft.io.catalog.DataCatalogTable` instance referencing a table in a data catalog, such as AWS Glue Data Catalog or Databricks Unity Catalog. io_config: A custom :class:`~daft.daft.IOConfig` to use when accessing Delta Lake object storage data. Defaults to None. + _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing + the amount of system resources (number of connections and thread contention) when running in the Ray runner. + Defaults to None, which will let Daft decide based on the runner it is currently using. Returns: DataFrame: A DataFrame with the schema converted from the specified Delta Lake table. @@ -42,7 +46,9 @@ def read_delta_lake( io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config - multithreaded_io = not context.get_context().is_ray_runner + # If running on Ray, we want to limit the amount of concurrency and requests being made. + # This is because each Ray worker process receives its own pool of thread workers and connections + multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config)) if isinstance(table, str):