[Data] Emit warning if local shuffle buffer would cause spilling #48925
base: master
Conversation
Signed-off-by: Balaji Veeramani <[email protected]>
```python
def _get_total_obj_store_mem_on_node() -> int:
    node_id = ray.get_runtime_context().get_node_id()
    total_resources_per_node = ray._private.state.total_resources_per_node()
```
IIRC this API requires an RPC. Since this is only called once per iteration, I think the performance should be good enough
IIRC this API requires an RPC. Why don't we move this into `DataContext` and cache it there?
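A minimal sketch of the memoization idea, written as a module-level helper rather than on `DataContext`; the `object_store_memory` key and the return expression are assumptions, since the diff is truncated before the function's return statement:

```python
import functools

import ray


# Memoize so the RPC behind total_resources_per_node() runs at most once per
# process. The reviewer suggests caching on DataContext instead; this only
# illustrates computing the value a single time.
@functools.lru_cache(maxsize=None)
def _get_total_obj_store_mem_on_node() -> int:
    node_id = ray.get_runtime_context().get_node_id()
    total_resources_per_node = ray._private.state.total_resources_per_node()
    # Key name and fallback assumed from Ray's per-node resource reporting.
    return int(total_resources_per_node[node_id].get("object_store_memory", 0))
```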
Maybe put it in `data/_internal/util.py`, as it is a utility function.
```python
warnings.warn(
    "The node you're iterating on has "
    f"{memory_string(self._total_object_store_nbytes)} object "
    "store memory, but the shuffle buffer is estimated to use "
    f"{memory_string(self._estimated_max_buffer_nbytes)}. If you don't "
    f"decrease the shuffle buffer size from {self._buffer_min_size} rows, "
    "you might encounter spilling."
)
```
e.g.,

```
UserWarning: The node you're iterating on has 128.0MB object store memory, but the shuffle buffer is estimated to use 384.0MB. If you don't decrease the shuffle buffer size from 2 rows, you might encounter spilling.
```
```python
block_accessor = BlockAccessor.for_block(block)
self._total_rows_added += block_accessor.num_rows()
self._total_nbytes_added += block_accessor.size_bytes()
if block_accessor.num_rows() > 0:
    self._builder.add_block(block)
```
If we're not adding the block, we'd still be changing the counters.
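A sketch of the rearrangement this comment implies, so the counters only change when a block actually enters the buffer (the exact ordering is an assumption, not taken from the PR):

```python
block_accessor = BlockAccessor.for_block(block)
if block_accessor.num_rows() > 0:
    # Update the totals only for blocks that are actually added, so the
    # row/byte counters stay consistent with the builder's contents.
    self._total_rows_added += block_accessor.num_rows()
    self._total_nbytes_added += block_accessor.size_bytes()
    self._builder.add_block(block)
```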
```python
* self._buffer_min_size
* SHUFFLE_BUFFER_COMPACTION_RATIO
```
Let's extract this to a common util
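For illustration, the extracted helper might look like this; the name, signature, and the per-row-size first factor (truncated in the diff above) are all hypothetical:

```python
def estimate_max_shuffle_buffer_nbytes(
    nbytes_per_row: float, buffer_min_size: int, compaction_ratio: float
) -> float:
    # Hypothetical shared helper: the buffer can grow beyond buffer_min_size
    # before compaction runs, hence the extra compaction-ratio headroom.
    return nbytes_per_row * buffer_min_size * compaction_ratio
```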
```python
# encounter spilling.
if (
    self._estimated_max_buffer_nbytes is not None
    and self._estimated_max_buffer_nbytes > self._total_object_store_nbytes
```
Nit: 1) this `if` statement can be put under `if BlockAccessor.for_block(block).num_rows() > 0:` and after `add_block`, because this will guarantee that `_estimated_max_buffer_nbytes` is not `None`. 2) We can skip calculating `_estimated_max_buffer_nbytes` after it's calculated once. See the sketch below.
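A sketch of both parts of this nit combined; `_estimate_max_buffer_nbytes` is a hypothetical helper standing in for the existing calculation, and the surrounding control flow is assumed from the snippets above:

```python
if block_accessor.num_rows() > 0:
    self._builder.add_block(block)
    # 1) Inside this branch and after add_block, the estimate is always
    #    computable, so the `is not None` guard is no longer needed.
    # 2) Compute the estimate once and reuse it on later calls.
    if self._estimated_max_buffer_nbytes is None:
        self._estimated_max_buffer_nbytes = self._estimate_max_buffer_nbytes()
    if self._estimated_max_buffer_nbytes > self._total_object_store_nbytes:
        warnings.warn("...")  # the spilling warning shown earlier
```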
```python
# Because Arrow tables are memory mapped, blocks in the local shuffle buffer
# reside in object store memory and not local heap memory. So, if you specify a
# large buffer size and there isn't enough object store memory on the node, you
# encounter spilling.
```
This comment is actually confusing:
- There's no relation b/w the shuffle buffer and the object store
- A produced block will likely get into the object store only once it's being yielded from the operator (that's using the batcher)
```python
# encounter spilling.
if (
    self._estimated_max_buffer_nbytes is not None
    and self._estimated_max_buffer_nbytes > self._total_object_store_nbytes
```
@bveeramani this value should be doubled:
- One half is the blocks buffered before batching
- The other half is the new block produced
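If that suggestion is taken, the check might become something like the following; the factor of two and where it is applied are assumptions drawn from this comment, not from the PR itself:

```python
# Double the estimate: one half for the blocks buffered before batching,
# the other half for the newly produced output block of roughly equal size.
estimated_peak_nbytes = 2 * self._estimated_max_buffer_nbytes
if estimated_peak_nbytes > self._total_object_store_nbytes:
    warnings.warn("...")  # the spilling warning shown earlier
```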
Why are these changes needed?
Blocks in the local shuffle buffer are backed by object store memory. This means that if you set a large shuffle buffer size, you might encounter spilling or even out-of-disk errors.
To mitigate this issue, this PR makes Ray Data emit a warning if the local shuffle buffer would cause spilling.
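For context, the buffer in question comes from the `local_shuffle_buffer_size` argument to iteration APIs such as `Dataset.iter_batches`. A sketch of how a user might trigger the new warning (the dataset and sizes are made up for illustration):

```python
import ray

ds = ray.data.range(100_000)

# A local shuffle buffer that's large relative to the node's object store
# memory is what the new warning is meant to flag before spilling happens.
for batch in ds.iter_batches(batch_size=1024, local_shuffle_buffer_size=50_000):
    pass
```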
Related issue number
Checks
- [ ] I've signed off every commit (`git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] If I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.