From 75c2caa514ab98d62dec90f27cc83828246947a2 Mon Sep 17 00:00:00 2001 From: Scott Young Date: Tue, 3 Dec 2024 22:20:21 -0800 Subject: [PATCH] add executorBarrier to nimble parallel writer (#102) Summary: adding executor barrier for the fieldwriter for parallelism Differential Revision: D64775045 --- dwio/nimble/velox/FieldWriter.cpp | 83 ++++++++++++++----- dwio/nimble/velox/FieldWriter.h | 5 +- dwio/nimble/velox/VeloxWriter.cpp | 10 ++- dwio/nimble/velox/VeloxWriterOptions.h | 2 + dwio/nimble/velox/tests/VeloxReaderTests.cpp | 87 +++++++++++++++++++- 5 files changed, 161 insertions(+), 26 deletions(-) diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index 437d481..bfd0fd5 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -20,6 +20,7 @@ #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/SchemaTypes.h" #include "velox/common/base/CompareFlags.h" +#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DictionaryVector.h" #include "velox/vector/FlatVector.h" @@ -284,8 +285,10 @@ class SimpleFieldWriter : public FieldWriter { valuesStream_{context.createNullableContentStreamData( typeBuilder_->asScalar().scalarDescriptor())} {} - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) - override { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) override { auto size = ranges.size(); auto& buffer = context_.stringBuffer(); auto& data = valuesStream_.mutableData(); @@ -375,8 +378,10 @@ class RowFieldWriter : public FieldWriter { } } - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) - override { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) override { auto size = ranges.size(); OrderedRanges childRanges; 
const OrderedRanges* childRangesPtr; @@ -409,8 +414,19 @@ class RowFieldWriter : public FieldWriter { Decoded{decoded}, [&](auto offset) { childRanges.add(offset, 1); }); } - for (auto i = 0; i < fields_.size(); ++i) { - fields_[i]->write(row->childAt(i), *childRangesPtr); + + if (barrier) { + for (auto i = 0; i < fields_.size(); ++i) { + barrier->add([&field = fields_[i], + &childVector = row->childAt(i), + childRanges = *childRangesPtr]() { + field->write(childVector, childRanges); + }); + } + } else { + for (auto i = 0; i < fields_.size(); ++i) { + fields_[i]->write(row->childAt(i), *childRangesPtr); + } } } @@ -510,8 +526,10 @@ class ArrayFieldWriter : public MultiValueFieldWriter { typeBuilder_->asArray().setChildren(elements_->typeBuilder()); } - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) - override { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) override { OrderedRanges childRanges; auto array = ingestLengths(vector, ranges, childRanges); if (childRanges.size() > 0) { @@ -550,8 +568,10 @@ class MapFieldWriter : public MultiValueFieldWriter { keys_->typeBuilder(), values_->typeBuilder()); } - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) - override { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) override { OrderedRanges childRanges; auto map = ingestLengths(vector, ranges, childRanges); if (childRanges.size() > 0) { @@ -598,8 +618,10 @@ class SlidingWindowMapFieldWriter : public FieldWriter { type->type(), 1, context.bufferMemoryPool.get()); } - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) - override { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) override { OrderedRanges childFilteredRanges; auto map = 
ingestOffsetsAndLengthsDeduplicated( vector, ranges, childFilteredRanges); @@ -731,7 +753,10 @@ class FlatMapPassthroughValueFieldWriter { : valueField_{std::move(valueField)}, inMapStream_{context.createContentStreamData(inMapDescriptor)} {} - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) { auto& data = inMapStream_.mutableData(); data.resize(data.size() + ranges.size(), true); valueField_->write(vector, ranges); @@ -824,11 +849,14 @@ class FlatMapFieldWriter : public FieldWriter { typeBuilder_->asFlatMap().nullsDescriptor())}, valueType_{type->childAt(1)} {} - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) - override { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) override { // Check if the vector received is already flattened const auto isFlatMap = vector->type()->kind() == velox::TypeKind::ROW; - isFlatMap ? ingestFlattenedMap(vector, ranges) : ingestMap(vector, ranges); + isFlatMap ? 
ingestFlattenedMap(vector, ranges) + : ingestMap(vector, ranges, barrier); } FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter( @@ -897,7 +925,10 @@ class FlatMapFieldWriter : public FieldWriter { } } - void ingestMap(const velox::VectorPtr& vector, const OrderedRanges& ranges) { + void ingestMap( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) { NIMBLE_ASSERT( currentPassthroughFields_.empty(), "Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported"); @@ -979,8 +1010,19 @@ class FlatMapFieldWriter : public FieldWriter { // Now actually ingest the map values if (nonNullCount > 0) { auto& values = map->mapValues(); - for (auto& pair : currentValueFields_) { - pair.second->write(values, nonNullCount); + + if (barrier) { + for (auto& pair : currentValueFields_) { + // Queued tasks may run after ingestMap() has returned, so capture + // what they need by value instead of referencing locals. + barrier->add([valueField = pair.second, values, nonNullCount]() { + valueField->write(values, nonNullCount); + }); + } + } else { + for (auto& pair : currentValueFields_) { + pair.second->write(values, nonNullCount); + } } } nonNullCount_ += nonNullCount; @@ -1018,6 +1060,7 @@ class FlatMapFieldWriter : public FieldWriter { private: FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) { + std::scoped_lock lock{context_.flatMapSchemaMutex}; auto it = currentValueFields_.find(key); if (it != currentValueFields_.end()) { return it->second; @@ -1129,8 +1172,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { type->type(), 1, context.bufferMemoryPool.get()); } - void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) - override { + void write( + const velox::VectorPtr& vector, + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) override { OrderedRanges childFilteredRanges; const velox::ArrayVector* array; // To unwrap the dictionaryVector we need to cast into ComplexType before diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index
04d776b..c9eb43e 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -22,6 +22,7 @@ #include "dwio/nimble/velox/OrderedRanges.h" #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/StreamData.h" +#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/vector/DecodedVector.h" @@ -92,6 +93,7 @@ struct FieldWriterContext { } std::shared_ptr bufferMemoryPool; + std::mutex flatMapSchemaMutex; SchemaBuilder schemaBuilder; folly::F14FastSet flatMapNodeIds; @@ -166,7 +168,8 @@ class FieldWriter { // Writes the vector to internal buffers. virtual void write( const velox::VectorPtr& vector, - const OrderedRanges& ranges) = 0; + const OrderedRanges& ranges, + velox::dwio::common::ExecutorBarrier* barrier = nullptr) = 0; // Clears interanl state and any accumulated data in internal buffers. virtual void reset() = 0; diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 0c5ea32..c74a2e6 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -505,7 +505,15 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) { NIMBLE_CHECK(file_, "Writer is already closed"); try { auto size = vector->size(); - root_->write(vector, OrderedRanges::of(0, size)); + if (context_->options.writeExecutor) { + velox::dwio::common::ExecutorBarrier barrier{ + context_->options.writeExecutor}; + auto* barrierPtr = &barrier; + root_->write(vector, OrderedRanges::of(0, size), barrierPtr); + barrier.waitAll(); + } else { + root_->write(vector, OrderedRanges::of(0, size)); + } uint64_t memoryUsed = 0; for (const auto& stream : context_->streams()) { diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 8e4386a..f4c0ecf 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -129,6 +129,8 @@ struct VeloxWriterOptions { // If provided, internal 
encoding operations will happen in parallel using // this executor. std::shared_ptr encodingExecutor; + // If provided, internal write operations will happen in parallel + std::shared_ptr writeExecutor; bool enableChunking = false; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index 72fee6a..8a691b9 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -2432,6 +2432,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) { if (parallelismFactor > 0) { writerOptions.encodingExecutor = std::make_shared(parallelismFactor); + writerOptions.writeExecutor = + std::make_shared(parallelismFactor); } for (auto i = 0; i < iterations; ++i) { @@ -2517,9 +2519,88 @@ TEST_F(VeloxReaderTests, FuzzComplex) { for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; - writerOptions.encodingExecutor = parallelismFactor > 0 - ? 
std::make_shared(parallelismFactor) - : nullptr; + if (parallelismFactor > 0) { + writerOptions.encodingExecutor = + std::make_shared(parallelismFactor); + writerOptions.writeExecutor = + std::make_shared(parallelismFactor); + } + + for (auto i = 0; i < iterations; ++i) { + writeAndVerify( + rng, + *leafPool_.get(), + rowType, + [&](auto& type) { return noNulls.fuzzInputRow(type); }, + vectorEquals, + batches, + writerOptions); + writeAndVerify( + rng, + *leafPool_, + rowType, + [&](auto& type) { return hasNulls.fuzzInputRow(type); }, + vectorEquals, + batches, + writerOptions); + } + } +} + +TEST_F(VeloxReaderTests, TestWriteExecutorDeadLock) { + auto type = velox::ROW( + {{"nested_row", + velox::ROW( + {{"nested_nested_row", + velox::ROW( + {{"nested_nested_nested_row", + velox::ROW({{"a", velox::INTEGER()}})}, + {"b", velox::INTEGER()}})}})}, + {"map", + velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))}}); + auto rowType = std::dynamic_pointer_cast(type); + uint32_t seed = FLAGS_reader_tests_seed > 0 ? FLAGS_reader_tests_seed + : folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + + nimble::VeloxWriterOptions writerOptions; + // Small batches creates more edge cases. 
+ size_t batchSize = 10; + velox::VectorFuzzer noNulls( + { + .vectorSize = batchSize, + .nullRatio = 0, + .stringLength = 20, + .stringVariableLength = true, + .containerLength = 5, + .containerVariableLength = true, + }, + leafPool_.get(), + seed); + + velox::VectorFuzzer hasNulls{ + { + .vectorSize = batchSize, + .nullRatio = 0.05, + .stringLength = 10, + .stringVariableLength = true, + .containerLength = 5, + .containerVariableLength = true, + }, + leafPool_.get(), + seed}; + + auto iterations = 20; + auto batches = 20; + std::mt19937 rng{seed}; + + for (auto parallelismFactor : + {0U, 1U, 2U, std::thread::hardware_concurrency()}) { + LOG(INFO) << "Parallelism Factor: " << parallelismFactor; + if (parallelismFactor > 0) { + writerOptions.writeExecutor = + std::make_shared(parallelismFactor); + } for (auto i = 0; i < iterations; ++i) { writeAndVerify(