Skip to content

Commit

Permalink
Revert behavior to use .into_partitions for parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed May 9, 2023
1 parent 8516cc6 commit a06be60
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions ludwig/backend/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,9 +893,23 @@ def read_binary_files(
assert ray.is_initialized(), "Ray should be initialized by Ludwig already at application start"
daft.context.set_runner_ray(address="auto", noop_if_initialized=True)

# Create 1 partition for each available CPU in the Ray cluster
# Fall back to distributing this over 1 GPU if no CPUs are available
# This is a heuristic to saturate network bandwidth while downloading images
# and also prevent large spillage to disk from the object store, since we will
# only spill smaller individual partitions rather than 1 large partition
resources = self.get_available_resources()
if resources.cpus > 0:
num_downloading_partitions = int(resources.cpus)
else:
num_downloading_partitions = 1

with self.storage.cache.use_credentials():
df = daft.from_dask_dataframe(column.to_frame(name=column.name))

original_num_partitions = df.num_partitions()
df = df.into_partitions(num_downloading_partitions)

# Download binary files in parallel
df = df.with_column(
column.name,
Expand All @@ -910,9 +924,11 @@ def read_binary_files(
if map_fn is not None:
df = df.with_column(column.name, df[column.name].apply(map_fn, return_dtype=daft.DataType.python()))

# Execute and convert the Daft DataFrame to a Dask DataFrame - note that this preserves partitioning
df = df.to_dask_dataframe()
df = self.df_engine.persist(df)
df = df.into_partitions(original_num_partitions)

# Execute and convert the Daft DataFrame to a Dask DataFrame - note that this preserves partitioning
df = df.to_dask_dataframe()
df = self.df_engine.persist(df)
else:
# Assume the path has already been read in, so just convert directly to a dataset
# Name the column "value" to match the behavior of the above
Expand Down

0 comments on commit a06be60

Please sign in to comment.