add executorBarrier to nimble parallel writer (#102)
Summary:

Adds an ExecutorBarrier to the FieldWriter write path so that child field writes can run in parallel when a write executor is provided.

Differential Revision: D64775045
Scott Young authored and facebook-github-bot committed Dec 4, 2024
1 parent d4da823 commit 75c2caa
Showing 5 changed files with 161 additions and 26 deletions.
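
For orientation, the new write path follows a simple fan-out/wait pattern: when a write executor is configured, child field writes are queued on a velox::dwio::common::ExecutorBarrier and the caller blocks on waitAll() before continuing. Below is a minimal sketch of that pattern (not part of the commit); the helper name and signature are illustrative, and namespace qualification follows the diff.

#include <functional>
#include <memory>
#include <vector>

#include <folly/Executor.h>

#include "velox/dwio/common/ExecutorBarrier.h"

// Sketch only: fan child writes out to the executor when one is provided,
// otherwise fall back to a serial loop.
void writeChildren(
    const std::vector<std::function<void()>>& childWrites,
    const std::shared_ptr<folly::Executor>& writeExecutor) {
  if (!writeExecutor) {
    for (const auto& write : childWrites) {
      write();  // Serial path: matches the pre-change behavior.
    }
    return;
  }
  velox::dwio::common::ExecutorBarrier barrier{writeExecutor};
  for (const auto& write : childWrites) {
    barrier.add(write);  // Each child write runs on the wrapped executor.
  }
  barrier.waitAll();  // Block until every scheduled child write completes.
}
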
83 changes: 62 additions & 21 deletions dwio/nimble/velox/FieldWriter.cpp
@@ -20,6 +20,7 @@
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
@@ -284,8 +285,10 @@ class SimpleFieldWriter : public FieldWriter {
valuesStream_{context.createNullableContentStreamData<TargetType>(
typeBuilder_->asScalar().scalarDescriptor())} {}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) override {
auto size = ranges.size();
auto& buffer = context_.stringBuffer();
auto& data = valuesStream_.mutableData();
@@ -375,8 +378,10 @@ class RowFieldWriter : public FieldWriter {
}
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) override {
auto size = ranges.size();
OrderedRanges childRanges;
const OrderedRanges* childRangesPtr;
@@ -409,8 +414,19 @@
Decoded{decoded},
[&](auto offset) { childRanges.add(offset, 1); });
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);

// When a barrier is provided, fan each child field write out to its
// executor; the caller (VeloxWriter::write) waits on the barrier.
if (barrier) {
for (auto i = 0; i < fields_.size(); ++i) {
barrier->add([&field = fields_[i],
&childVector = row->childAt(i),
childRanges = *childRangesPtr]() {
field->write(childVector, childRanges);
});
}
} else {
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
}
}
}

@@ -510,8 +526,10 @@ class ArrayFieldWriter : public MultiValueFieldWriter {
typeBuilder_->asArray().setChildren(elements_->typeBuilder());
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) override {
OrderedRanges childRanges;
auto array = ingestLengths<velox::ArrayVector>(vector, ranges, childRanges);
if (childRanges.size() > 0) {
@@ -550,8 +568,10 @@ class MapFieldWriter : public MultiValueFieldWriter {
keys_->typeBuilder(), values_->typeBuilder());
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) override {
OrderedRanges childRanges;
auto map = ingestLengths<velox::MapVector>(vector, ranges, childRanges);
if (childRanges.size() > 0) {
@@ -598,8 +618,10 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
type->type(), 1, context.bufferMemoryPool.get());
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) override {
OrderedRanges childFilteredRanges;
auto map = ingestOffsetsAndLengthsDeduplicated(
vector, ranges, childFilteredRanges);
@@ -731,7 +753,10 @@ class FlatMapPassthroughValueFieldWriter {
: valueField_{std::move(valueField)},
inMapStream_{context.createContentStreamData<bool>(inMapDescriptor)} {}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) {
auto& data = inMapStream_.mutableData();
data.resize(data.size() + ranges.size(), true);
valueField_->write(vector, ranges);
@@ -824,11 +849,14 @@ class FlatMapFieldWriter : public FieldWriter {
typeBuilder_->asFlatMap().nullsDescriptor())},
valueType_{type->childAt(1)} {}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) override {
// Check if the vector received is already flattened
const auto isFlatMap = vector->type()->kind() == velox::TypeKind::ROW;
isFlatMap ? ingestFlattenedMap(vector, ranges) : ingestMap(vector, ranges);
isFlatMap ? ingestFlattenedMap(vector, ranges)
: ingestMap(vector, ranges, barrier);
}

FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter(
@@ -897,7 +925,10 @@ class FlatMapFieldWriter : public FieldWriter {
}
}

void ingestMap(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
void ingestMap(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) {
NIMBLE_ASSERT(
currentPassthroughFields_.empty(),
"Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
@@ -979,8 +1010,15 @@ class FlatMapFieldWriter : public FieldWriter {
// Now actually ingest the map values
if (nonNullCount > 0) {
auto& values = map->mapValues();
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);

// Each key's value field can be written independently, so dispatch the
// per-key writes to the barrier when one is provided.
if (barrier) {
for (auto& pair : currentValueFields_) {
// Capture nonNullCount by value: the task may run after this frame returns
// (the barrier is awaited in VeloxWriter::write).
barrier->add([&pair, &values, nonNullCount]() { pair.second->write(values, nonNullCount); });
}
} else {
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);
}
}
}
nonNullCount_ += nonNullCount;
@@ -1018,6 +1056,7 @@ class FlatMapFieldWriter : public FieldWriter {

private:
FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) {
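// Value writers are created lazily per key; with a write executor, multiple
// field writers can reach this path concurrently, so guard creation with the
// shared flat-map schema mutex.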
std::scoped_lock<std::mutex> lock{context_.flatMapSchemaMutex};
auto it = currentValueFields_.find(key);
if (it != currentValueFields_.end()) {
return it->second;
@@ -1129,8 +1168,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
type->type(), 1, context.bufferMemoryPool.get());
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) override {
OrderedRanges childFilteredRanges;
const velox::ArrayVector* array;
// To unwrap the dictionaryVector we need to cast into ComplexType before
5 changes: 4 additions & 1 deletion dwio/nimble/velox/FieldWriter.h
@@ -22,6 +22,7 @@
#include "dwio/nimble/velox/OrderedRanges.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/StreamData.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/dwio/common/TypeWithId.h"
#include "velox/vector/DecodedVector.h"

@@ -92,6 +93,7 @@ struct FieldWriterContext {
}

std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
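// Serializes flat-map value field creation across field writers when child
// writes run in parallel on the write executor.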
std::mutex flatMapSchemaMutex;
SchemaBuilder schemaBuilder;

folly::F14FastSet<uint32_t> flatMapNodeIds;
@@ -166,7 +168,8 @@ class FieldWriter {
// Writes the vector to internal buffers.
virtual void write(
const velox::VectorPtr& vector,
const OrderedRanges& ranges) = 0;
const OrderedRanges& ranges,
velox::dwio::common::ExecutorBarrier* barrier = nullptr) = 0;

// Clears internal state and any accumulated data in internal buffers.
virtual void reset() = 0;
10 changes: 9 additions & 1 deletion dwio/nimble/velox/VeloxWriter.cpp
@@ -505,7 +505,15 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
NIMBLE_CHECK(file_, "Writer is already closed");
try {
auto size = vector->size();
root_->write(vector, OrderedRanges::of(0, size));
if (context_->options.writeExecutor) {
velox::dwio::common::ExecutorBarrier barrier{
context_->options.writeExecutor};
auto* barrierPtr = &barrier;
root_->write(vector, OrderedRanges::of(0, size), barrierPtr);
barrier.waitAll();
} else {
root_->write(vector, OrderedRanges::of(0, size));
}

uint64_t memoryUsed = 0;
for (const auto& stream : context_->streams()) {
2 changes: 2 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
@@ -129,6 +129,8 @@ struct VeloxWriterOptions {
// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal write operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> writeExecutor;

bool enableChunking = false;

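
Opting in is a matter of populating the new writeExecutor field alongside encodingExecutor, as the updated tests below do. A hedged sketch follows; the thread count and helper name are illustrative, and namespace qualification follows the tests.

#include <cstdint>
#include <memory>

#include <folly/executors/CPUThreadPoolExecutor.h>

#include "dwio/nimble/velox/VeloxWriterOptions.h"

// Sketch only: build writer options that enable both parallel encoding and
// parallel field writes on dedicated thread pools.
nimble::VeloxWriterOptions makeParallelWriterOptions(uint32_t threads) {
  nimble::VeloxWriterOptions options;
  options.encodingExecutor =
      std::make_shared<folly::CPUThreadPoolExecutor>(threads);
  options.writeExecutor =
      std::make_shared<folly::CPUThreadPoolExecutor>(threads);
  return options;
}
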
87 changes: 84 additions & 3 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -2432,6 +2432,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
@@ -2517,9 +2519,88 @@ TEST_F(VeloxReaderTests, FuzzComplex) {

for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
writerOptions.encodingExecutor = parallelismFactor > 0
? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
: nullptr;
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
rng,
*leafPool_.get(),
rowType,
[&](auto& type) { return noNulls.fuzzInputRow(type); },
vectorEquals,
batches,
writerOptions);
writeAndVerify(
rng,
*leafPool_,
rowType,
[&](auto& type) { return hasNulls.fuzzInputRow(type); },
vectorEquals,
batches,
writerOptions);
}
}
}

TEST_F(VeloxReaderTests, TestWriteExecutorDeadLock) {
auto type = velox::ROW(
{{"nested_row",
velox::ROW(
{{"nested_nested_row",
velox::ROW(
{{"nested_nested_nested_row",
velox::ROW({{"a", velox::INTEGER()}})},
{"b", velox::INTEGER()}})}})},
{"map",
velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))}});
auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
uint32_t seed = FLAGS_reader_tests_seed > 0 ? FLAGS_reader_tests_seed
: folly::Random::rand32();
LOG(INFO) << "seed: " << seed;

nimble::VeloxWriterOptions writerOptions;
// Small batches create more edge cases.
size_t batchSize = 10;
velox::VectorFuzzer noNulls(
{
.vectorSize = batchSize,
.nullRatio = 0,
.stringLength = 20,
.stringVariableLength = true,
.containerLength = 5,
.containerVariableLength = true,
},
leafPool_.get(),
seed);

velox::VectorFuzzer hasNulls{
{
.vectorSize = batchSize,
.nullRatio = 0.05,
.stringLength = 10,
.stringVariableLength = true,
.containerLength = 5,
.containerVariableLength = true,
},
leafPool_.get(),
seed};

auto iterations = 20;
auto batches = 20;
std::mt19937 rng{seed};

for (auto parallelismFactor :
{0U, 1U, 2U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
if (parallelismFactor > 0) {
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
