[GLUTEN-6863][VL] Pre-alloc and reuse compress buffer to avoid OOM in spill #6869

Merged: 7 commits, Sep 5, 2024
Changes from 6 commits
2 changes: 2 additions & 0 deletions cpp/core/jni/JniWrapper.cc
@@ -773,6 +773,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jstring codecJstr,
jstring codecBackendJstr,
jint compressionLevel,
jint compressionBufferSize,
jint compressionThreshold,
jstring compressionModeJstr,
jint sortBufferInitialSize,
@@ -804,6 +805,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.startPartitionId = startPartitionId,
.shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)),
.sortBufferInitialSize = sortBufferInitialSize,
.compressionBufferSize = compressionBufferSize,
.useRadixSort = static_cast<bool>(useRadixSort)};

// Build PartitionWriterOptions.
92 changes: 46 additions & 46 deletions cpp/core/shuffle/LocalPartitionWriter.cc
@@ -385,17 +385,15 @@ std::string LocalPartitionWriter::nextSpilledFileDir() {
return spilledFileDir;
}

arrow::Status LocalPartitionWriter::openDataFile() {
// open data file output stream
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> LocalPartitionWriter::openFile(const std::string& file) {
std::shared_ptr<arrow::io::FileOutputStream> fout;
ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(dataFile_));
ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(file));
if (options_.bufferedWrite) {
// Output stream buffer is neither partition buffer memory nor ipc memory.
ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, pool_, fout));
} else {
dataFileOs_ = fout;
// The 16KB is a temporary allocation and will be freed on file close.
// Use the default memory pool and count the memory as executor memory overhead to avoid unnecessary spill.
return arrow::io::BufferedOutputStream::Create(16384, arrow::default_memory_pool(), fout);
}
return arrow::Status::OK();
return fout;
}

arrow::Status LocalPartitionWriter::clearResource() {
@@ -467,9 +465,7 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
compressTime_ += spill->compressTime();
} else {
RETURN_NOT_OK(finishSpill(true));
// Open final data file.
// If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill.
RETURN_NOT_OK(openDataFile());
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));

int64_t endInFinalFile = 0;
DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << spills_.size();
@@ -523,14 +519,13 @@ arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) {
std::string spillFile;
std::shared_ptr<arrow::io::OutputStream> os;
if (isFinal) {
RETURN_NOT_OK(openDataFile());
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
spillFile = dataFile_;
os = dataFileOs_;
useSpillFileAsDataFile_ = true;
} else {
ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
}
spiller_ = std::make_unique<LocalSpiller>(
os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get());
@@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool close) {
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::evict(
arrow::Status LocalPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
Review comment (Contributor): Looks like we don't need evictType.

Reply (Author): hashEvict needs this param to know whether the evict source is a spill or not. If it's a spill, the partition writer writes the payload to disk immediately; otherwise it caches the payload.

bool reuseBuffers,
bool hasComplexType,
bool isFinal) {
bool hasComplexType) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();

if (evictType == Evict::kSortSpill) {
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill(true));
}
RETURN_NOT_OK(requestSpill(isFinal));

auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
if (!isFinal) {
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
} else {
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
}
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}

if (evictType == Evict::kSpill) {
RETURN_NOT_OK(requestSpill(false));
ARROW_ASSIGN_OR_RAISE(
@@ -609,6 +576,40 @@ arrow::Status LocalPartitionWriter::evict(
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::sortEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
std::shared_ptr<arrow::Buffer> compressed,
bool isFinal) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();

if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill(true));
}
RETURN_NOT_OK(requestSpill(isFinal));

auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(
payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed)));
if (!isFinal) {
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
} else {
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
}
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}

// FIXME: Remove this code path for local partition writer.
arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) {
rawPartitionLengths_[partitionId] += blockPayload->rawSize();
@@ -644,8 +645,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
if (payloadCache_ && payloadCache_->canSpill()) {
auto beforeSpill = payloadPool_->bytes_allocated();
ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile));
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(
spills_.back(),
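
The split of evict() into hashEvict() and sortEvict() above lets the sort-based path hand the partition writer a caller-owned compression buffer instead of allocating one per spill. The following is a minimal sketch of that reuse pattern on the caller side; it is not part of this PR's diff, and names such as payload->buffers(), compressed_, pool_, and partitionWriter_ are assumptions for illustration, not the actual sort shuffle writer code.

// Illustrative fragment inside the sort writer's evict path (assumed member names).
// Size the reusable compression buffer from the buffers about to be evicted.
auto maxLength = partitionWriter_->getCompressedBufferLength(payload->buffers());
if (maxLength.has_value() && (!compressed_ || compressed_->size() < *maxLength)) {
  // Allocate once (or grow) and keep the buffer alive across evictions instead of
  // allocating a fresh compression buffer for every spilled payload.
  ARROW_ASSIGN_OR_RAISE(compressed_, arrow::AllocateResizableBuffer(*maxLength, pool_));
}
// BlockPayload::fromBuffers() compresses directly into compressed_ when it is large enough;
// if no codec is configured, maxLength is nullopt and compressed_ stays null.
RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), compressed_, isFinal));
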
11 changes: 8 additions & 3 deletions cpp/core/shuffle/LocalPartitionWriter.h
@@ -35,12 +35,17 @@ class LocalPartitionWriter : public PartitionWriter {
const std::string& dataFile,
const std::vector<std::string>& localDirs);

arrow::Status evict(
arrow::Status hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
bool reuseBuffers,
bool hasComplexType,
bool hasComplexType) override;

arrow::Status sortEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
std::shared_ptr<arrow::Buffer> compressed,
bool isFinal) override;

arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) override;
@@ -87,7 +92,7 @@ class LocalPartitionWriter : public PartitionWriter {

std::string nextSpilledFileDir();

arrow::Status openDataFile();
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const std::string& file);

arrow::Status mergeSpills(uint32_t partitionId);

2 changes: 2 additions & 0 deletions cpp/core/shuffle/Options.h
@@ -30,6 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024;
static const std::string kDefaultCompressionTypeStr = "lz4";
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
@@ -62,6 +63,7 @@ struct ShuffleWriterOptions {

// Sort shuffle writer.
int32_t sortBufferInitialSize = kDefaultSortBufferSize;
int32_t compressionBufferSize = kDefaultCompressionBufferSize;
bool useRadixSort = kDefaultUseRadixSort;
};

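
For context, a hedged sketch of overriding the new option on the C++ side; the struct and field names come from the header above, while the surrounding setup and the include path are assumptions.

#include "shuffle/Options.h"  // assumed include path

gluten::ShuffleWriterOptions options;
// Defaults to kDefaultCompressionBufferSize (32 * 1024 bytes). A larger value lets a single
// pre-allocated buffer hold bigger compressed blocks before they are spilled to disk.
options.compressionBufferSize = 64 * 1024;
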
18 changes: 15 additions & 3 deletions cpp/core/shuffle/PartitionWriter.h
@@ -26,7 +26,7 @@
namespace gluten {

struct Evict {
enum type { kCache, kSpill, kSortSpill };
enum type { kCache, kSpill };
};

class PartitionWriter : public Reclaimable {
@@ -42,14 +42,26 @@ class PartitionWriter : public Reclaimable {
virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;

/// Evict buffers for `partitionId` partition.
virtual arrow::Status evict(
virtual arrow::Status hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
bool reuseBuffers,
bool hasComplexType,
bool hasComplexType) = 0;

virtual arrow::Status sortEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
std::shared_ptr<arrow::Buffer> compressed,
bool isFinal) = 0;

std::optional<int64_t> getCompressedBufferLength(const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
if (!codec_) {
return std::nullopt;
}
return BlockPayload::maxCompressedLength(buffers, codec_.get());
}

virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;

uint64_t cachedPayloadSize() {
73 changes: 46 additions & 27 deletions cpp/core/shuffle/Payload.cc
@@ -186,45 +186,45 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
arrow::util::Codec* codec) {
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed) {
if (payloadType == Payload::Type::kCompressed) {
Timer compressionTime;
compressionTime.start();
// Compress.
// Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
const auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
int64_t totalCompressedLength =
std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) {
if (!buffer) {
return sum;
}
return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
});
const auto maxCompressedLength = metadataLength + totalCompressedLength;
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<arrow::ResizableBuffer> compressed, arrow::AllocateResizableBuffer(maxCompressedLength, pool));

auto output = compressed->mutable_data();
auto maxLength = maxCompressedLength(buffers, codec);
std::shared_ptr<arrow::Buffer> compressedBuffer;
uint8_t* output;
if (compressed) {
ARROW_RETURN_IF(
compressed->size() < maxLength,
arrow::Status::Invalid(
"Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")"));
output = const_cast<uint8_t*>(compressed->data());
} else {
ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool));
Review comment (Contributor): Can we reuse the buffer for uncompressed payload type?

Reply (Author): We hold the original evicted buffer for uncompressed payload. There is no extra copy.

output = compressedBuffer->mutable_data();
}

int64_t actualLength = 0;
// Compress buffers one by one.
for (auto& buffer : buffers) {
auto availableLength = maxCompressedLength - actualLength;
auto availableLength = maxLength - actualLength;
// Release buffer after compression.
ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec));
output += compressedSize;
actualLength += compressedSize;
}

ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound."));
RETURN_NOT_OK(compressed->Resize(actualLength));
if (compressed) {
compressedBuffer = std::make_shared<arrow::Buffer>(compressed->data(), actualLength);
} else {
RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength));
}
compressionTime.stop();
auto payload = std::unique_ptr<BlockPayload>(new BlockPayload(
Type::kCompressed,
numRows,
std::vector<std::shared_ptr<arrow::Buffer>>{compressed},
isValidityBuffer,
pool,
codec));
auto payload = std::unique_ptr<BlockPayload>(
new BlockPayload(Type::kCompressed, numRows, {compressedBuffer}, isValidityBuffer, pool, codec));
payload->setCompressionTime(compressionTime.realTimeUsed());
return payload;
}
@@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() {
return getBufferSize(buffers_);
}

int64_t BlockPayload::maxCompressedLength(
Review comment (Contributor): Can we move it to anonymous namespace?

Reply (Author): It's a public API for BlockPayload and is used by other components.

const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::util::Codec* codec) {
// Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
const auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
int64_t totalCompressedLength =
std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) {
if (!buffer) {
return sum;
}
return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
});
return metadataLength + totalCompressedLength;
}
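
A small usage sketch of the new helper follows; it is not part of the diff, the codec and buffer variables are assumed, and the comment arithmetic just restates the layout computed above.

// Illustrative fragment: size a reusable buffer for two data buffers plus one null entry.
std::vector<std::shared_ptr<arrow::Buffer>> buffers = {validity, values, nullptr};
// metadataLength counts every entry: 3 * 2 * sizeof(int64_t) = 48 bytes; the null entry
// contributes no MaxCompressedLen() term to the total.
int64_t maxLength = gluten::BlockPayload::maxCompressedLength(buffers, codec.get());
ARROW_ASSIGN_OR_RAISE(auto reusable, arrow::AllocateResizableBuffer(maxLength, arrow::default_memory_pool()));
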

arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
std::unique_ptr<InMemoryPayload> source,
std::unique_ptr<InMemoryPayload> append,
@@ -404,9 +419,13 @@ arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, std::move(merged));
}

arrow::Result<std::unique_ptr<BlockPayload>>
InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) {
return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec);
arrow::Result<std::unique_ptr<BlockPayload>> InMemoryPayload::toBlockPayload(
Payload::Type payloadType,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed) {
return BlockPayload::fromBuffers(
payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, std::move(compressed));
}

arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) {
14 changes: 11 additions & 3 deletions cpp/core/shuffle/Payload.h
@@ -84,7 +84,8 @@ class BlockPayload final : public Payload {
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
arrow::util::Codec* codec);
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed);

static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> deserialize(
arrow::io::InputStream* inputStream,
@@ -93,6 +94,10 @@
uint32_t& numRows,
int64_t& decompressTime);

static int64_t maxCompressedLength(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::util::Codec* codec);

arrow::Status serialize(arrow::io::OutputStream* outputStream) override;

arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos) override;
@@ -131,8 +136,11 @@ class InMemoryPayload final : public Payload {

arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index) override;

arrow::Result<std::unique_ptr<BlockPayload>>
toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec);
arrow::Result<std::unique_ptr<BlockPayload>> toBlockPayload(
Payload::Type payloadType,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed = nullptr);

arrow::Status copyBuffers(arrow::MemoryPool* pool);

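
Because the compressed parameter defaults to nullptr, existing callers of toBlockPayload keep the old allocate-per-call behavior; only the sort-evict path passes a pre-allocated buffer. A brief sketch of the two call shapes, with payload1, payload2, pool, codec, and reusableBuffer assumed for illustration:

// Illustrative fragments only.
// Existing behavior: no reusable buffer, fromBuffers allocates the compression output itself.
ARROW_ASSIGN_OR_RAISE(auto block1, payload1->toBlockPayload(gluten::Payload::Type::kCompressed, pool, codec.get()));

// New sort-spill path: compress into a caller-owned buffer that outlives this call.
ARROW_ASSIGN_OR_RAISE(auto block2, payload2->toBlockPayload(gluten::Payload::Type::kCompressed, pool, codec.get(), reusableBuffer));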