[PERF] [Delta Lake] Add IO multithreading arg to daft.read_delta_lake(). #2029

Merged 1 commit on Mar 21, 2024

8 changes: 7 additions & 1 deletion daft/io/_delta_lake.py
@@ -15,6 +15,7 @@
 def read_delta_lake(
     table: Union[str, DataCatalogTable],
     io_config: Optional["IOConfig"] = None,
+    _multithreaded_io: Optional[bool] = None,
 ) -> DataFrame:
     """Create a DataFrame from a Delta Lake table.

@@ -34,6 +35,9 @@ def read_delta_lake(
         table: Either a URI for the Delta Lake table or a :class:`~daft.io.catalog.DataCatalogTable` instance
             referencing a table in a data catalog, such as AWS Glue Data Catalog or Databricks Unity Catalog.
         io_config: A custom :class:`~daft.daft.IOConfig` to use when accessing Delta Lake object storage data. Defaults to None.
+        _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
+            the amount of system resources (number of connections and thread contention) when running in the Ray runner.
+            Defaults to None, which will let Daft decide based on the runner it is currently using.

     Returns:
         DataFrame: A DataFrame with the schema converted from the specified Delta Lake table.
@@ -42,7 +46,9 @@

     io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

-    multithreaded_io = not context.get_context().is_ray_runner
+    # If running on Ray, we want to limit the amount of concurrency and requests being made.
+    # This is because each Ray worker process receives its own pool of thread workers and connections
+    multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io
     storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))

     if isinstance(table, str):
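
For reference, a minimal usage sketch of the new argument. The table URI below is hypothetical, and this assumes the deltalake dependency is installed; the override only changes behavior relative to the default when it differs from what the runner would pick.

import daft

# Hypothetical table URI, for illustration only.
TABLE_URI = "s3://my-bucket/path/to/delta-table"

# On the Ray runner, each worker process gets its own pool of IO threads and
# connections, so explicitly disabling multithreaded IO can reduce the total
# number of connections and the amount of thread contention across the cluster.
df = daft.read_delta_lake(TABLE_URI, _multithreaded_io=False)

# Leaving the argument as None (the default) lets Daft decide based on the runner:
# multithreaded IO off the Ray runner, single-threaded IO on it.
df_default = daft.read_delta_lake(TABLE_URI)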