add executorBarrier to nimble parallel writer (#102)
Summary:

Add an ExecutorBarrier to the FieldWriter path so per-field write operations can run in parallel.

Differential Revision: D64775045
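
For context, a minimal sketch of how a caller opts into the new parallel write path. Only the writeExecutor and encodingExecutor option fields and the folly::CPUThreadPoolExecutor usage are taken from this diff; the wrapping helper function is hypothetical.

#include <cstdint>
#include <memory>

#include <folly/executors/CPUThreadPoolExecutor.h>

#include "dwio/nimble/velox/VeloxWriterOptions.h"

// Hypothetical helper: builds writer options that enable the parallel paths.
nimble::VeloxWriterOptions makeParallelWriterOptions(uint32_t threads) {
  nimble::VeloxWriterOptions options;
  // New in this commit: per-field write() calls are fanned out through an
  // ExecutorBarrier built on this executor.
  options.writeExecutor =
      std::make_shared<folly::CPUThreadPoolExecutor>(threads);
  // Pre-existing option: encoding work is parallelized on this executor.
  options.encodingExecutor =
      std::make_shared<folly::CPUThreadPoolExecutor>(threads);
  return options;
}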
Scott Young authored and facebook-github-bot committed Dec 3, 2024
1 parent 71ea72f commit b7f8c6e
Showing 5 changed files with 129 additions and 8 deletions.
34 changes: 30 additions & 4 deletions dwio/nimble/velox/FieldWriter.cpp
@@ -20,6 +20,7 @@
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
@@ -410,8 +411,25 @@ class RowFieldWriter : public FieldWriter {
Decoded{decoded},
[&](auto offset) { childRanges.add(offset, 1); });
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);

if (context_.barrier) {
for (auto i = 0; i < fields_.size(); ++i) {
const auto& kind = fields_[i]->typeBuilder()->kind();
if (kind == Kind::FlatMap) {
// FlatMap is handled inline (not offloaded) due to field value writer creation.
fields_[i]->write(row->childAt(i), *childRangesPtr);
} else {
context_.barrier->add([&field = fields_[i],
&rowItem = row->childAt(i),
childRanges = *childRangesPtr]() {
field->write(rowItem, childRanges);
});
}
}
} else {
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
}
}
}

@@ -980,8 +998,16 @@ class FlatMapFieldWriter : public FieldWriter {
// Now actually ingest the map values
if (nonNullCount > 0) {
auto& values = map->mapValues();
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);

if (context_.barrier) {
for (auto& pair : currentValueFields_) {
context_.barrier->add(
[&]() { pair.second->write(values, nonNullCount); });
}
} else {
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);
}
}
}
nonNullCount_ += nonNullCount;
9 changes: 9 additions & 0 deletions dwio/nimble/velox/FieldWriter.h
@@ -22,6 +22,7 @@
#include "dwio/nimble/velox/OrderedRanges.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/StreamData.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/dwio/common/TypeWithId.h"
#include "velox/vector/DecodedVector.h"

@@ -83,6 +84,7 @@ class DecodingContextPool {
struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::shared_ptr<folly::Executor> executor = nullptr,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
: bufferMemoryPool{memoryPool.addLeafChild(
@@ -92,6 +94,11 @@ struct FieldWriterContext {
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
decodingContextPool_{std::move(vectorDecoderVisitor)} {
if (executor) {
barrier =
std::make_unique<velox::dwio::common::ExecutorBarrier>(executor);
}

resetStringBuffer();
}

@@ -105,6 +112,8 @@ struct FieldWriterContext {
std::unique_ptr<InputBufferGrowthPolicy> inputBufferGrowthPolicy;
InputBufferGrowthStats inputBufferGrowthStats;

std::unique_ptr<velox::dwio::common::ExecutorBarrier> barrier;

std::function<void(const TypeBuilder&, std::string_view, const TypeBuilder&)>
flatmapFieldAddedEventHandler;

5 changes: 4 additions & 1 deletion dwio/nimble/velox/VeloxWriter.cpp
@@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
: FieldWriterContext{memoryPool, options.writeExecutor, options.reclaimerFactory(), options.vectorDecoderVisitor},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
@@ -506,6 +506,9 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
try {
auto size = vector->size();
root_->write(vector, OrderedRanges::of(0, size));
if (context_->barrier) {
context_->barrier->waitAll();
}

uint64_t memoryUsed = 0;
for (const auto& stream : context_->streams()) {
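
Taken together, the FieldWriter changes enqueue per-child writes on the barrier and VeloxWriter::write() joins them before measuring memory. Below is a condensed sketch of that fan-out/join pattern, assuming only the ExecutorBarrier behavior implied by the diff (add() schedules a task, waitAll() blocks until every scheduled task completes); the FieldWriterLike type is a stand-in for illustration.

#include <memory>
#include <vector>

#include <folly/Executor.h>

#include "velox/dwio/common/ExecutorBarrier.h"

// Stand-in for a field writer; only the shape of the write() call matters here.
struct FieldWriterLike {
  void write() { /* ingest this field's values */ }
};

void writeAllFields(
    std::vector<std::unique_ptr<FieldWriterLike>>& fields,
    const std::shared_ptr<folly::Executor>& executor) {
  velox::dwio::common::ExecutorBarrier barrier{executor};
  for (auto& field : fields) {
    // Each field's write runs as an independent task on the executor.
    barrier.add([&field]() { field->write(); });
  }
  // Join all enqueued writes before returning, mirroring the waitAll() call
  // added to VeloxWriter::write() above.
  barrier.waitAll();
}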
2 changes: 2 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
@@ -129,6 +129,8 @@ struct VeloxWriterOptions {
// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal write operations will happen in parallel using this
// executor.
std::shared_ptr<folly::Executor> writeExecutor;

bool enableChunking = false;

87 changes: 84 additions & 3 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -2433,6 +2433,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
@@ -2519,9 +2521,88 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
for (auto parallelismFactor :
{0U, 1U, 2U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
writerOptions.encodingExecutor = parallelismFactor > 0
? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
: nullptr;
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
rng,
*leafPool_.get(),
rowType,
[&](auto& type) { return noNulls.fuzzInputRow(type); },
vectorEquals,
batches,
writerOptions);
writeAndVerify(
rng,
*leafPool_,
rowType,
[&](auto& type) { return hasNulls.fuzzInputRow(type); },
vectorEquals,
batches,
writerOptions);
}
}
}

TEST_F(VeloxReaderTests, TestWriteExecutorDeadLock) {
auto type = velox::ROW(
{{"nested_row",
velox::ROW(
{{"nested_nested_row",
velox::ROW(
{{"nested_nested_nested_row",
velox::ROW({{"a", velox::INTEGER()}})},
{"b", velox::INTEGER()}})}})},
{"map",
velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))}});
auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
uint32_t seed = FLAGS_reader_tests_seed > 0 ? FLAGS_reader_tests_seed
: folly::Random::rand32();
LOG(INFO) << "seed: " << seed;

nimble::VeloxWriterOptions writerOptions;
// Small batches create more edge cases.
size_t batchSize = 10;
velox::VectorFuzzer noNulls(
{
.vectorSize = batchSize,
.nullRatio = 0,
.stringLength = 20,
.stringVariableLength = true,
.containerLength = 5,
.containerVariableLength = true,
},
leafPool_.get(),
seed);

velox::VectorFuzzer hasNulls{
{
.vectorSize = batchSize,
.nullRatio = 0.05,
.stringLength = 10,
.stringVariableLength = true,
.containerLength = 5,
.containerVariableLength = true,
},
leafPool_.get(),
seed};

auto iterations = 20;
auto batches = 20;
std::mt19937 rng{seed};

for (auto parallelismFactor :
{0U, 1U, 2U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
if (parallelismFactor > 0) {
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
