[GLUTEN-6863][VL] Pre-alloc and reuse compress buffer to avoid OOM in spill #6869
Conversation
Should we allocate the buffer using the global allocator, which is counted into overhead memory?
sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, veloxPool_.get());
rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
// In Spark, sortedBuffer_ memory and compressionBuffer_ memory are pre-allocated and counted into executor
// memory overhead. To align with Spark, we use arrow::default_memory_pool() to avoid counting these memory in Gluten.
@FelixYBW arrow::default_memory_pool is used to allocate the sort buffer and compress buffer.
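For reference, a minimal sketch of how pre-allocating the compression buffer from the Arrow global pool could look; the helper name, codec handling, and sizing are assumptions based on the quoted diff and the PR description, not the actual Gluten code.

#include <arrow/buffer.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/util/compression.h>
#include <memory>

// Hypothetical helper: the compression buffer is sized for the worst case of a
// full, fixed-size sorted buffer and is allocated from arrow::default_memory_pool(),
// so it is not tracked by Gluten's managed pools -- mirroring Spark, which treats
// these pre-allocated buffers as executor memory overhead.
arrow::Result<std::unique_ptr<arrow::ResizableBuffer>> preAllocateCompressionBuffer(
    arrow::util::Codec* codec, int64_t sortedBufferSize) {
  // The LZ4/ZSTD bound computations do not read the input data, so nullptr is acceptable here.
  const int64_t maxCompressedLength =
      codec == nullptr ? sortedBufferSize : codec->MaxCompressedLen(sortedBufferSize, nullptr);
  return arrow::AllocateResizableBuffer(maxCompressedLength, arrow::default_memory_pool());
}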
@zhztheplayer Can you help take a look here? Thanks!
Do we need to add a function defaultArrowMemoryPool to VeloxMemoryManager to unify the memory pool usage?
I think the code looks fine now, as we don't have a mechanism to count Arrow's global allocations into Spark overhead memory.
In the future we may report both Arrow's and Velox's global pool usage to one counter, which requires some design work. So far we don't have that.
@jinchengchenghh can you take a look?
Thanks! Added some comments.
@@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool close) {
  return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::evict(
arrow::Status LocalPartitionWriter::hashEvict(
    uint32_t partitionId,
    std::unique_ptr<InMemoryPayload> inMemoryPayload,
    Evict::type evictType,
Looks like we don't need evictType.
hashEvict needs this param to know whether the evict source is a spill or not. If it's a spill, the partition writer will write the payload to disk immediately; otherwise it will cache the payload.
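To illustrate the distinction (the enum and function below are hypothetical stand-ins, not the PR's actual signatures):

// Hypothetical stand-ins to illustrate the thread above; the PR's real
// Evict::type and partition writer interfaces are not reproduced here.
enum class EvictKind { kSpill, kCache };

// A spill-driven eviction must release memory immediately, so the payload is
// written to disk right away; otherwise the payload is cached for the final
// merge when the writer stops.
const char* describeEvict(EvictKind kind) {
  return kind == EvictKind::kSpill ? "write payload to spill file now"
                                   : "cache payload until stop";
}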
"Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")")); | ||
output = const_cast<uint8_t*>(compressed->data()); | ||
} else { | ||
ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool)); |
Can we reuse the buffer for the uncompressed payload type?
We hold the original evicted buffer for the uncompressed payload, so there is no extra copy.
@@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() {
  return getBufferSize(buffers_);
}

int64_t BlockPayload::maxCompressedLength(
Can we move it to an anonymous namespace?
It's a public API of BlockPayload and is used by other components.
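For context, a hedged sketch of how such a bound could be computed with Arrow's codec API; the real BlockPayload::maxCompressedLength signature is not shown above, so the free function and per-buffer framing here are assumptions.

#include <arrow/buffer.h>
#include <arrow/util/compression.h>
#include <memory>
#include <vector>

// Worst-case compressed size for a set of buffers: the sum of each buffer's
// per-codec compression bound plus a fixed allowance for per-buffer length
// fields written alongside the data (the exact framing in Gluten is an
// assumption here).
int64_t maxCompressedLengthSketch(
    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::util::Codec* codec) {
  constexpr int64_t kPerBufferHeader = 2 * sizeof(int64_t);  // assumed framing
  int64_t total = 0;
  for (const auto& buffer : buffers) {
    const int64_t size = buffer == nullptr ? 0 : buffer->size();
    total += kPerBufferHeader;
    total += codec == nullptr
        ? size
        : codec->MaxCompressedLen(size, buffer == nullptr ? nullptr : buffer->data());
  }
  return total;
}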
@@ -314,7 +314,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
  uint32_t numRows;
  GLUTEN_ASSIGN_OR_THROW(
      auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
  if (numRows == 0) {
  if (arrowBuffers.empty()) {
Why do we have this change?
Before this PR, numRows was set to zero in BlockPayload::deserialize once it reached EOS. This PR removes that logic and uses numRows = 0 to represent a segment of a large row that cannot be compressed within one block.
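In other words, the reader-side branching roughly becomes the following; this is a simplified sketch with hypothetical helper names, not the actual deserializer code.

#include <arrow/buffer.h>
#include <cstdint>
#include <memory>
#include <vector>

// After this PR, the reader distinguishes three situations:
//   - deserialize() returned no buffers at all -> end of stream;
//   - numRows == 0 with non-empty buffers      -> one segment of a large row
//     that could not be compressed in a single block;
//   - numRows > 0                               -> a normal block to cache.
void consumeBlockSketch(
    uint32_t numRows, const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
  if (buffers.empty()) {
    return;  // EOS: nothing left to read
  }
  if (numRows == 0) {
    // keep reading segments and stitch the large row back together
  } else {
    // cache the block's rows for deserialization
  }
}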
      cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
      cachedRows_ += numRows;
    } else {
      // For a large row, read all segments.
Can you explain a bit more? I don't quite follow the context here.
Added some comments here to indicate that this case only occurs in the sort shuffle writer, and numRows is 0. Do we have a friendlier way to mark a large row that has been split?
@@ -266,6 +273,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}

arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
  VELOX_CHECK(begin < end);
VELOX_DCHECK
  for (auto useRadixSort : {true, false}) {
    params.push_back(ShuffleTestParams{
        ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort});
  for (const auto compressionBufferSize : {4, 56, 32 * 1024}) {
Do we have a test for splitting a large row?
Yes. The condition for splitting a large row is row size > compressionBufferSize. When compressionBufferSize is 4, most of the rows will be split.
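A hedged sketch of the split behavior being discussed; the segmenting loop is an assumption about how the writer slices an oversized row into compression-buffer-sized pieces.

#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// If a serialized row is larger than the compression buffer, it cannot be
// compressed in a single block, so it is emitted as multiple segments of at
// most compressionBufferSize bytes each (each segment carries numRows == 0).
std::vector<std::pair<int64_t, int64_t>> splitLargeRowSketch(
    int64_t rowSize, int64_t compressionBufferSize) {
  std::vector<std::pair<int64_t, int64_t>> segments;  // (offset, length)
  for (int64_t offset = 0; offset < rowSize; offset += compressionBufferSize) {
    segments.emplace_back(offset, std::min(compressionBufferSize, rowSize - offset));
  }
  return segments;
}

With compressionBufferSize = 4, the smallest value in the test parameters above, almost every row exceeds the buffer and takes this path.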
Do we need to support this case, or should we require the compression buffer to be at least as large as one row and throw an exception otherwise? I think we should have a check for the minimum config value. @FelixYBW
Spark doesn't throw an exception. It copies the row into a default 32k buffer for compression.
OK, it's fine to align with Spark's behavior here.
if ("lz4" == codec) { | ||
Math.max( | ||
conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt, | ||
GlutenConfig.GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE) |
Can we support setting the config GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE?
Looks like the default value 64 is much smaller than 32 * 1024, the default for other compression kinds.
64 is not the default value unless the user sets IO_COMPRESSION_LZ4_BLOCKSIZE to a very small size.
We could set a more reasonable value, maybe 32 * 1024?
Per discussion, we will throw an exception if IO_COMPRESSION_LZ4_BLOCKSIZE < 4. For each serialized row, the row size takes 4 bytes, so 4 bytes is the minimum acceptable compression block size in Gluten.
For reference, the exceptions Spark itself throws:
- lz4: spark.io.compression.lz4.blockSize=0
Caused by: java.lang.IllegalArgumentException: blockSize must be >= 64, got 0
at net.jpountz.lz4.LZ4BlockOutputStream.compressionLevel(LZ4BlockOutputStream.java:60)
at net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:101)
at org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:151)
at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$2(TorrentBroadcast.scala:361)
at scala.Option.map(Option.scala:230)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:361)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:161)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1662)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1644)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1585)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1402)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1337)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3003)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
- zstd: spark.io.compression.zstd.bufferSize=0
Caused by: java.lang.IllegalArgumentException: Buffer size <= 0
at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:74)
at org.apache.spark.io.ZStdCompressionCodec.compressedOutputStream(CompressionCodec.scala:237)
at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$2(TorrentBroadcast.scala:361)
at scala.Option.map(Option.scala:230)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:361)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:161)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1662)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1644)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1585)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1402)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1337)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3003)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
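Following the decision above, a sketch of the kind of minimum-size check this implies; where Gluten actually validates the config is not shown in this thread, so the function below is illustrative only.

#include <cstdint>
#include <stdexcept>
#include <string>

// Each serialized row is prefixed by a 4-byte length field, so a compression
// block smaller than 4 bytes cannot even hold that header; reject it early
// instead of failing later in the native writer.
int64_t validateCompressionBlockSize(int64_t blockSizeBytes) {
  constexpr int64_t kRowLengthHeaderBytes = 4;
  if (blockSizeBytes < kRowLengthHeaderBytes) {
    throw std::invalid_argument(
        "Compression block size must be >= " + std::to_string(kRowLengthHeaderBytes) +
        " bytes, got " + std::to_string(blockSizeBytes));
  }
  return blockSizeBytes;
}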
@jinchengchenghh Do you have further comments? Thanks!
      cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
      cachedRows_ += numRows;
    } else {
      // numRows = 0 indicates a segment of a large row.
Can we extract the numRows = 0 logic into a function to make the code more readable?
  RowSizeType bytes = 0;
  auto* dst = rowBuffer->mutable_data();
  for (const auto& buffer : buffers) {
    VELOX_CHECK_NOT_NULL(buffer);
Use VELOX_DCHECK; internal code logic checks should use DCHECK.
Thanks!
During a sort-shuffle spill, allocating the compression buffer can trigger another spill and lead to OOM. Because the sort buffer has a fixed size, the maximum compressed buffer size can be computed at the very beginning, and the compression buffer can be pre-allocated and reused for spill.
The compression buffer size is derived from spark.io.compression.lz4.blockSize and spark.io.compression.zstd.bufferSize to align with Spark. The sort buffer and compression buffer are allocated from the default memory pool, as Spark counts this part of the allocation into executor memory overhead.
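Putting the description together, a minimal sketch of the pre-allocate-once, reuse-per-spill pattern; the class and member names are assumptions, and only the idea of sizing the compression buffer once from the fixed sort buffer is taken from the text above.

#include <arrow/buffer.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/compression.h>
#include <memory>

class SpillCompressorSketch {
 public:
  // Size the compression buffer once from the fixed sort buffer size, so no
  // allocation (and hence no recursive spill) happens on the spill path.
  arrow::Status init(arrow::util::Codec* codec, int64_t sortBufferSize) {
    codec_ = codec;
    // LZ4/ZSTD bound computations do not read the input data, so nullptr is acceptable.
    const int64_t maxLen = codec_->MaxCompressedLen(sortBufferSize, nullptr);
    ARROW_ASSIGN_OR_RAISE(
        compressionBuffer_,
        arrow::AllocateResizableBuffer(maxLen, arrow::default_memory_pool()));
    return arrow::Status::OK();
  }

  // Reuse the same buffer for every spill; only the valid length changes.
  arrow::Result<int64_t> compress(const uint8_t* data, int64_t length) {
    return codec_->Compress(
        length, data, compressionBuffer_->size(), compressionBuffer_->mutable_data());
  }

 private:
  arrow::util::Codec* codec_{nullptr};
  std::unique_ptr<arrow::ResizableBuffer> compressionBuffer_;
};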