From b7f8c6ed35f3c431480cb9156147d161c7504b05 Mon Sep 17 00:00:00 2001
From: Scott Young
Date: Tue, 3 Dec 2024 10:14:52 -0800
Subject: [PATCH] add executorBarrier to nimble parallel writer (#102)

Summary:
Adds an ExecutorBarrier to the FieldWriter context so that individual field
writers can run in parallel on a caller-provided executor
(VeloxWriterOptions::writeExecutor). VeloxWriter::write() waits on the
barrier before returning.

Differential Revision: D64775045
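
A minimal usage sketch for review context (assumptions: the executor type
folly::CPUThreadPoolExecutor mirrors the tests below; memoryPool, rowType,
writeFile, and rowVector are hypothetical stand-ins, and the VeloxWriter
construction is abbreviated, so adjust to the actual signature):

  #include <folly/executors/CPUThreadPoolExecutor.h>
  #include "dwio/nimble/velox/VeloxWriter.h"

  nimble::VeloxWriterOptions options;
  // New option added by this patch: field writers fan out to this executor.
  options.writeExecutor =
      std::make_shared<folly::CPUThreadPoolExecutor>(/*numThreads=*/4);

  nimble::VeloxWriter writer(
      *memoryPool, rowType, std::move(writeFile), std::move(options));
  // Each write() schedules per-field work on the executor and blocks on the
  // internal ExecutorBarrier (waitAll) before returning.
  writer.write(rowVector);
  writer.close();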
---
 dwio/nimble/velox/FieldWriter.cpp            | 34 +++++++-
 dwio/nimble/velox/FieldWriter.h              |  9 ++
 dwio/nimble/velox/VeloxWriter.cpp            |  5 +-
 dwio/nimble/velox/VeloxWriterOptions.h       |  2 +
 dwio/nimble/velox/tests/VeloxReaderTests.cpp | 87 +++++++++++++++++++-
 5 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp
index cb973b4..14c501e 100644
--- a/dwio/nimble/velox/FieldWriter.cpp
+++ b/dwio/nimble/velox/FieldWriter.cpp
@@ -20,6 +20,7 @@
 #include "dwio/nimble/velox/SchemaBuilder.h"
 #include "dwio/nimble/velox/SchemaTypes.h"
 #include "velox/common/base/CompareFlags.h"
+#include "velox/dwio/common/ExecutorBarrier.h"
 #include "velox/vector/ComplexVector.h"
 #include "velox/vector/DictionaryVector.h"
 #include "velox/vector/FlatVector.h"
@@ -410,8 +411,25 @@ class RowFieldWriter : public FieldWriter {
         Decoded{decoded},
         [&](auto offset) { childRanges.add(offset, 1); });
   }
-  for (auto i = 0; i < fields_.size(); ++i) {
-    fields_[i]->write(row->childAt(i), *childRangesPtr);
+
+  if (context_.barrier) {
+    for (auto i = 0; i < fields_.size(); ++i) {
+      const auto& kind = fields_[i]->typeBuilder()->kind();
+      if (kind == Kind::FlatMap) {
+        // Handle flat maps inline; they create field value writers during write.
+        fields_[i]->write(row->childAt(i), *childRangesPtr);
+      } else {
+        context_.barrier->add([&field = fields_[i],
+                               &rowItem = row->childAt(i),
+                               childRanges = *childRangesPtr]() {
+          field->write(rowItem, childRanges);
+        });
+      }
+    }
+  } else {
+    for (auto i = 0; i < fields_.size(); ++i) {
+      fields_[i]->write(row->childAt(i), *childRangesPtr);
+    }
   }
 }
@@ -980,8 +998,16 @@ class FlatMapFieldWriter : public FieldWriter {
     // Now actually ingest the map values
     if (nonNullCount > 0) {
       auto& values = map->mapValues();
-      for (auto& pair : currentValueFields_) {
-        pair.second->write(values, nonNullCount);
+
+      if (context_.barrier) {
+        for (auto& pair : currentValueFields_) {
+          context_.barrier->add(
+              [&, nonNullCount]() { pair.second->write(values, nonNullCount); });
+        }
+      } else {
+        for (auto& pair : currentValueFields_) {
+          pair.second->write(values, nonNullCount);
+        }
       }
     }
     nonNullCount_ += nonNullCount;
diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h
index 4575a4c..cd608b6 100644
--- a/dwio/nimble/velox/FieldWriter.h
+++ b/dwio/nimble/velox/FieldWriter.h
@@ -22,6 +22,7 @@
 #include "dwio/nimble/velox/OrderedRanges.h"
 #include "dwio/nimble/velox/SchemaBuilder.h"
 #include "dwio/nimble/velox/StreamData.h"
+#include "velox/dwio/common/ExecutorBarrier.h"
 #include "velox/dwio/common/TypeWithId.h"
 #include "velox/vector/DecodedVector.h"
 
@@ -83,6 +84,7 @@ class DecodingContextPool {
 struct FieldWriterContext {
   explicit FieldWriterContext(
       velox::memory::MemoryPool& memoryPool,
+      std::shared_ptr<folly::Executor> executor = nullptr,
       std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
       std::function<void()> vectorDecoderVisitor = []() {})
       : bufferMemoryPool{memoryPool.addLeafChild(
@@ -92,6 +94,11 @@ struct FieldWriterContext {
         inputBufferGrowthPolicy{
             DefaultInputBufferGrowthPolicy::withDefaultRanges()},
         decodingContextPool_{std::move(vectorDecoderVisitor)} {
+    if (executor) {
+      barrier =
+          std::make_unique<velox::dwio::common::ExecutorBarrier>(executor);
+    }
+
     resetStringBuffer();
   }
@@ -105,6 +112,8 @@ struct FieldWriterContext {
   std::unique_ptr<InputBufferGrowthPolicy> inputBufferGrowthPolicy;
   InputBufferGrowthStats inputBufferGrowthStats;
 
+  std::unique_ptr<velox::dwio::common::ExecutorBarrier> barrier;
+
   std::function<void(const TypeBuilder&, std::string_view, const TypeBuilder&)>
       flatmapFieldAddedEventHandler;
diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp
index 0c5ea32..e18cee3 100644
--- a/dwio/nimble/velox/VeloxWriter.cpp
+++ b/dwio/nimble/velox/VeloxWriter.cpp
@@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext {
   WriterContext(
       velox::memory::MemoryPool& memoryPool,
       VeloxWriterOptions options)
-      : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
+      : FieldWriterContext{memoryPool, options.writeExecutor, options.reclaimerFactory(), options.vectorDecoderVisitor},
         options{std::move(options)},
         logger{this->options.metricsLogger} {
     flushPolicy = this->options.flushPolicyFactory();
@@ -506,6 +506,9 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
   try {
     auto size = vector->size();
     root_->write(vector, OrderedRanges::of(0, size));
+    if (context_->barrier) {
+      context_->barrier->waitAll();
+    }
 
     uint64_t memoryUsed = 0;
     for (const auto& stream : context_->streams()) {
diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h
index 8e4386a..f4c0ecf 100644
--- a/dwio/nimble/velox/VeloxWriterOptions.h
+++ b/dwio/nimble/velox/VeloxWriterOptions.h
@@ -129,6 +129,8 @@ struct VeloxWriterOptions {
   // If provided, internal encoding operations will happen in parallel using
   // this executor.
   std::shared_ptr<folly::Executor> encodingExecutor;
+  // If provided, internal write operations will happen in parallel.
+  std::shared_ptr<folly::Executor> writeExecutor;
 
   bool enableChunking = false;
diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
index b192107..5bd052a 100644
--- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp
+++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -2433,6 +2433,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
     if (parallelismFactor > 0) {
       writerOptions.encodingExecutor =
           std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
+      writerOptions.writeExecutor =
+          std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
     }
 
     for (auto i = 0; i < iterations; ++i) {
@@ -2519,9 +2521,88 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
   for (auto parallelismFactor :
        {0U, 1U, 2U, std::thread::hardware_concurrency()}) {
     LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
-    writerOptions.encodingExecutor = parallelismFactor > 0
-        ? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
-        : nullptr;
+    if (parallelismFactor > 0) {
+      writerOptions.encodingExecutor =
+          std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
+      writerOptions.writeExecutor =
+          std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
+    }
+
+    for (auto i = 0; i < iterations; ++i) {
+      writeAndVerify(
+          rng,
+          *leafPool_.get(),
+          rowType,
+          [&](auto& type) { return noNulls.fuzzInputRow(type); },
+          vectorEquals,
+          batches,
+          writerOptions);
+      writeAndVerify(
+          rng,
+          *leafPool_,
+          rowType,
+          [&](auto& type) { return hasNulls.fuzzInputRow(type); },
+          vectorEquals,
+          batches,
+          writerOptions);
+    }
+  }
+}
+
+TEST_F(VeloxReaderTests, TestWriteExecutorDeadLock) {
+  auto type = velox::ROW(
+      {{"nested_row",
+        velox::ROW(
+            {{"nested_nested_row",
+              velox::ROW(
+                  {{"nested_nested_nested_row",
+                    velox::ROW({{"a", velox::INTEGER()}})},
+                   {"b", velox::INTEGER()}})}})},
+       {"map",
+        velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))}});
+  auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
+  uint32_t seed = FLAGS_reader_tests_seed > 0 ? FLAGS_reader_tests_seed
+                                              : folly::Random::rand32();
+  LOG(INFO) << "seed: " << seed;
+
+  nimble::VeloxWriterOptions writerOptions;
+  // Small batches create more edge cases.
+  size_t batchSize = 10;
+  velox::VectorFuzzer noNulls(
+      {
+          .vectorSize = batchSize,
+          .nullRatio = 0,
+          .stringLength = 20,
+          .stringVariableLength = true,
+          .containerLength = 5,
+          .containerVariableLength = true,
+      },
+      leafPool_.get(),
+      seed);
+
+  velox::VectorFuzzer hasNulls{
+      {
+          .vectorSize = batchSize,
+          .nullRatio = 0.05,
+          .stringLength = 10,
+          .stringVariableLength = true,
+          .containerLength = 5,
+          .containerVariableLength = true,
+      },
+      leafPool_.get(),
+      seed};
+
+  auto iterations = 20;
+  auto batches = 20;
+  std::mt19937 rng{seed};
+
+  for (auto parallelismFactor :
+       {0U, 1U, 2U, std::thread::hardware_concurrency()}) {
+    LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
+    if (parallelismFactor > 0) {
+      writerOptions.writeExecutor =
+          std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
+    }
 
     for (auto i = 0; i < iterations; ++i) {
       writeAndVerify(