From f9f9caf9474adad2aaeee91c13405e95f4da37de Mon Sep 17 00:00:00 2001
From: Kunal Kataria
Date: Thu, 10 Oct 2024 08:09:57 -0700
Subject: [PATCH] Allow for dictionary encoded vector ingestion (#85)

Summary:
Allows already dictionary-encoded vectors to pass through the writer instead
of being decoded and deduped again.

Specifically, we want to allow a "passthrough" without decoding when the
schema indicates that we are ingesting an ArrayVector, but the vector we find
is a DictionaryVector with ArrayVector values. When this is the case, we check
whether the dictionary is a valid run-length encoding (meaning it has likely
already been deduped upstream). The way the vector is written to storage does
not change: we still write the offsets and the ArrayVector elements.

A caveat is that, right now, the `isDictionaryValidRunLengthEncoded` function
doesn't actually verify that a duplicate exists. A scenario is possible in
which a DictionaryVector that has not been deduped is passed to the
ArrayVector field writer, yet its offsets still technically fulfill a
run-length encoding (each offset is greater than or equal to the offset at
index - 1).

Reviewed By: helfman

Differential Revision: D59863671
---
 dwio/nimble/velox/FieldWriter.cpp            |  91 +++++-
 dwio/nimble/velox/tests/VeloxReaderTests.cpp | 283 ++++++++++++++++++-
 2 files changed, 369 insertions(+), 5 deletions(-)

diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp
index 5eb2ffb..11b56db 100644
--- a/dwio/nimble/velox/FieldWriter.cpp
+++ b/dwio/nimble/velox/FieldWriter.cpp
@@ -21,7 +21,9 @@
 #include "dwio/nimble/velox/SchemaTypes.h"
 #include "velox/common/base/CompareFlags.h"
 #include "velox/vector/ComplexVector.h"
+#include "velox/vector/DictionaryVector.h"
 #include "velox/vector/FlatVector.h"
+#include "velox/vector/TypeAliases.h"
 
 namespace facebook::nimble {
 
@@ -1037,7 +1039,19 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
   void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
       override {
     OrderedRanges childFilteredRanges;
-    auto array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
+    const velox::ArrayVector* array;
+    // To unwrap the dictionaryVector we need to cast into ComplexType before
+    // extracting the value arrayVector.
+    const auto dictionaryVector =
+        vector->as<velox::DictionaryVector<velox::ComplexType>>();
+    if (dictionaryVector &&
+        dictionaryVector->valueVector()->template as<velox::ArrayVector>() &&
+        isDictionaryValidRunLengthEncoded(*dictionaryVector)) {
+      array = ingestLengthsOffsetsAlreadyEncoded(
+          *dictionaryVector, ranges, childFilteredRanges);
+    } else {
+      array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
+    }
     if (childFilteredRanges.size() > 0) {
       elements_->write(array->elements(), childFilteredRanges);
     }
@@ -1068,6 +1082,81 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
   velox::VectorPtr cachedValue_;
   velox::vector_size_t cachedSize_;
 
+  /*
+   * Check whether the dictionary is valid run-length encoded.
+   * A dictionary is valid if its offsets, in order, are
+   * non-decreasing. Two or more offsets are equal
+   * when the dictionary has been deduped (the values
+   * vector will be smaller as a result).
+   * The read side expects offsets to be ordered for caching,
+   * so we need to ensure that they are ordered if we are going to
+   * pass the dictionary through without applying any offset dedup logic.
+   * Dictionaries of size 0 or 1 are always considered run-length
+   * encoded since there are 0 or 1 offsets to validate.
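+   * For example, dictionary indices [0, 0, 1, 2] pass this check (each index
+   * is greater than or equal to the previous one), while [0, 2, 1] does not.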
+   */
+  bool isDictionaryValidRunLengthEncoded(
+      const velox::DictionaryVector<velox::ComplexType>& dictionaryVector) {
+    const velox::vector_size_t* indices =
+        dictionaryVector.indices()->template as<velox::vector_size_t>();
+    for (int i = 1; i < dictionaryVector.size(); ++i) {
+      if (indices[i] < indices[i - 1]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  velox::ArrayVector* ingestLengthsOffsetsAlreadyEncoded(
+      const velox::DictionaryVector<velox::ComplexType>& dictionaryVector,
+      const OrderedRanges& ranges,
+      OrderedRanges& filteredRanges) {
+    auto size = ranges.size();
+    offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);
+
+    auto& offsetsData = offsetsStream_.mutableData();
+    auto& lengthsData = lengthsStream_.mutableData();
+    auto& nonNulls = offsetsStream_.mutableNonNulls();
+
+    const velox::vector_size_t* offsets =
+        dictionaryVector.indices()->template as<velox::vector_size_t>();
+    auto valuesArrayVector =
+        dictionaryVector.valueVector()->template as<velox::ArrayVector>();
+
+    auto previousOffset = -1;
+    auto ingestDictionaryIndex = [&](auto index) {
+      // skip writing length if previous offset was the same
+      if (previousOffset < 0 || offsetsData.empty() ||
+          offsets[index] != previousOffset) {
+        auto arrayOffset = valuesArrayVector->offsetAt(offsets[index]);
+        auto length = valuesArrayVector->sizeAt(offsets[index]);
+        lengthsData.push_back(length);
+        if (length > 0) {
+          filteredRanges.add(arrayOffset, length);
+        }
+        ++nextOffset_;
+      }
+
+      offsetsData.push_back(nextOffset_ - 1);
+      previousOffset = offsets[index];
+    };
+
+    if (dictionaryVector.mayHaveNulls()) {
+      ranges.applyEach([&](auto index) {
+        auto notNull = !dictionaryVector.isNullAt(index);
+        nonNulls.push_back(notNull);
+        if (notNull) {
+          ingestDictionaryIndex(index);
+        }
+      });
+    } else {
+      ranges.applyEach([&](auto index) { ingestDictionaryIndex(index); });
+    }
+    // ensure that we mark cache as invalidated
+    cached_ = false;
+    return valuesArrayVector;
+  }
+
   template <typename T>
   void ingestLengthsOffsetsByElements(
       const velox::ArrayVector* array,
diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
index aee17ce..b1d552b 100644
--- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp
+++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -14,7 +14,6 @@
  * limitations under the License.
  */
 #include
-#include
 #include
 #include
 #include
@@ -31,7 +30,9 @@
 #include "folly/FileUtil.h"
 #include "folly/Random.h"
 #include "folly/executors/CPUThreadPoolExecutor.h"
+#include "velox/common/base/BitUtil.h"
 #include "velox/dwio/common/ColumnSelector.h"
+#include "velox/type/CppToType.h"
 #include "velox/type/Type.h"
 #include "velox/vector/BaseVector.h"
 #include "velox/vector/ComplexVector.h"
@@ -419,8 +420,19 @@ class VeloxReaderTests : public ::testing::Test {
       int32_t expectedNumUniqueArrays) {
     velox::VectorPtr leftResult;
     velox::VectorPtr rightResult;
-    ASSERT_TRUE(lhs->next(expectedNumTotalArrays, leftResult));
-    ASSERT_TRUE(rhs->next(expectedNumTotalArrays, rightResult));
+    // Read all arrays (if there are any)
+    ASSERT_EQ(
+        lhs->next(expectedNumTotalArrays, leftResult),
+        expectedNumTotalArrays > 0);
+    ASSERT_EQ(
+        rhs->next(expectedNumTotalArrays, rightResult),
+        expectedNumTotalArrays > 0);
+
+    // If empty, leftResult & rightResult point to 0 page,
+    // so we terminate early.
+    if (expectedNumTotalArrays == 0) {
+      return;
+    }
     ASSERT_EQ(
         leftResult->wrappedVector()
             ->as<velox::RowVector>()
@@ -439,7 +451,8 @@
         expectedNumUniqueArrays);
 
     for (int i = 0; i < expectedNumTotalArrays; ++i) {
-      vectorEquals(leftResult, rightResult, i);
+      ASSERT_TRUE(vectorEquals(leftResult, rightResult, i))
+          << "Mismatch at index: " << i;
     }
 
     // Ensure no extra data floating in left/right
@@ -807,6 +820,156 @@
     }
   }
 
+  template <typename T>
+  velox::VectorPtr createEncodedDictionaryVectorNullable(
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues) {
+    velox::test::VectorMaker vectorMaker{leafPool_.get()};
+    auto offsetBuffer = velox::AlignedBuffer::allocate<velox::vector_size_t>(
+        offsets.size() /* numValues */, leafPool_.get(), 0 /* initValue */);
+    auto nullBuffer = velox::AlignedBuffer::allocate<uint64_t>(
+        velox::bits::nwords(offsets.size()), leafPool_.get());
+    auto* offsetPtr = offsetBuffer->asMutable<velox::vector_size_t>();
+    auto* nullPtr = nullBuffer->template asMutable<uint64_t>();
+    bool hasNulls = false;
+    for (int i = 0; i < offsets.size(); ++i) {
+      if (offsets[i].has_value()) {
+        velox::bits::setNull(nullPtr, i, false);
+        offsetPtr[i] = offsets[i].value();
+      } else {
+        hasNulls = true;
+        velox::bits::setNull(nullPtr, i, true);
+      }
+    }
+
+    auto array = vectorMaker.arrayVector(dictionaryValues);
+    auto dictionaryArrayEncoded = velox::BaseVector::wrapInDictionary(
+        hasNulls ? nullBuffer : nullptr, offsetBuffer, offsets.size(), array);
+
+    return dictionaryArrayEncoded;
+  }
+
+  template <typename T>
+  std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>
+  getDictionaryGenerator(
+      velox::test::VectorMaker& vectorMaker,
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues) {
+    auto dictionaryArrayGenerator = [&, offsets, dictionaryValues](
+                                        auto& /*type*/) {
+      return vectorMaker.rowVector(
+          {"dictionaryArray"},
+          {
+              createEncodedDictionaryVectorNullable(offsets, dictionaryValues),
+          });
+    };
+    return std::vector<
+        std::function<velox::VectorPtr(const velox::RowTypePtr&)>>(
+        {dictionaryArrayGenerator});
+  }
+
+  template <typename T>
+  std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>
+  getArrayGenerator(
+      velox::test::VectorMaker& vectorMaker,
+      const std::vector<std::vector<T>>& arrayValues) {
+    auto arrayVectorGenerator = [&, arrayValues](auto& /*type*/) {
+      return vectorMaker.rowVector(
+          {"dictionaryArray"},
+          {
+              vectorMaker.arrayVector(arrayValues),
+          });
+    };
+    return std::vector<
+        std::function<velox::VectorPtr(const velox::RowTypePtr&)>>(
+        {arrayVectorGenerator});
+  }
+
+  template <typename T>
+  std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>
+  getArrayGeneratorNullable(
+      velox::test::VectorMaker& vectorMaker,
+      const std::vector<std::optional<std::vector<std::optional<T>>>>&
+          arrayValues) {
+    auto arrayVectorGenerator = [&](auto& /*type*/) {
+      return vectorMaker.rowVector(
+          {"dictionaryArray"},
+          {
+              vectorMaker.arrayVectorNullable(arrayValues),
+          });
+    };
+    return std::vector<
+        std::function<velox::VectorPtr(const velox::RowTypePtr&)>>(
+        {arrayVectorGenerator});
+  }
+
+  template <typename T>
+  void verifyDictionaryEncodedPassthrough(
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues,
+      const std::vector<std::vector<T>>& arrayValues) {
+    auto type = velox::ROW({
+        {"dictionaryArray", velox::ARRAY(velox::CppToType<T>::create())},
+    });
+    auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
+
+    velox::test::VectorMaker vectorMaker{leafPool_.get()};
+
+    auto dictionaryGenerators =
+        getDictionaryGenerator(vectorMaker, offsets, dictionaryValues);
+    auto arrayGenerators = getArrayGenerator(vectorMaker, arrayValues);
+
+    auto dictionaryReader =
+        getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1);
+    auto arrayReader =
+        getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1);
+
+    // if dictionaryValues is empty with null offsets,
+    // our loaded wrapped vector will contain a single null
+    auto expectedNumUniqueArrays =
+        dictionaryValues.size() > 0 ? dictionaryValues.size() : 1;
+
+    verifyReadersEqual(
+        std::move(dictionaryReader),
+        std::move(arrayReader),
+        offsets.size(),
+        expectedNumUniqueArrays);
+  }
+
+  template <typename T>
+  void verifyDictionaryEncodedPassthroughNullable(
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues,
+      const std::vector<std::optional<std::vector<std::optional<T>>>>&
+          arrayValues) {
+    auto type = velox::ROW({
+        {"dictionaryArray", velox::ARRAY(velox::CppToType<T>::create())},
+    });
+    auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
+
+    velox::test::VectorMaker vectorMaker{leafPool_.get()};
+
+    auto dictionaryGenerators =
+        getDictionaryGenerator(vectorMaker, offsets, dictionaryValues);
+    auto arrayGenerators = getArrayGeneratorNullable(vectorMaker, arrayValues);
+
+    auto dictionaryReader =
+        getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1);
+    auto arrayReader =
+        getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1);
+
+    // if dictionaryValues is empty with null offsets,
+    // our loaded wrapped vector will contain a single null
+    auto expectedNumUniqueArrays =
+        dictionaryValues.size() > 0 ? dictionaryValues.size() : 1;
+
+    verifyReadersEqual(
+        std::move(dictionaryReader),
+        std::move(arrayReader),
+        offsets.size(), /* expectedNumTotalArrays */
+        expectedNumUniqueArrays);
+  }
+
   std::shared_ptr<velox::memory::MemoryPool> rootPool_;
   std::shared_ptr<velox::memory::MemoryPool> leafPool_;
 };
@@ -1690,6 +1853,118 @@ TEST_F(VeloxReaderTests, ArrayWithOffsetsCaching) {
       expectedUniqueArrays);
 }
 
+TEST_F(VeloxReaderTests, DictionaryEncodedPassthrough) {
+  // test duplicate in first index
+  auto offsets = std::vector<std::optional<velox::vector_size_t>>{0, 0, 1, 2};
+  auto dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50}, {3, 4, 5}};
+  auto fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {10, 15, 20}, {30, 40, 50}, {3, 4, 5}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test duplicate in middle index
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 1, 1, 2};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15}, {30, 40, 50, 60}, {3, 4, 5}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15}, {30, 40, 50, 60}, {30, 40, 50, 60}, {3, 4, 5}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test duplicate in last index
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 1, 2, 2, 2};
+  dictionaryValues = std::vector<std::vector<int32_t>>{
+      {10, 15}, {30, 40, 50, 60}, {3, 4, 5, 6, 7}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15},
+      {30, 40, 50, 60},
+      {3, 4, 5, 6, 7},
+      {3, 4, 5, 6, 7},
+      {3, 4, 5, 6, 7}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test all duplicated
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 0, 0, 0, 0};
+  dictionaryValues = std::vector<std::vector<int32_t>>{{10, 15, 20}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {10, 15, 20}, {10, 15, 20}, {10, 15, 20}, {10, 15, 20}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test none duplicated
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 1, 2, 3, 4};
+  dictionaryValues = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {11, 14, 21}, {12, 13, 22}, {0, 0, 0}, {100, 99, 98}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {11, 14, 21}, {12, 13, 22}, {0, 0, 0}, {100, 99, 98}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test all empty
+  offsets = std::vector<std::optional<velox::vector_size_t>>{};
+  dictionaryValues = std::vector<std::vector<int32_t>>{};
+  fullArrayVector = std::vector<std::vector<int32_t>>{};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test null in first index
+  offsets =
+      std::vector<std::optional<velox::vector_size_t>>{std::nullopt, 0, 1, 2};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50}, {3, 4, 5}};
+  auto fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::vector<std::optional<int32_t>>{30, 40, 50},
+          std::vector<std::optional<int32_t>>{3, 4, 5}};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+
+  // test duplicates split over null in middle
+  offsets = std::vector<std::optional<velox::vector_size_t>>{
+      std::nullopt, 0, std::nullopt, 0, 1};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50, 60}};
+  fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::vector<std::optional<int32_t>>{30, 40, 50, 60}};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+
+  // test duplicates split over multiple nulls in middle
+  offsets = std::vector<std::optional<velox::vector_size_t>>{
+      std::nullopt, 0, std::nullopt, std::nullopt, 0, 1};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50, 60}};
+  fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::nullopt,
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::vector<std::optional<int32_t>>{30, 40, 50, 60}};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+
+  // test all null
+  offsets = std::vector<std::optional<velox::vector_size_t>>{
+      std::nullopt, std::nullopt, std::nullopt};
+  dictionaryValues = std::vector<std::vector<int32_t>>{};
+  fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt, std::nullopt, std::nullopt};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+}
+
 TEST_F(VeloxReaderTests, FuzzSimple) {
   auto type = velox::ROW({
       {"bool_val", velox::BOOLEAN()},