Skip to content

Commit

Permalink
Add callback functionality to Options/Context to capture visits to ge…
Browse files Browse the repository at this point in the history
…tLocalDecodedVector (#87)

Summary:

As title, in order to verify passthrough code is being correctly called and the local decoded vector is NOT being accessed, we need a way to track visits to the decode function. On suggestion/idea by helfman, we've created a visit callback passed in via the writer options which allows us to track if the decode visit has been called from the writer code. 

I did verify that the callback is being called through some tests that I know _do_ use the decode functionality. I plan to add decode visit counters for those tests later but  that is out of scope for this diff for now

This decode visit functionality will be used in the flatmap passthrough functionality as well

Reviewed By: helfman

Differential Revision: D60241883
  • Loading branch information
Kunal Kataria authored and facebook-github-bot committed Oct 30, 2024
1 parent 74c827b commit a254e78
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 14 deletions.
2 changes: 2 additions & 0 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,8 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(

FieldWriterContext::LocalDecodedVector
FieldWriterContext::getLocalDecodedVector() {
NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing");
vectorDecoderVisitor();
return LocalDecodedVector{*this};
}

Expand Down
8 changes: 6 additions & 2 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ struct FieldWriterContext {

explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr)
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()} {
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
vectorDecoderVisitor(std::move(vectorDecoderVisitor)) {
resetStringBuffer();
}

Expand All @@ -65,6 +67,8 @@ struct FieldWriterContext {
std::function<void(const TypeBuilder&)> typeAddedHandler =
[](const TypeBuilder&) {};

std::function<void(void)> vectorDecoderVisitor;

LocalDecodedVector getLocalDecodedVector();
velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size);

Expand Down
2 changes: 1 addition & 1 deletion dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory()},
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
Expand Down
5 changes: 5 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ struct VeloxWriterOptions {
std::shared_ptr<folly::Executor> encodingExecutor;

bool enableChunking = false;

// This callback will be visited on access to getDecodedVector in order to
// monitor usage of decoded vectors vs. data that is passed-through in the
// writer. Default function is no-op since its used for tests only.
std::function<void(void)> vectorDecoderVisitor = []() {};
};

} // namespace facebook::nimble
88 changes: 77 additions & 11 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,19 @@ class VeloxReaderTests : public ::testing::Test {
size_t batchSize,
bool multiSkip = false,
bool checkMemoryLeak = false) {
nimble::VeloxWriterOptions writerOptions = {};
nimble::VeloxReadParams readParams = {};
int decodeCounterUnused = 0;
return getReaderForWriteWithDecodeCounter(
pool, type, generators, decodeCounterUnused);
}

std::unique_ptr<nimble::VeloxReader> getReaderForWriteWithDecodeCounter(
velox::memory::MemoryPool& pool,
const velox::RowTypePtr& type,
std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>
generators,
int& decodeCounter) {
nimble::VeloxWriterOptions writerOptions;
nimble::VeloxReadParams readParams;

std::string file;
auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
Expand All @@ -739,6 +750,9 @@ class VeloxReaderTests : public ::testing::Test {
[&](auto&) { return nimble::FlushDecision::None; });
};
writerOptions.dictionaryArrayColumns.insert("dictionaryArray");
writerOptions.vectorDecoderVisitor = [&decodeCounter]() {
++decodeCounter;
};

nimble::VeloxWriter writer(
*rootPool_, type, std::move(writeFile), std::move(writerOptions));
Expand All @@ -755,7 +769,10 @@ class VeloxReaderTests : public ::testing::Test {
auto selector = std::make_shared<velox::dwio::common::ColumnSelector>(type);

return std::make_unique<nimble::VeloxReader>(
*leafPool_, std::move(readFile), std::move(selector), readParams);
*leafPool_,
std::move(readFile),
std::move(selector),
std::move(readParams));
}

void verifySlidingWindowMap(
Expand Down Expand Up @@ -820,6 +837,14 @@ class VeloxReaderTests : public ::testing::Test {
}
}

velox::VectorPtr wrapInDictionarySingleIndex(
velox::VectorPtr vectorToEncode) {
auto offsetBuffer = velox::AlignedBuffer::allocate<velox::vector_size_t>(
1 /* numValues */, leafPool_.get(), 0 /* initValue */);
return velox::BaseVector::wrapInDictionary(
nullptr, offsetBuffer, 1, vectorToEncode);
}

template <typename T>
velox::VectorPtr createEncodedDictionaryVectorNullable(
const std::vector<std::optional<velox::vector_size_t>>& offsets,
Expand Down Expand Up @@ -919,10 +944,15 @@ class VeloxReaderTests : public ::testing::Test {
getDictionaryGenerator(vectorMaker, offsets, dictionaryValues);
auto arrayGenerators = getArrayGenerator(vectorMaker, arrayValues);

auto dictionaryReader =
getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1);
auto arrayReader =
getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1);
int decodeDictionaryCount = 0;
int decodeArrayCount = 0;
auto dictionaryReader = getReaderForWriteWithDecodeCounter(
*leafPool_, rowType, dictionaryGenerators, decodeDictionaryCount);
auto arrayReader = getReaderForWriteWithDecodeCounter(
*leafPool_, rowType, arrayGenerators, decodeArrayCount);

ASSERT_EQ(decodeDictionaryCount, 0);
ASSERT_EQ(decodeArrayCount, 0);

// if dictionaryValues is empty with null offsets,
// our loaded wrapped vector will contain a single null
Expand Down Expand Up @@ -953,10 +983,15 @@ class VeloxReaderTests : public ::testing::Test {
getDictionaryGenerator(vectorMaker, offsets, dictionaryValues);
auto arrayGenerators = getArrayGeneratorNullable(vectorMaker, arrayValues);

auto dictionaryReader =
getReaderForWrite(*leafPool_, rowType, dictionaryGenerators, 1);
auto arrayReader =
getReaderForWrite(*leafPool_, rowType, arrayGenerators, 1);
int decodeDictionaryCount = 0;
int decodeArrayCount = 0;
auto dictionaryReader = getReaderForWriteWithDecodeCounter(
*leafPool_, rowType, dictionaryGenerators, decodeDictionaryCount);
auto arrayReader = getReaderForWriteWithDecodeCounter(
*leafPool_, rowType, arrayGenerators, decodeArrayCount);

ASSERT_EQ(decodeDictionaryCount, 0);
ASSERT_EQ(decodeArrayCount, 0);

// if dictionaryValues is empty with null offsets,
// our loaded wrapped vector will contain a single null
Expand Down Expand Up @@ -1965,6 +2000,37 @@ TEST_F(VeloxReaderTests, DictionaryEncodedPassthrough) {
offsets, dictionaryValues, fullArrayVectorNullable);
}

TEST_F(VeloxReaderTests, DictionaryEncodedPassthroughDecoding) {
auto offsets = std::vector<std::optional<velox::vector_size_t>>{0, 0, 1, 2};
auto dictionaryValues =
std::vector<std::vector<int32_t>>{{10, 15, 20}, {30, 40, 50}, {3, 4, 5}};
auto type = velox::ROW({
{"dictionaryArray", velox::ARRAY(velox::CppToType<int32_t>::create())},
});
auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);

velox::test::VectorMaker vectorMaker{leafPool_.get()};

auto dictionaryGeneratorWrapped = [&](auto& /*type*/) {
return vectorMaker.rowVector(
{"dictionaryArray"},
{
wrapInDictionarySingleIndex(createEncodedDictionaryVectorNullable(
offsets, dictionaryValues)),
});
};

auto dictionaryGenerators =
std::vector<std::function<velox::VectorPtr(const velox::RowTypePtr&)>>{
dictionaryGeneratorWrapped};

int decodeDictionaryCount = 0;
auto dictionaryReader = getReaderForWriteWithDecodeCounter(
*leafPool_, rowType, dictionaryGenerators, decodeDictionaryCount);

ASSERT_EQ(decodeDictionaryCount, 1);
}

TEST_F(VeloxReaderTests, FuzzSimple) {
auto type = velox::ROW({
{"bool_val", velox::BOOLEAN()},
Expand Down

0 comments on commit a254e78

Please sign in to comment.