diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp
index 5eb2ffb..11b56db 100644
--- a/dwio/nimble/velox/FieldWriter.cpp
+++ b/dwio/nimble/velox/FieldWriter.cpp
@@ -21,7 +21,9 @@
 #include "dwio/nimble/velox/SchemaTypes.h"
 #include "velox/common/base/CompareFlags.h"
 #include "velox/vector/ComplexVector.h"
+#include "velox/vector/DictionaryVector.h"
 #include "velox/vector/FlatVector.h"
+#include "velox/vector/TypeAliases.h"
 
 namespace facebook::nimble {
 
@@ -1037,7 +1039,19 @@
   void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
       override {
     OrderedRanges childFilteredRanges;
-    auto array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
+    const velox::ArrayVector* array;
+    // To unwrap the dictionaryVector we need to cast into ComplexType before
+    // extracting the value arrayVector
+    const auto dictionaryVector =
+        vector->as<velox::DictionaryVector<velox::ComplexType>>();
+    if (dictionaryVector &&
+        dictionaryVector->valueVector()->template as<velox::ArrayVector>() &&
+        isDictionaryValidRunLengthEncoded(*dictionaryVector)) {
+      array = ingestLengthsOffsetsAlreadyEncoded(
+          *dictionaryVector, ranges, childFilteredRanges);
+    } else {
+      array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
+    }
     if (childFilteredRanges.size() > 0) {
       elements_->write(array->elements(), childFilteredRanges);
     }
@@ -1068,6 +1082,81 @@
   velox::VectorPtr cachedValue_;
   velox::vector_size_t cachedSize_;
 
+  /*
+   * Check if the dictionary is valid run length encoded.
+   * A dictionary is valid if its offsets in order are
+   * increasing or equal. Two or more offsets are equal
+   * when the dictionary has been deduped (the values
+   * vec will be smaller as a result).
+   * The read side expects offsets to be ordered for caching,
+   * so we need to ensure that they are ordered if we are going to
+   * passthrough the dictionary without applying any offset dedup logic.
+   * Dictionaries of size 0 or 1 are always considered run length
+   * encoded since there are 0 or 1 offsets to validate.
+   */
+  bool isDictionaryValidRunLengthEncoded(
+      const velox::DictionaryVector<velox::ComplexType>& dictionaryVector) {
+    const velox::vector_size_t* indices =
+        dictionaryVector.indices()->template as<velox::vector_size_t>();
+    for (int i = 1; i < dictionaryVector.size(); ++i) {
+      if (indices[i] < indices[i - 1]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  velox::ArrayVector* ingestLengthsOffsetsAlreadyEncoded(
+      const velox::DictionaryVector<velox::ComplexType>& dictionaryVector,
+      const OrderedRanges& ranges,
+      OrderedRanges& filteredRanges) {
+    auto size = ranges.size();
+    offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);
+
+    auto& offsetsData = offsetsStream_.mutableData();
+    auto& lengthsData = lengthsStream_.mutableData();
+    auto& nonNulls = offsetsStream_.mutableNonNulls();
+
+    const velox::vector_size_t* offsets =
+        dictionaryVector.indices()->template as<velox::vector_size_t>();
+    auto valuesArrayVector =
+        dictionaryVector.valueVector()->template as<velox::ArrayVector>();
+
+    auto previousOffset = -1;
+    auto ingestDictionaryIndex = [&](auto index) {
+      // skip writing length if previous offset was the same
+      if (previousOffset < 0 || offsetsData.empty() ||
+          offsets[index] != previousOffset) {
+        auto arrayOffset = valuesArrayVector->offsetAt(offsets[index]);
+        auto length = valuesArrayVector->sizeAt(offsets[index]);
+        lengthsData.push_back(length);
+        if (length > 0) {
+          filteredRanges.add(arrayOffset, length);
+        }
+        ++nextOffset_;
+      }
+
+      offsetsData.push_back(nextOffset_ - 1);
+      previousOffset = offsets[index];
+    };
+
+    if (dictionaryVector.mayHaveNulls()) {
+      ranges.applyEach([&](auto index) {
+        auto notNull = !dictionaryVector.isNullAt(index);
+        nonNulls.push_back(notNull);
+        if (notNull) {
+          ingestDictionaryIndex(index);
+        }
+      });
+    } else {
+      ranges.applyEach([&](auto index) { ingestDictionaryIndex(index); });
+    }
+    // ensure that we mark cache as invalidated
+    cached_ = false;
+    return valuesArrayVector;
+  }
+
   template <typename T>
   void ingestLengthsOffsetsByElements(
       const velox::ArrayVector* array,
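Illustrative aside (not part of the patch): the writer change above hinges on two pieces of logic, the non-decreasing-indices check in isDictionaryValidRunLengthEncoded and the dedup loop in ingestLengthsOffsetsAlreadyEncoded. The sketch below models both on plain std::vector data instead of Velox vectors; the function names and the flat representation of dictionary array lengths are assumptions made for illustration only.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Mirrors isDictionaryValidRunLengthEncoded: dictionary indices must be
// non-decreasing for the pass-through to be valid.
bool isNonDecreasing(const std::vector<int32_t>& indices) {
  for (std::size_t i = 1; i < indices.size(); ++i) {
    if (indices[i] < indices[i - 1]) {
      return false;
    }
  }
  return true;
}

// Mirrors the dedup loop in ingestLengthsOffsetsAlreadyEncoded: one length is
// written per run of equal indices, and every row records the offset of the
// most recently written length.
void ingest(
    const std::vector<int32_t>& indices,
    const std::vector<int32_t>& dictionaryLengths,
    std::vector<int32_t>& lengthsOut,
    std::vector<int32_t>& offsetsOut) {
  int32_t previous = -1;
  int32_t nextOffset = 0;
  for (int32_t index : indices) {
    if (previous < 0 || lengthsOut.empty() || index != previous) {
      lengthsOut.push_back(dictionaryLengths[index]);
      ++nextOffset;
    }
    offsetsOut.push_back(nextOffset - 1);
    previous = index;
  }
}

int main() {
  // Indices {0, 0, 1, 2, 2} over three unique arrays of sizes {3, 4, 2}:
  // the run of 0s and the run of 2s each collapse to a single length.
  std::vector<int32_t> indices{0, 0, 1, 2, 2};
  std::vector<int32_t> dictionaryLengths{3, 4, 2};
  assert(isNonDecreasing(indices));

  std::vector<int32_t> lengths;
  std::vector<int32_t> offsets;
  ingest(indices, dictionaryLengths, lengths, offsets);

  assert((lengths == std::vector<int32_t>{3, 4, 2}));
  assert((offsets == std::vector<int32_t>{0, 0, 1, 2, 2}));
  return 0;
}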
diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
index 043e2ca..7322644 100644
--- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp
+++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -14,7 +14,6 @@
  * limitations under the License.
  */
 #include
-#include
 #include
 #include
 #include
@@ -31,7 +30,9 @@
 #include "folly/FileUtil.h"
 #include "folly/Random.h"
 #include "folly/executors/CPUThreadPoolExecutor.h"
+#include "velox/common/base/BitUtil.h"
 #include "velox/dwio/common/ColumnSelector.h"
+#include "velox/type/CppToType.h"
 #include "velox/type/Type.h"
 #include "velox/vector/BaseVector.h"
 #include "velox/vector/ComplexVector.h"
@@ -419,8 +420,19 @@
       int32_t expectedNumUniqueArrays) {
     velox::VectorPtr leftResult;
     velox::VectorPtr rightResult;
-    ASSERT_TRUE(lhs->next(expectedNumTotalArrays, leftResult));
-    ASSERT_TRUE(rhs->next(expectedNumTotalArrays, rightResult));
+    // Read all arrays (if there are any)
+    ASSERT_EQ(
+        lhs->next(expectedNumTotalArrays, leftResult),
+        expectedNumTotalArrays > 0);
+    ASSERT_EQ(
+        rhs->next(expectedNumTotalArrays, rightResult),
+        expectedNumTotalArrays > 0);
+
+    // If empty, leftResult & rightResult point to 0 pages,
+    // so we terminate early.
+    if (expectedNumTotalArrays == 0) {
+      return;
+    }
     ASSERT_EQ(
         leftResult->wrappedVector()
             ->as<velox::ArrayVector>()
@@ -439,7 +451,8 @@
         expectedNumUniqueArrays);
 
     for (int i = 0; i < expectedNumTotalArrays; ++i) {
-      vectorEquals(leftResult, rightResult, i);
+      ASSERT_TRUE(vectorEquals(leftResult, rightResult, i))
+          << "Mismatch at index: " << i;
     }
 
     // Ensure no extra data floating in left/right
@@ -807,6 +820,156 @@
     }
   }
 
+  template <typename T>
+  velox::VectorPtr createEncodedDictionaryVectorNullable(
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues) {
+    velox::test::VectorMaker vectorMaker{leafPool_.get()};
+    auto offsetBuffer = velox::AlignedBuffer::allocate<velox::vector_size_t>(
+        offsets.size() /* numValues */, leafPool_.get(), 0 /* initValue */);
+    auto nullBuffer = velox::AlignedBuffer::allocate<uint64_t>(
+        velox::bits::nwords(offsets.size()), leafPool_.get());
+    auto* offsetPtr = offsetBuffer->asMutable<velox::vector_size_t>();
+    auto* nullPtr = nullBuffer->template asMutable<uint64_t>();
+    bool hasNulls = false;
+    for (int i = 0; i < offsets.size(); ++i) {
+      if (offsets[i].has_value()) {
+        velox::bits::setNull(nullPtr, i, false);
+        offsetPtr[i] = offsets[i].value();
+      } else {
+        hasNulls = true;
+        velox::bits::setNull(nullPtr, i, true);
+      }
+    }
+
+    auto array = vectorMaker.arrayVector(dictionaryValues);
+    auto dictionaryArrayEncoded = velox::BaseVector::wrapInDictionary(
+        hasNulls ? nullBuffer : nullptr, offsetBuffer, offsets.size(), array);
+
+    return dictionaryArrayEncoded;
+  }
+
+  template <typename T>
+  std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>
+  getDictionaryGenerator(
+      velox::test::VectorMaker& vectorMaker,
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues) {
+    auto dictionaryArrayGenerator = [&, offsets, dictionaryValues](
+                                        auto& /*type*/) {
+      return vectorMaker.rowVector(
+          {"dictionaryArray"},
+          {
+              createEncodedDictionaryVectorNullable(offsets, dictionaryValues),
+          });
+    };
+    return std::vector<
+        std::function<velox::VectorPtr(const velox::RowTypePtr&)>>(
+        {dictionaryArrayGenerator});
+  }
+
+  template <typename T>
+  std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>
+  getArrayGenerator(
+      velox::test::VectorMaker& vectorMaker,
+      const std::vector<std::vector<T>>& arrayValues) {
+    auto arrayVectorGenerator = [&, arrayValues](auto& /*type*/) {
+      return vectorMaker.rowVector(
+          {"dictionaryArray"},
+          {
+              vectorMaker.arrayVector(arrayValues),
+          });
+    };
+    return std::vector<
+        std::function<velox::VectorPtr(const velox::RowTypePtr&)>>(
+        {arrayVectorGenerator});
+  }
+
+  template <typename T>
+  std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>
+  getArrayGeneratorNullable(
+      velox::test::VectorMaker& vectorMaker,
+      const std::vector<std::optional<std::vector<std::optional<T>>>>&
+          arrayValues) {
+    auto arrayVectorGenerator = [&](auto& /*type*/) {
+      return vectorMaker.rowVector(
+          {"dictionaryArray"},
+          {
+              vectorMaker.arrayVectorNullable(arrayValues),
+          });
+    };
+    return std::vector<
+        std::function<velox::VectorPtr(const velox::RowTypePtr&)>>(
+        {arrayVectorGenerator});
+  }
+
+  template <typename T>
+  void verifyDictionaryEncodedPassthrough(
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues,
+      const std::vector<std::vector<T>>& arrayValues) {
+    auto type = velox::ROW({
+        {"dictionaryArray", velox::ARRAY(velox::CppToType<T>::create())},
+    });
+    auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
+
+    velox::test::VectorMaker vectorMaker{leafPool_.get()};
+
+    auto dictionaryGenerators =
+        getDictionaryGenerator(vectorMaker, offsets, dictionaryValues);
+    auto arrayGenerators = getArrayGenerator(vectorMaker, arrayValues);
+
+    auto dictionaryReader =
+        getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1);
+    auto arrayReader =
+        getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1);
+
+    // if dictionaryValues is empty with null offsets,
+    // our loaded wrapped vector will contain a single null
+    auto expectedNumUniqueArrays =
+        dictionaryValues.size() > 0 ? dictionaryValues.size() : 1;
+
+    verifyReadersEqual(
+        std::move(dictionaryReader),
+        std::move(arrayReader),
+        offsets.size(),
+        expectedNumUniqueArrays);
+  }
+
+  template <typename T>
+  void verifyDictionaryEncodedPassthroughNullable(
+      const std::vector<std::optional<velox::vector_size_t>>& offsets,
+      const std::vector<std::vector<T>>& dictionaryValues,
+      const std::vector<std::optional<std::vector<std::optional<T>>>>&
+          arrayValues) {
+    auto type = velox::ROW({
+        {"dictionaryArray", velox::ARRAY(velox::CppToType<T>::create())},
+    });
+    auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
+
+    velox::test::VectorMaker vectorMaker{leafPool_.get()};
+    // Test duplicate in first index
+    auto dictionaryGenerators =
+        getDictionaryGenerator(vectorMaker, offsets, dictionaryValues);
+    auto arrayGenerators = getArrayGeneratorNullable(vectorMaker, arrayValues);
+
+    auto dictionaryReader =
+        getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1);
+    auto arrayReader =
+        getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1);
+
+    // if dictionaryValues is empty with null offsets,
+    // our loaded wrapped vector will contain a single null
+    auto expectedNumUniqueArrays =
+        dictionaryValues.size() > 0 ? dictionaryValues.size() : 1;
+
+    verifyReadersEqual(
+        std::move(dictionaryReader),
+        std::move(arrayReader),
+        offsets.size(), /* expectedNumTotalArrays */
+        expectedNumUniqueArrays);
+  }
+
   std::shared_ptr<velox::memory::MemoryPool> rootPool_;
   std::shared_ptr<velox::memory::MemoryPool> leafPool_;
 };
@@ -1690,6 +1853,118 @@ TEST_F(VeloxReaderTests, ArrayWithOffsetsCaching) {
       expectedUniqueArrays);
 }
 
+TEST_F(VeloxReaderTests, DictionaryEncodedPassthrough) {
+  // test duplicate in first index
+  auto offsets = std::vector<std::optional<velox::vector_size_t>>{0, 0, 1, 2};
+  auto dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50}, {3, 4, 5}};
+  auto fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {10, 15, 20}, {30, 40, 50}, {3, 4, 5}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test duplicate in middle index
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 1, 1, 2};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15}, {30, 40, 50, 60}, {3, 4, 5}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15}, {30, 40, 50, 60}, {30, 40, 50, 60}, {3, 4, 5}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test duplicate in last index
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 1, 2, 2, 2};
+  dictionaryValues = std::vector<std::vector<int32_t>>{
+      {10, 15}, {30, 40, 50, 60}, {3, 4, 5, 6, 7}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15},
+      {30, 40, 50, 60},
+      {3, 4, 5, 6, 7},
+      {3, 4, 5, 6, 7},
+      {3, 4, 5, 6, 7}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test all duplicated
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 0, 0, 0, 0};
+  dictionaryValues = std::vector<std::vector<int32_t>>{{10, 15, 20}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {10, 15, 20}, {10, 15, 20}, {10, 15, 20}, {10, 15, 20}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test none duplicated
+  offsets = std::vector<std::optional<velox::vector_size_t>>{0, 1, 2, 3, 4};
+  dictionaryValues = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {11, 14, 21}, {12, 13, 22}, {0, 0, 0}, {100, 99, 98}};
+  fullArrayVector = std::vector<std::vector<int32_t>>{
+      {10, 15, 20}, {11, 14, 21}, {12, 13, 22}, {0, 0, 0}, {100, 99, 98}};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test all empty
+  offsets = std::vector<std::optional<velox::vector_size_t>>{};
+  dictionaryValues = std::vector<std::vector<int32_t>>{};
+  fullArrayVector = std::vector<std::vector<int32_t>>{};
+  verifyDictionaryEncodedPassthrough(
+      offsets, dictionaryValues, fullArrayVector);
+
+  // test null in first index
+  offsets =
+      std::vector<std::optional<velox::vector_size_t>>{std::nullopt, 0, 1, 2};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50}, {3, 4, 5}};
+  auto fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::vector<std::optional<int32_t>>{30, 40, 50},
+          std::vector<std::optional<int32_t>>{3, 4, 5}};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+
+  // test duplicates split over null in middle
+  offsets = std::vector<std::optional<velox::vector_size_t>>{
+      std::nullopt, 0, std::nullopt, 0, 1};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50, 60}};
+  fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::vector<std::optional<int32_t>>{30, 40, 50, 60}};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+
+  // test duplicates split over multiple nulls in middle
+  offsets = std::vector<std::optional<velox::vector_size_t>>{
+      std::nullopt, 0, std::nullopt, std::nullopt, 0, 1};
+  dictionaryValues =
+      std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50, 60}};
+  fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::nullopt,
+          std::nullopt,
+          std::vector<std::optional<int32_t>>{10, 15, 20},
+          std::vector<std::optional<int32_t>>{30, 40, 50, 60}};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+
+  // test all null
+  offsets = std::vector<std::optional<velox::vector_size_t>>{
+      std::nullopt, std::nullopt, std::nullopt};
+  dictionaryValues = std::vector<std::vector<int32_t>>{};
+  fullArrayVectorNullable =
+      std::vector<std::optional<std::vector<std::optional<int32_t>>>>{
+          std::nullopt, std::nullopt, std::nullopt};
+  verifyDictionaryEncodedPassthroughNullable(
+      offsets, dictionaryValues, fullArrayVectorNullable);
+}
+
 TEST_F(VeloxReaderTests, FuzzSimple) {
   auto type = velox::ROW({
       {"bool_val", velox::BOOLEAN()},
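Illustrative aside (not part of the patch): the nullable test cases above rely on the writer behavior that a null row only appends to the non-null bitmap and is otherwise skipped, so it does not reset previousOffset and a duplicate run split by nulls still collapses to a single length. The sketch below replays the "duplicates split over null in middle" case on plain std::optional data; the helper name and the flat length/offset representation are assumptions made for illustration only.

#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Mirrors the nullable branch of ingestLengthsOffsetsAlreadyEncoded: null rows
// record only a non-null flag; they neither write a length nor reset the
// previously seen index.
void ingestNullable(
    const std::vector<std::optional<int32_t>>& indices,
    const std::vector<int32_t>& dictionaryLengths,
    std::vector<int32_t>& lengthsOut,
    std::vector<int32_t>& offsetsOut,
    std::vector<bool>& nonNullsOut) {
  int32_t previous = -1;
  int32_t nextOffset = 0;
  for (const auto& index : indices) {
    nonNullsOut.push_back(index.has_value());
    if (!index.has_value()) {
      continue;
    }
    if (previous < 0 || lengthsOut.empty() || *index != previous) {
      lengthsOut.push_back(dictionaryLengths[*index]);
      ++nextOffset;
    }
    offsetsOut.push_back(nextOffset - 1);
    previous = *index;
  }
}

int main() {
  // The "duplicates split over null in middle" case from the test above:
  // indices {null, 0, null, 0, 1} over two unique arrays of sizes {3, 4}.
  std::vector<std::optional<int32_t>> indices{
      std::nullopt, 0, std::nullopt, 0, 1};
  std::vector<int32_t> dictionaryLengths{3, 4};

  std::vector<int32_t> lengths;
  std::vector<int32_t> offsets;
  std::vector<bool> nonNulls;
  ingestNullable(indices, dictionaryLengths, lengths, offsets, nonNulls);

  // Only two lengths survive: the duplicate run of index 0 collapses even
  // though a null row sits between its occurrences.
  assert((lengths == std::vector<int32_t>{3, 4}));
  assert((offsets == std::vector<int32_t>{0, 0, 1}));
  assert((nonNulls == std::vector<bool>{false, true, false, true, true}));
  return 0;
}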