diff --git a/ludwig/backend/ray.py b/ludwig/backend/ray.py
index 7bd34c6927c..90e8d204405 100644
--- a/ludwig/backend/ray.py
+++ b/ludwig/backend/ray.py
@@ -893,9 +893,23 @@ def read_binary_files(
             assert ray.is_initialized(), "Ray should be initialized by Ludwig already at application start"
             daft.context.set_runner_ray(address="auto", noop_if_initialized=True)
 
+            # Create 1 partition for each available CPU in the Ray cluster.
+            # Fall back to a single partition if no CPUs are available (e.g. on a GPU-only cluster).
+            # This is a heuristic to saturate network bandwidth while downloading images. It also
+            # prevents large spills from the object store to disk, since we will only spill smaller
+            # individual partitions rather than 1 large partition.
+            resources = self.get_available_resources()
+            if resources.cpus > 0:
+                num_downloading_partitions = int(resources.cpus)
+            else:
+                num_downloading_partitions = 1
+
             with self.storage.cache.use_credentials():
                 df = daft.from_dask_dataframe(column.to_frame(name=column.name))
+                original_num_partitions = df.num_partitions()
+                df = df.into_partitions(num_downloading_partitions)
+
                 # Download binary files in parallel
                 df = df.with_column(
                     column.name,
@@ -910,9 +924,11 @@ def read_binary_files(
                 if map_fn is not None:
                     df = df.with_column(column.name, df[column.name].apply(map_fn, return_dtype=daft.DataType.python()))
 
-                # Executes and convert Daft Dataframe to Dask DataFrame - note that this preserves partitioning
-                df = df.to_dask_dataframe()
-                df = self.df_engine.persist(df)
+                df = df.into_partitions(original_num_partitions)
+
+                # Execute and convert the Daft DataFrame to a Dask DataFrame - note that this preserves partitioning
+                df = df.to_dask_dataframe()
+                df = self.df_engine.persist(df)
         else:
             # Assume the path has already been read in, so just convert directly to a dataset
             # Name the column "value" to match the behavior of the above
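
A minimal standalone sketch of the repartition -> download -> repartition pattern this patch introduces, assuming a plain local Daft runner rather than Ludwig's Ray-backed setup. The `image_path` column, the example URLs, and the hard-coded partition count are hypothetical stand-ins for the values the patch derives from the Ray cluster.

    # Hypothetical standalone sketch of the pattern above; not Ludwig code.
    import daft

    # Stand-in for the URL column Ludwig builds from the input dataframe.
    urls = [f"https://example.com/images/{i}.png" for i in range(8)]
    df = daft.from_pydict({"image_path": urls})

    # Remember the incoming layout so it can be restored afterwards.
    original_num_partitions = df.num_partitions()

    # Spread the download over more, smaller partitions. The patch uses one
    # partition per available CPU; 4 here is an arbitrary stand-in.
    num_downloading_partitions = 4
    df = df.into_partitions(num_downloading_partitions)

    # Fetch the binary contents in parallel; failed downloads yield nulls
    # instead of raising an error.
    df = df.with_column("image_bytes", df["image_path"].url.download(on_error="null"))

    # Restore the original partitioning so downstream consumers see the
    # same layout they handed in.
    df = df.into_partitions(original_num_partitions)
    df.collect()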