Skip to content

Commit

Permalink
Allow for dictionary encoded vector ingestion (#85)
Browse files Browse the repository at this point in the history
Summary:

Allows for already encoded dictionaries to passthrough the writer as opposed to being decoded + deduped again.

Specifically, we want to allow for a "passthrough" without decoding when the schema indicates that we are ingesting an ArrayVector, but the array we find is a dictionaryVector with ArrayVector values. When this is the case, we will check to see if the dictionary is a valid run-length encoding (meaning, it likely has been deduped by upstream). The way in which the vector is written to storage does not change, we will still write the offsets and the ArrayVector elements. 

A caveat is that right now, `isDictionaryValidRunLengthEncoded` function doesn't actually verify that there is a duplicate. It's possible that a scenario exists in which a DictionaryVector was passed to the ArrayVector field writer, but the DictionaryVector has not been deduped, but its offsets still technically fulfill a run-length encoding (each offset is greater than or equal to the offset at index - 1).

Reviewed By: helfman

Differential Revision: D59863671
  • Loading branch information
Kunal Kataria authored and facebook-github-bot committed Nov 6, 2024
1 parent aab1f72 commit c3adf45
Show file tree
Hide file tree
Showing 2 changed files with 369 additions and 5 deletions.
91 changes: 90 additions & 1 deletion dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "dwio/nimble/velox/SchemaTypes.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/TypeAliases.h"

namespace facebook::nimble {

Expand Down Expand Up @@ -1037,7 +1039,19 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childFilteredRanges;
auto array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
const velox::ArrayVector* array;
// To unwrap the dictionaryVector we need to cast into ComplexType before
// extracting value arrayVector
const auto dictionaryVector =
vector->as<velox::DictionaryVector<velox::ComplexType>>();
if (dictionaryVector &&
dictionaryVector->valueVector()->template as<velox::ArrayVector>() &&
isDictionaryValidRunLengthEncoded(*dictionaryVector)) {
array = ingestLengthsOffsetsAlreadyEncoded(
*dictionaryVector, ranges, childFilteredRanges);
} else {
array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
}
if (childFilteredRanges.size() > 0) {
elements_->write(array->elements(), childFilteredRanges);
}
Expand Down Expand Up @@ -1068,6 +1082,81 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
velox::VectorPtr cachedValue_;
velox::vector_size_t cachedSize_;

/*
* Check if the dictionary is valid run length encoded.
* A dictionary is valid if its offsets in order are
* increasing or equal. Two or more offsets are equal
* when the dictionary has been deduped (the values
* vec will be smaller as a result)
* The read side expects offsets to be ordered for caching,
* so we need to ensure that they are ordered if we are going to
* passthrough the dictionary without applying any offset dedup logic.
* Dictionaries of 0 or size 1 are always considered dictionary length
* encoded since there are 0 or 1 offsets to validate.
*/
bool isDictionaryValidRunLengthEncoded(
const velox::DictionaryVector<velox::ComplexType>& dictionaryVector) {
const velox::vector_size_t* indices =
dictionaryVector.indices()->template as<velox::vector_size_t>();
for (int i = 1; i < dictionaryVector.size(); ++i) {
if (indices[i] < indices[i - 1]) {
return false;
}
}

return true;
}

velox::ArrayVector* ingestLengthsOffsetsAlreadyEncoded(
const velox::DictionaryVector<velox::ComplexType>& dictionaryVector,
const OrderedRanges& ranges,
OrderedRanges& filteredRanges) {
auto size = ranges.size();
offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);

auto& offsetsData = offsetsStream_.mutableData();
auto& lengthsData = lengthsStream_.mutableData();
auto& nonNulls = offsetsStream_.mutableNonNulls();

const velox::vector_size_t* offsets =
dictionaryVector.indices()->template as<velox::vector_size_t>();
auto valuesArrayVector =
dictionaryVector.valueVector()->template as<velox::ArrayVector>();

auto previousOffset = -1;
auto ingestDictionaryIndex = [&](auto index) {
// skip writing length if previous offset was the same
if (previousOffset < 0 || offsetsData.empty() ||
offsets[index] != previousOffset) {
auto arrayOffset = valuesArrayVector->offsetAt(offsets[index]);
auto length = valuesArrayVector->sizeAt(offsets[index]);
lengthsData.push_back(length);
if (length > 0) {
filteredRanges.add(arrayOffset, length);
}
++nextOffset_;
}

offsetsData.push_back(nextOffset_ - 1);
previousOffset = offsets[index];
};

if (dictionaryVector.mayHaveNulls()) {
ranges.applyEach([&](auto index) {
auto notNull = !dictionaryVector.isNullAt(index);
nonNulls.push_back(notNull);
if (notNull) {
ingestDictionaryIndex(index);
}
});
} else {
ranges.applyEach([&](auto index) { ingestDictionaryIndex(index); });
}
// ensure that we mark cache as invalidated
cached_ = false;
return valuesArrayVector;
}

template <typename Vector>
void ingestLengthsOffsetsByElements(
const velox::ArrayVector* array,
Expand Down
Loading

0 comments on commit c3adf45

Please sign in to comment.