Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback functionality to Options/Context to capture visits to getLocalDecodedVector #87

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "dwio/nimble/velox/SchemaTypes.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/TypeAliases.h"

namespace facebook::nimble {

Expand Down Expand Up @@ -1037,7 +1039,19 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childFilteredRanges;
auto array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
const velox::ArrayVector* array;
// To unwrap the dictionaryVector we need to cast into ComplexType before
// extracting value arrayVector
const auto dictionaryVector =
vector->as<velox::DictionaryVector<velox::ComplexType>>();
if (dictionaryVector &&
dictionaryVector->valueVector()->template as<velox::ArrayVector>() &&
isDictionaryValidRunLengthEncoded(*dictionaryVector)) {
array = ingestLengthsOffsetsAlreadyEncoded(
*dictionaryVector, ranges, childFilteredRanges);
} else {
array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
}
if (childFilteredRanges.size() > 0) {
elements_->write(array->elements(), childFilteredRanges);
}
Expand Down Expand Up @@ -1068,6 +1082,81 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
velox::VectorPtr cachedValue_;
velox::vector_size_t cachedSize_;

/*
* Check if the dictionary is valid run length encoded.
* A dictionary is valid if its offsets in order are
* increasing or equal. Two or more offsets are equal
* when the dictionary has been deduped (the values
* vec will be smaller as a result)
* The read side expects offsets to be ordered for caching,
* so we need to ensure that they are ordered if we are going to
* passthrough the dictionary without applying any offset dedup logic.
* Dictionaries of 0 or size 1 are always considered dictionary length
* encoded since there are 0 or 1 offsets to validate.
*/
bool isDictionaryValidRunLengthEncoded(
const velox::DictionaryVector<velox::ComplexType>& dictionaryVector) {
const velox::vector_size_t* indices =
dictionaryVector.indices()->template as<velox::vector_size_t>();
for (int i = 1; i < dictionaryVector.size(); ++i) {
if (indices[i] < indices[i - 1]) {
return false;
}
}

return true;
}

velox::ArrayVector* ingestLengthsOffsetsAlreadyEncoded(
const velox::DictionaryVector<velox::ComplexType>& dictionaryVector,
const OrderedRanges& ranges,
OrderedRanges& filteredRanges) {
auto size = ranges.size();
offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);

auto& offsetsData = offsetsStream_.mutableData();
auto& lengthsData = lengthsStream_.mutableData();
auto& nonNulls = offsetsStream_.mutableNonNulls();

const velox::vector_size_t* offsets =
dictionaryVector.indices()->template as<velox::vector_size_t>();
auto valuesArrayVector =
dictionaryVector.valueVector()->template as<velox::ArrayVector>();

auto previousOffset = -1;
auto ingestDictionaryIndex = [&](auto index) {
// skip writing length if previous offset was the same
if (previousOffset < 0 || offsetsData.empty() ||
offsets[index] != previousOffset) {
auto arrayOffset = valuesArrayVector->offsetAt(offsets[index]);
auto length = valuesArrayVector->sizeAt(offsets[index]);
lengthsData.push_back(length);
if (length > 0) {
filteredRanges.add(arrayOffset, length);
}
++nextOffset_;
}

offsetsData.push_back(nextOffset_ - 1);
previousOffset = offsets[index];
};

if (dictionaryVector.mayHaveNulls()) {
ranges.applyEach([&](auto index) {
auto notNull = !dictionaryVector.isNullAt(index);
nonNulls.push_back(notNull);
if (notNull) {
ingestDictionaryIndex(index);
}
});
} else {
ranges.applyEach([&](auto index) { ingestDictionaryIndex(index); });
}
// ensure that we mark cache as invalidated
cached_ = false;
return valuesArrayVector;
}

template <typename Vector>
void ingestLengthsOffsetsByElements(
const velox::ArrayVector* array,
Expand Down Expand Up @@ -1280,6 +1369,8 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(

FieldWriterContext::LocalDecodedVector
FieldWriterContext::getLocalDecodedVector() {
NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing");
vectorDecoderVisitor();
return LocalDecodedVector{*this};
}

Expand Down
8 changes: 6 additions & 2 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ struct FieldWriterContext {

explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr)
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()} {
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
vectorDecoderVisitor(std::move(vectorDecoderVisitor)) {
resetStringBuffer();
}

Expand All @@ -65,6 +67,8 @@ struct FieldWriterContext {
std::function<void(const TypeBuilder&)> typeAddedHandler =
[](const TypeBuilder&) {};

std::function<void(void)> vectorDecoderVisitor;

LocalDecodedVector getLocalDecodedVector();
velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size);

Expand Down
2 changes: 1 addition & 1 deletion dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory()},
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
Expand Down
5 changes: 5 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ struct VeloxWriterOptions {
std::shared_ptr<folly::Executor> encodingExecutor;

bool enableChunking = false;

// This callback will be visited on access to getDecodedVector in order to
// monitor usage of decoded vectors vs. data that is passed-through in the
// writer. Default function is no-op since its used for tests only.
std::function<void(void)> vectorDecoderVisitor = []() {};
};

} // namespace facebook::nimble
Loading