
[Data] Emit warning if local shuffle buffer would cause spilling #48925

Open

bveeramani wants to merge 3 commits into master

Conversation

bveeramani
Member

Why are these changes needed?

Blocks in the local shuffle buffer are backed by object store memory. This means that if you set a large shuffle buffer size, you might encounter spilling or even out-of-disk errors.

To mitigate this issue, this PR makes Ray Data emit a warning if the local shuffle buffer would cause spilling.
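For context, this is roughly the usage pattern the warning is aimed at; the object store size and row counts below are made-up numbers for illustration, not values from the PR:

```python
import ray

# Illustrative only: start a node with a deliberately small object store.
ray.init(object_store_memory=128 * 1024 * 1024)

ds = ray.data.range(10_000_000)

# local_shuffle_buffer_size is a row count. If the rows needed to fill the
# buffer are estimated to exceed the node's object store memory, the new
# check warns before spilling or out-of-disk errors show up.
for batch in ds.iter_batches(batch_size=1024, local_shuffle_buffer_size=500_000):
    pass
```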

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Balaji Veeramani <[email protected]>

def _get_total_obj_store_mem_on_node() -> int:
    node_id = ray.get_runtime_context().get_node_id()
    total_resources_per_node = ray._private.state.total_resources_per_node()
Member Author

IIRC this API requires an RPC. Since this is only called once per iteration, I think the performance should be good enough

Contributor

IIRC this API requires an RPC

Why don't we move this into DataContext and cache it there?
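One way the caching idea could look (a sketch only; the shape of the dictionary returned by total_resources_per_node() and the final key lookup are assumptions, since the excerpt above cuts off before the return statement):

```python
import functools

import ray


@functools.lru_cache(maxsize=1)
def _get_total_obj_store_mem_on_node() -> int:
    # With the cache, the RPC behind total_resources_per_node() is issued
    # once per process instead of once per iteration.
    node_id = ray.get_runtime_context().get_node_id()
    total_resources_per_node = ray._private.state.total_resources_per_node()
    # Assumed shape: {node_id: {"object_store_memory": <bytes>, ...}}.
    return int(total_resources_per_node[node_id].get("object_store_memory", 0))
```

Storing the value on DataContext, as suggested, would achieve the same effect with the cache tied to the context object rather than the process.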

Contributor

maybe put it in data/_internal/util.py, as it is a utility function

Comment on lines +242 to +248
warnings.warn(
    "The node you're iterating on has "
    f"{memory_string(self._total_object_store_nbytes)} object "
    "store memory, but the shuffle buffer is estimated to use "
    f"{memory_string(self._estimated_max_buffer_nbytes)}. If you don't "
    f"decrease the shuffle buffer size from {self._buffer_min_size} rows, "
    "you might encounter spilling."
Member Author

e.g.,

UserWarning: The node you're iterating on has 128.0MB object store memory, but the shuffle buffer is estimated to use 384.0MB. If you don't decrease the shuffle buffer size from 2 rows, you might encounter spilling.

Comment on lines +251 to +255
block_accessor = BlockAccessor.for_block(block)
self._total_rows_added += block_accessor.num_rows()
self._total_nbytes_added += block_accessor.size_bytes()
if block_accessor.num_rows() > 0:
    self._builder.add_block(block)
Contributor

Even if we're not adding the block, we'd still be updating the counters.
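A sketch of what this might look like if the counters were only updated when a block is actually added (this is one reading of the reviewer's remark, not code from the PR):

```python
block_accessor = BlockAccessor.for_block(block)
if block_accessor.num_rows() > 0:
    # Keep the counters consistent with what actually lands in the builder.
    self._total_rows_added += block_accessor.num_rows()
    self._total_nbytes_added += block_accessor.size_bytes()
    self._builder.add_block(block)
```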

Comment on lines +274 to +275
* self._buffer_min_size
* SHUFFLE_BUFFER_COMPACTION_RATIO
Contributor

Let's extract this to a common util
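A sketch of what such a shared helper might look like; the name, the module placement, and the avg_row_nbytes factor (the diff excerpt cuts off before the first operand) are placeholders rather than code from the PR:

```python
# Hypothetical helper, e.g. living in data/_internal/util.py.
def estimate_shuffle_buffer_nbytes(
    avg_row_nbytes: float,
    buffer_min_size: int,
    compaction_ratio: float,
) -> float:
    """Estimate the peak size of the local shuffle buffer in bytes."""
    # avg_row_nbytes stands in for whatever per-row size estimate the PR
    # multiplies by buffer_min_size and SHUFFLE_BUFFER_COMPACTION_RATIO.
    return avg_row_nbytes * buffer_min_size * compaction_ratio
```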


# encounter spilling.
if (
    self._estimated_max_buffer_nbytes is not None
    and self._estimated_max_buffer_nbytes > self._total_object_store_nbytes
Contributor

nit, 1) this if statement can be put under if BlockAccessor.for_block(block).num_rows() > 0: and after add_block, because this will guarantee that _estimated_max_buffer_nbytes is not None.
2) we can skip recalculating _estimated_max_buffer_nbytes once it's been computed.
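A sketch of how both suggestions could combine (the _estimate_buffer_nbytes and _warn_about_possible_spilling helpers are invented here purely to keep the sketch short; they aren't in the PR):

```python
if block_accessor.num_rows() > 0:
    self._builder.add_block(block)
    # The estimate is only meaningful once a non-empty block has been added,
    # and it only needs to be computed once, then reused.
    if self._estimated_max_buffer_nbytes is None:
        self._estimated_max_buffer_nbytes = self._estimate_buffer_nbytes()
    if self._estimated_max_buffer_nbytes > self._total_object_store_nbytes:
        self._warn_about_possible_spilling()
```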

Comment on lines +233 to +236
# Because Arrow tables are memory mapped, blocks in the local shuffle buffer
# reside in object store memory and not local heap memory. So, if you specify a
# large buffer size and there isn't enough object store memory on the node, you
# encounter spilling.
Contributor

This comment is confusing actually:

  • There's no relation between the shuffle buffer and the object store
  • A produced block will likely get into the object store only once it's yielded from the operator (which is what uses the batcher)

# encounter spilling.
if (
    self._estimated_max_buffer_nbytes is not None
    and self._estimated_max_buffer_nbytes > self._total_object_store_nbytes
Contributor

@bveeramani this value should be doubled:

  • One half is blocks before batching
  • Other half is new block produced
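In code, that doubling would roughly amount to the following (a sketch, not the PR's implementation):

```python
# Peak usage covers the blocks buffered before batching plus a newly
# produced block of roughly the same size, hence the factor of two.
estimated_peak_nbytes = 2 * self._estimated_max_buffer_nbytes
if estimated_peak_nbytes > self._total_object_store_nbytes:
    ...  # emit the spilling warning
```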
