From 40a3334c2698b942b21fedfb167a1d987ad950e0 Mon Sep 17 00:00:00 2001 From: Kunal Kataria Date: Thu, 10 Oct 2024 08:09:57 -0700 Subject: [PATCH] Add callback functionality to Options/Context to capture visits to getLocalDecodedVector (#87) Summary: As the title says, in order to verify passthrough code is being correctly called and the local decoded vector is NOT being accessed, we need a way to track visits to the decode function. On suggestion/idea by helfman, we've created a visit callback passed in via the writer options which allows us to track if the decode visit has been called from the writer code. I did verify that the callback is being called through some tests that I know _do_ use the decode functionality. I plan to add decode visit counters for those tests later, but that is out of scope for this diff for now. This decode visit functionality will be used in the flatmap passthrough functionality as well. Reviewed By: helfman Differential Revision: D60241883 --- dwio/nimble/velox/FieldWriter.cpp | 2 + dwio/nimble/velox/FieldWriter.h | 8 +- dwio/nimble/velox/VeloxWriter.cpp | 2 +- dwio/nimble/velox/VeloxWriterOptions.h | 5 ++ dwio/nimble/velox/tests/VeloxReaderTests.cpp | 88 +++++++++++++++++--- 5 files changed, 91 insertions(+), 14 deletions(-) diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index 11b56db..242b697 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -1369,6 +1369,8 @@ std::unique_ptr createArrayWithOffsetsFieldWriter( FieldWriterContext::LocalDecodedVector FieldWriterContext::getLocalDecodedVector() { + NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing"); + vectorDecoderVisitor(); return LocalDecodedVector{*this}; } diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index 0efa941..a5c1405 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -39,13 +39,15 @@ struct FieldWriterContext { explicit
FieldWriterContext( velox::memory::MemoryPool& memoryPool, - std::unique_ptr reclaimer = nullptr) + std::unique_ptr reclaimer = nullptr, + std::function vectorDecoderVisitor = []() {}) : bufferMemoryPool{memoryPool.addLeafChild( "field_writer_buffer", true, std::move(reclaimer))}, inputBufferGrowthPolicy{ - DefaultInputBufferGrowthPolicy::withDefaultRanges()} { + DefaultInputBufferGrowthPolicy::withDefaultRanges()}, + vectorDecoderVisitor(std::move(vectorDecoderVisitor)) { resetStringBuffer(); } @@ -65,6 +67,8 @@ struct FieldWriterContext { std::function typeAddedHandler = [](const TypeBuilder&) {}; + std::function vectorDecoderVisitor; + LocalDecodedVector getLocalDecodedVector(); velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size); diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 9980969..0c5ea32 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext { WriterContext( velox::memory::MemoryPool& memoryPool, VeloxWriterOptions options) - : FieldWriterContext{memoryPool, options.reclaimerFactory()}, + : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor}, options{std::move(options)}, logger{this->options.metricsLogger} { flushPolicy = this->options.flushPolicyFactory(); diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 4689b1d..8e4386a 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -131,6 +131,11 @@ struct VeloxWriterOptions { std::shared_ptr encodingExecutor; bool enableChunking = false; + + // This callback will be invoked on access to getLocalDecodedVector in order to + // monitor usage of decoded vectors vs. data that is passed through in the + // writer. Default function is a no-op since it is used for tests only.
+ std::function vectorDecoderVisitor = []() {}; }; } // namespace facebook::nimble diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index b1d552b..d0bed37 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -728,8 +728,19 @@ class VeloxReaderTests : public ::testing::Test { size_t batchSize, bool multiSkip = false, bool checkMemoryLeak = false) { - nimble::VeloxWriterOptions writerOptions = {}; - nimble::VeloxReadParams readParams = {}; + int decodeCounterUnused = 0; + return getReaderForWriteWithDecodeCounter( + pool, type, generators, decodeCounterUnused); + } + + std::unique_ptr getReaderForWriteWithDecodeCounter( + velox::memory::MemoryPool& pool, + const velox::RowTypePtr& type, + std::vector> + generators, + int& decodeCounter) { + nimble::VeloxWriterOptions writerOptions; + nimble::VeloxReadParams readParams; std::string file; auto writeFile = std::make_unique(&file); @@ -739,6 +750,9 @@ class VeloxReaderTests : public ::testing::Test { [&](auto&) { return nimble::FlushDecision::None; }); }; writerOptions.dictionaryArrayColumns.insert("dictionaryArray"); + writerOptions.vectorDecoderVisitor = [&decodeCounter]() { + ++decodeCounter; + }; nimble::VeloxWriter writer( *rootPool_, type, std::move(writeFile), std::move(writerOptions)); @@ -755,7 +769,10 @@ class VeloxReaderTests : public ::testing::Test { auto selector = std::make_shared(type); return std::make_unique( - *leafPool_, std::move(readFile), std::move(selector), readParams); + *leafPool_, + std::move(readFile), + std::move(selector), + std::move(readParams)); } void verifySlidingWindowMap( @@ -820,6 +837,14 @@ class VeloxReaderTests : public ::testing::Test { } } + velox::VectorPtr wrapInDictionarySingleIndex( + velox::VectorPtr vectorToEncode) { + auto offsetBuffer = velox::AlignedBuffer::allocate( + 1 /* numValues */, leafPool_.get(), 0 /* initValue */); + return 
velox::BaseVector::wrapInDictionary( + nullptr, offsetBuffer, 1, vectorToEncode); + } + template velox::VectorPtr createEncodedDictionaryVectorNullable( const std::vector>& offsets, @@ -919,10 +944,15 @@ class VeloxReaderTests : public ::testing::Test { getDictionaryGenerator(vectorMaker, offsets, dictionaryValues); auto arrayGenerators = getArrayGenerator(vectorMaker, arrayValues); - auto dictionaryReader = - getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1); - auto arrayReader = - getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1); + int decodeDictionaryCount = 0; + int decodeArrayCount = 0; + auto dictionaryReader = getReaderForWriteWithDecodeCounter( + *leafPool_, rowType, dictionaryGenerators, decodeDictionaryCount); + auto arrayReader = getReaderForWriteWithDecodeCounter( + *leafPool_, rowType, arrayGenerators, decodeArrayCount); + + ASSERT_EQ(decodeDictionaryCount, 0); + ASSERT_EQ(decodeArrayCount, 0); // if dictionaryValues is empty with null offsets, // our loaded wrapped vector will contain a single null @@ -953,10 +983,15 @@ class VeloxReaderTests : public ::testing::Test { getDictionaryGenerator(vectorMaker, offsets, dictionaryValues); auto arrayGenerators = getArrayGeneratorNullable(vectorMaker, arrayValues); - auto dictionaryReader = - getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1); - auto arrayReader = - getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1); + int decodeDictionaryCount = 0; + int decodeArrayCount = 0; + auto dictionaryReader = getReaderForWriteWithDecodeCounter( + *leafPool_, rowType, dictionaryGenerators, decodeDictionaryCount); + auto arrayReader = getReaderForWriteWithDecodeCounter( + *leafPool_, rowType, arrayGenerators, decodeArrayCount); + + ASSERT_EQ(decodeDictionaryCount, 0); + ASSERT_EQ(decodeArrayCount, 0); // if dictionaryValues is empty with null offsets, // our loaded wrapped vector will contain a single null @@ -1965,6 +2000,37 @@ TEST_F(VeloxReaderTests, 
DictionaryEncodedPassthrough) { offsets, dictionaryValues, fullArrayVectorNullable); } +TEST_F(VeloxReaderTests, DictionaryEncodedPassthroughDecoding) { + auto offsets = std::vector>{0, 0, 1, 2}; + auto dictionaryValues = + std::vector>{{10, 15, 20}, {30, 40, 50}, {3, 4, 5}}; + auto type = velox::ROW({ + {"dictionaryArray", velox::ARRAY(velox::CppToType::create())}, + }); + auto rowType = std::dynamic_pointer_cast(type); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + + auto dictionaryGeneratorWrapped = [&](auto& /*type*/) { + return vectorMaker.rowVector( + {"dictionaryArray"}, + { + wrapInDictionarySingleIndex(createEncodedDictionaryVectorNullable( + offsets, dictionaryValues)), + }); + }; + + auto dictionaryGenerators = + std::vector>{ + dictionaryGeneratorWrapped}; + + int decodeDictionaryCount = 0; + auto dictionaryReader = getReaderForWriteWithDecodeCounter( + *leafPool_, rowType, dictionaryGenerators, decodeDictionaryCount); + + ASSERT_EQ(decodeDictionaryCount, 1); +} + TEST_F(VeloxReaderTests, FuzzSimple) { auto type = velox::ROW({ {"bool_val", velox::BOOLEAN()},