From 4212fbfa89c781d92f5f9e6d6b872d40c8bf5bb0 Mon Sep 17 00:00:00 2001 From: Scott Young Date: Tue, 5 Nov 2024 13:34:03 -0800 Subject: [PATCH] add executorBarrier to nimble parallel writer (#102) Summary: adding executor barrier for the fieldwriter for parallelism Differential Revision: D64775045 --- dwio/nimble/velox/FieldWriter.cpp | 56 +++++++++++++++++--- dwio/nimble/velox/FieldWriter.h | 5 ++ dwio/nimble/velox/VeloxWriter.cpp | 1 + dwio/nimble/velox/VeloxWriterOptions.h | 2 + dwio/nimble/velox/tests/VeloxReaderTests.cpp | 13 +++-- 5 files changed, 66 insertions(+), 11 deletions(-) diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index 9150c25..aceac5b 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -19,6 +19,7 @@ #include "dwio/nimble/velox/DeduplicationUtils.h" #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/SchemaTypes.h" +#include "folly/concurrency/ConcurrentHashMap.h" #include "velox/common/base/CompareFlags.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/FlatVector.h" @@ -368,6 +369,11 @@ class RowFieldWriter : public FieldWriter { : FieldWriter{context, context.schemaBuilder.createRowTypeBuilder(type->size())}, nullsStream_{context_.createNullsStreamData( typeBuilder_->asRow().nullsDescriptor())} { + if (context.writeExecutor) { + barrier_ = + std::make_unique(std::move(context_.writeExecutor)); + } + auto rowType = std::dynamic_pointer_cast(type->type()); @@ -414,8 +420,27 @@ class RowFieldWriter : public FieldWriter { [&](auto offset) { childRanges.add(offset, 1); }); context_.decodingPairPool().addPair(std::move(pair)); } - for (auto i = 0; i < fields_.size(); ++i) { - fields_[i]->write(row->childAt(i), *childRangesPtr); + + if (barrier_) { + for (auto i = 0; i < fields_.size(); ++i) { + const auto& kind = fields_[i]->typeBuilder()->kind(); + if (kind == Kind::Row || kind == Kind::FlatMap) { + // if row handle now to prevent deadlock + // if flatmap handle within due to fieldvaluewriter creation + fields_[i]->write(row->childAt(i), *childRangesPtr); + } else { + barrier_->add([&field = fields_[i], + &rowItem = row->childAt(i), + &childRanges = *childRangesPtr]() { + field->write(rowItem, childRanges); + }); + } + } + barrier_->waitAll(); + } else { + for (auto i = 0; i < fields_.size(); ++i) { + fields_[i]->write(row->childAt(i), *childRangesPtr); + } } } @@ -434,6 +459,7 @@ class RowFieldWriter : public FieldWriter { } private: + std::unique_ptr barrier_; std::vector> fields_; NullsStreamData& nullsStream_; }; @@ -800,7 +826,12 @@ class FlatMapFieldWriter : public FieldWriter { NimbleTypeTraits::scalarKind)), nullsStream_{context_.createNullsStreamData( typeBuilder_->asFlatMap().nullsDescriptor())}, - valueType_{type->childAt(1)} {} + valueType_{type->childAt(1)} { + if (context.writeExecutor) { + barrier_ = + std::make_unique(std::move(context.writeExecutor)); + } + } void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) override { @@ -884,8 +915,16 @@ class FlatMapFieldWriter : public FieldWriter { // Now actually ingest the map values if (nonNullCount > 0) { auto& values = map->mapValues(); - for (auto& pair : currentValueFields_) { - pair.second->write(values, nonNullCount); + + if (barrier_) { + for (auto& pair : currentValueFields_) { + barrier_->add([&]() { pair.second->write(values, nonNullCount); }); + } + barrier_->waitAll(); + } else { + for (auto& pair : currentValueFields_) { + pair.second->write(values, nonNullCount); + } } } nonNullCount_ += nonNullCount; @@ -914,6 +953,7 @@ class FlatMapFieldWriter : public FieldWriter { } private: + std::unique_ptr barrier_; FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) { auto it = currentValueFields_.find(key); if (it != currentValueFields_.end()) { @@ -926,6 +966,7 @@ class FlatMapFieldWriter : public FieldWriter { // check whether the typebuilder for this key is already present auto flatFieldIt = allValueFields_.find(key); + if (flatFieldIt == allValueFields_.end()) { auto valueFieldWriter = FieldWriter::create(context_, valueType_); const auto& inMapDescriptor = typeBuilder_->asFlatMap().addChild( @@ -952,12 +993,13 @@ class FlatMapFieldWriter : public FieldWriter { NullsStreamData& nullsStream_; // This map store the FlatMapValue fields used in current flush unit. - folly::F14FastMap currentValueFields_; + folly::ConcurrentHashMap + currentValueFields_; const std::shared_ptr& valueType_; uint64_t nonNullCount_ = 0; // This map store all FlatMapValue fields encountered by the VeloxWriter // across the whole file. - folly::F14FastMap> + folly::ConcurrentHashMap> allValueFields_; }; diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index 55d1d53..919365c 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -23,6 +23,7 @@ #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/StreamData.h" #include "folly/concurrency/DynamicBoundedQueue.h" +#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/vector/DecodedVector.h" @@ -35,6 +36,7 @@ struct InputBufferGrowthStats { std::atomic itemCount{0}; }; +using ExecutorBarrier = velox::dwio::common::ExecutorBarrier; using DecodedVectorPtr = std::unique_ptr; using SelectivityVectorPtr = std::unique_ptr; using DecodingPair = std::pair; @@ -113,6 +115,7 @@ struct FieldWriterContext { explicit FieldWriterContext( velox::memory::MemoryPool& memoryPool, std::unique_ptr reclaimer = nullptr, + std::shared_ptr executor = nullptr, std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10), size_t maxPoolSize = std::thread::hardware_concurrency(), size_t initialBufferCount = 10) @@ -120,6 +123,7 @@ struct FieldWriterContext { "field_writer_buffer", true, std::move(reclaimer))}, + writeExecutor{std::move(executor)}, inputBufferGrowthPolicy{ DefaultInputBufferGrowthPolicy::withDefaultRanges()}, bufferPool_{std::make_unique( @@ -131,6 +135,7 @@ struct FieldWriterContext { std::make_unique(timeout, maxPoolSize)} {} std::shared_ptr bufferMemoryPool; + std::shared_ptr writeExecutor; SchemaBuilder schemaBuilder; folly::F14FastSet flatMapNodeIds; diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 0ffa283..4bcf2a6 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -67,6 +67,7 @@ class WriterContext : public FieldWriterContext { : FieldWriterContext{ memoryPool, options.reclaimerFactory(), + options.writeExecutor, options.poolTimeout, options.maxPoolSize, options.initialBufferCount diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index d2d6c8d..cbbe112 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -133,6 +133,8 @@ struct VeloxWriterOptions { // If provided, internal encoding operations will happen in parallel using // this executor. std::shared_ptr encodingExecutor; + // If provided, internal write operations will happen in parallel + std::shared_ptr writeExecutor; bool enableChunking = false; }; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index 911e3bb..0df1247 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -1738,8 +1738,10 @@ TEST_F(VeloxReaderTests, FuzzSimple) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; nimble::VeloxWriterOptions writerOptions; if (parallelismFactor > 0) { - writerOptions.encodingExecutor = + auto executor = std::make_shared(parallelismFactor); + writerOptions.encodingExecutor = executor; + writerOptions.writeExecutor = executor; } for (auto i = 0; i < iterations; ++i) { @@ -1826,9 +1828,12 @@ TEST_F(VeloxReaderTests, FuzzComplex) { for (auto parallelismFactor : {0U, 1U, 2U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; - writerOptions.encodingExecutor = parallelismFactor > 0 - ? std::make_shared(parallelismFactor) - : nullptr; + if (parallelismFactor > 0) { + auto executor = + std::make_shared(parallelismFactor); + writerOptions.encodingExecutor = executor; + writerOptions.writeExecutor = executor; + } for (auto i = 0; i < iterations; ++i) { writeAndVerify(