diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index a1c3028..437d481 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -27,31 +27,6 @@ namespace facebook::nimble { -class FieldWriterContext::LocalDecodedVector { - public: - explicit LocalDecodedVector(FieldWriterContext& context) - : context_(context), vector_(context_.getDecodedVector()) {} - - LocalDecodedVector(LocalDecodedVector&& other) noexcept - : context_{other.context_}, vector_{std::move(other.vector_)} {} - - LocalDecodedVector& operator=(LocalDecodedVector&& other) = delete; - - ~LocalDecodedVector() { - if (vector_) { - context_.releaseDecodedVector(std::move(vector_)); - } - } - - velox::DecodedVector& get() { - return *vector_; - } - - private: - FieldWriterContext& context_; - std::unique_ptr vector_; -}; - namespace { template @@ -359,8 +334,8 @@ class SimpleFieldWriter : public FieldWriter { }); } } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decoded = decodingContext.decode(vector, ranges); valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); iterateNonNullValues( ranges, @@ -421,8 +396,8 @@ class RowFieldWriter : public FieldWriter { childRangesPtr = &ranges; } } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decoded = decodingContext.decode(vector, ranges); row = decoded.base()->as(); NIMBLE_ASSERT(row, "Unexpected vector type"); NIMBLE_CHECK(fields_.size() == row->childrenSize(), "schema mismatch"); @@ -500,8 +475,8 @@ class MultiValueFieldWriter : public FieldWriter { iterateNonNullIndices( ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc); } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decoded = decodingContext.decode(vector, ranges); casted = decoded.base()->as(); NIMBLE_ASSERT(casted, "Unexpected vector type"); offsets = casted->rawOffsets(); @@ -715,8 +690,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter { iterableVector, processMapIndex); } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decoded = decodingContext.decode(vector, ranges); mapVector = decoded.base()->template as(); NIMBLE_ASSERT(mapVector, "Unexpected vector type"); rawOffsets = mapVector->rawOffsets(); @@ -965,8 +940,8 @@ class FlatMapFieldWriter : public FieldWriter { // Keys are encoded. Decode. iterateNonNullIndices( ranges, nullsStream_.mutableNonNulls(), vector, computeKeyRanges); - auto localDecodedKeys = decode(mapKeys, keyRanges); - auto& decodedKeys = localDecodedKeys.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decodedKeys = decodingContext.decode(mapKeys, keyRanges); Decoded keysVector{decodedKeys}; iterateNonNullIndices( ranges, nullsStream_.mutableNonNulls(), vector, [&](auto offset) { @@ -990,8 +965,8 @@ class FlatMapFieldWriter : public FieldWriter { processVector(map, Flat{vector}); } else { // Map is encoded. Decode. - auto localDecodedMap = decode(vector, ranges); - auto& decodedMap = localDecodedMap.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decodedMap = decodingContext.decode(vector, ranges); map = decodedMap.base()->template as(); NIMBLE_ASSERT(map, "Unexpected vector type"); offsets = map->rawOffsets(); @@ -1375,8 +1350,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { iterateNonNullIndices(ranges, nonNulls, iterableVector, dedupProc); } else { - auto localDecoded = decode(vectorElements, childRanges); - auto& decoded = localDecoded.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decoded = decodingContext.decode(vectorElements, childRanges); /** compare array at index and prevIndex to be equal */ compareConsecutive = [&](velox::vector_size_t index, velox::vector_size_t prevIndex) { @@ -1455,8 +1430,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { ingestLengthsOffsetsByElements( arrayVector, iterableVector, ranges, childRanges, filteredRanges); } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto decodingContext = context_.getDecodingContext(); + auto& decoded = decodingContext.decode(vector, ranges); arrayVector = decoded.base()->template as(); NIMBLE_ASSERT(arrayVector, "Unexpected vector type"); rawOffsets = arrayVector->rawOffsets(); @@ -1510,51 +1485,65 @@ std::unique_ptr createArrayWithOffsetsFieldWriter( } // namespace -FieldWriterContext::LocalDecodedVector -FieldWriterContext::getLocalDecodedVector() { - NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing"); - vectorDecoderVisitor(); - return LocalDecodedVector{*this}; +DecodingContextPool::DecodingContext::DecodingContext( + DecodingContextPool& pool, + std::unique_ptr decodedVector, + std::unique_ptr selectivityVector) + : pool_{pool}, + decodedVector_{std::move(decodedVector)}, + selectivityVector_{std::move(selectivityVector)} {} + +DecodingContextPool::DecodingContext::~DecodingContext() { + pool_.addContext(std::move(decodedVector_), std::move(selectivityVector_)); } -velox::SelectivityVector& FieldWriterContext::getSelectivityVector( - velox::vector_size_t size) { - if (LIKELY(selectivity_.get() != nullptr)) { - selectivity_->resize(size); - } else { - selectivity_ = std::make_unique(size); - } - return *selectivity_; +velox::DecodedVector& DecodingContextPool::DecodingContext::decode( + const velox::VectorPtr& vector, + const OrderedRanges& ranges) { + selectivityVector_->resize(vector->size()); + selectivityVector_->clearAll(); + ranges.apply([&](auto offset, auto size) { + selectivityVector_->setValidRange(offset, offset + size, true); + }); + selectivityVector_->updateBounds(); + + decodedVector_->decode(*vector, *selectivityVector_); + return *decodedVector_; } -std::unique_ptr FieldWriterContext::getDecodedVector() { - if (decodedVectorPool_.empty()) { - return std::make_unique(); - } - auto vector = std::move(decodedVectorPool_.back()); - decodedVectorPool_.pop_back(); - return vector; +DecodingContextPool::DecodingContextPool( + std::function vectorDecoderVisitor) + : vectorDecoderVisitor_{std::move(vectorDecoderVisitor)} { + NIMBLE_CHECK(vectorDecoderVisitor_, "vectorDecoderVisitor must be set"); + pool_.reserve(std::thread::hardware_concurrency()); } -void FieldWriterContext::releaseDecodedVector( - std::unique_ptr&& vector) { - decodedVectorPool_.push_back(std::move(vector)); +void DecodingContextPool::addContext( + std::unique_ptr decodedVector, + std::unique_ptr selectivityVector) { + std::scoped_lock lock{mutex_}; + pool_.push_back( + std::pair(std::move(decodedVector), std::move(selectivityVector))); } -FieldWriterContext::LocalDecodedVector FieldWriter::decode( - const velox::VectorPtr& vector, - const OrderedRanges& ranges) { - auto& selectivityVector = context_.getSelectivityVector(vector->size()); - // initialize selectivity vector - selectivityVector.clearAll(); - ranges.apply([&](auto offset, auto size) { - selectivityVector.setValidRange(offset, offset + size, true); - }); - selectivityVector.updateBounds(); +DecodingContextPool::DecodingContext DecodingContextPool::reserveContext() { + vectorDecoderVisitor_(); + + std::scoped_lock lock{mutex_}; + if (pool_.empty()) { + return DecodingContext{ + *this, + std::make_unique(), + std::make_unique()}; + } + + auto pair = std::move(pool_.back()); + pool_.pop_back(); + return DecodingContext{*this, std::move(pair.first), std::move(pair.second)}; +} - auto localDecoded = context_.getLocalDecodedVector(); - localDecoded.get().decode(*vector, selectivityVector); - return localDecoded; +size_t DecodingContextPool::size() const { + return pool_.size(); } std::unique_ptr FieldWriter::create( diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index a5c1405..04d776b 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -34,9 +34,49 @@ struct InputBufferGrowthStats { std::atomic itemCount{0}; }; -struct FieldWriterContext { - class LocalDecodedVector; +// A pool of decoding contexts. Decoding contexts are used to decode a vector +// and its associated selectivity vector. The pool is used to avoid +// repeated allocations of decoding context. +class DecodingContextPool { + public: + class DecodingContext { + public: + explicit DecodingContext( + DecodingContextPool& pool, + std::unique_ptr decodedVector, + std::unique_ptr selectivityVector); + + ~DecodingContext(); + velox::DecodedVector& decode( + const velox::VectorPtr& vector, + const range_helper::OrderedRanges& ranges); + + private: + DecodingContextPool& pool_; + std::unique_ptr decodedVector_; + std::unique_ptr selectivityVector_; + }; + + explicit DecodingContextPool( + std::function vectorDecoderVisitor = []() {}); + + DecodingContext reserveContext(); + size_t size() const; + + private: + std::mutex mutex_; + std::vector, + std::unique_ptr>> + pool_; + std::function vectorDecoderVisitor_; + + void addContext( + std::unique_ptr decodedVector, + std::unique_ptr selectivityVector); +}; +struct FieldWriterContext { explicit FieldWriterContext( velox::memory::MemoryPool& memoryPool, std::unique_ptr reclaimer = nullptr, @@ -47,7 +87,7 @@ struct FieldWriterContext { std::move(reclaimer))}, inputBufferGrowthPolicy{ DefaultInputBufferGrowthPolicy::withDefaultRanges()}, - vectorDecoderVisitor(std::move(vectorDecoderVisitor)) { + decodingContextPool_{std::move(vectorDecoderVisitor)} { resetStringBuffer(); } @@ -67,10 +107,9 @@ struct FieldWriterContext { std::function typeAddedHandler = [](const TypeBuilder&) {}; - std::function vectorDecoderVisitor; - - LocalDecodedVector getLocalDecodedVector(); - velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size); + DecodingContextPool::DecodingContext getDecodingContext() { + return decodingContextPool_.reserveContext(); + } Buffer& stringBuffer() { return *buffer_; @@ -108,12 +147,8 @@ struct FieldWriterContext { } private: - std::unique_ptr getDecodedVector(); - void releaseDecodedVector(std::unique_ptr&& vector); - std::unique_ptr buffer_; - std::vector> decodedVectorPool_; - std::unique_ptr selectivity_; + DecodingContextPool decodingContextPool_; std::vector> streams_; }; @@ -156,10 +191,6 @@ class FieldWriter { protected: FieldWriterContext& context_; std::shared_ptr typeBuilder_; - - FieldWriterContext::LocalDecodedVector decode( - const velox::VectorPtr& vector, - const OrderedRanges& ranges); }; } // namespace facebook::nimble diff --git a/dwio/nimble/velox/tests/DecodingContextPoolTests.cpp b/dwio/nimble/velox/tests/DecodingContextPoolTests.cpp new file mode 100644 index 0000000..08fccec --- /dev/null +++ b/dwio/nimble/velox/tests/DecodingContextPoolTests.cpp @@ -0,0 +1,79 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "dwio/nimble/common/Exceptions.h" +#include "dwio/nimble/velox/FieldWriter.h" +#include "folly/executors/CPUThreadPoolExecutor.h" + +namespace { +using DecodingContext = facebook::nimble::DecodingContextPool::DecodingContext; +using DecodingContextPool = facebook::nimble::DecodingContextPool; + +TEST(DeocodingContextPoolTest, VectorDecoderVisitorMissingThrows) { + try { + auto pool = DecodingContextPool{/* vectorDecoderVisitor*/ nullptr}; + FAIL(); + } catch (facebook::nimble::NimbleUserError& e) { + ASSERT_EQ(e.errorMessage(), "vectorDecoderVisitor must be set"); + } +} + +TEST(DecodingContextPoolTest, ReserveAddPair) { + DecodingContextPool pool; + EXPECT_EQ(pool.size(), 0); + { + // inner scope + auto context = pool.reserveContext(); + EXPECT_EQ(pool.size(), 0); + } + + EXPECT_EQ(pool.size(), 1); +} + +TEST(DecodingContextPoolTest, FillPool) { + DecodingContextPool pool; + EXPECT_EQ(pool.size(), 0); + + { // inner scope + auto context1 = pool.reserveContext(); + auto context2 = pool.reserveContext(); + auto context3 = pool.reserveContext(); + auto context4 = pool.reserveContext(); + EXPECT_EQ(pool.size(), 0); + } + + EXPECT_EQ(pool.size(), 4); +} + +TEST(DecodingContextPoolTest, ParallelFillPool) { + auto parallelismFactor = std::thread::hardware_concurrency(); + auto executor = folly::CPUThreadPoolExecutor{parallelismFactor}; + + DecodingContextPool pool; + EXPECT_EQ(pool.size(), 0); + + for (auto i = 0; i < parallelismFactor; ++i) { + executor.add([&]() { + for (auto j = 0; j < 100000; ++j) { + auto context = pool.reserveContext(); + } + }); + } + executor.join(); + EXPECT_LE(pool.size(), parallelismFactor); +} +} // namespace