Skip to content

Commit

Permalink
add executorBarrier to nimble parallel writer (#102)
Browse files Browse the repository at this point in the history
Summary:

adding executor barrier for the fieldwriter for parallelism

Differential Revision: D64775045
  • Loading branch information
Scott Young authored and facebook-github-bot committed Nov 6, 2024
1 parent 5663d9f commit 6046064
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 10 deletions.
56 changes: 49 additions & 7 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "dwio/nimble/velox/DeduplicationUtils.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "folly/concurrency/ConcurrentHashMap.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
Expand Down Expand Up @@ -368,6 +369,11 @@ class RowFieldWriter : public FieldWriter {
: FieldWriter{context, context.schemaBuilder.createRowTypeBuilder(type->size())},
nullsStream_{context_.createNullsStreamData<bool>(
typeBuilder_->asRow().nullsDescriptor())} {
if (context.writeExecutor) {
barrier_ =
std::make_unique<ExecutorBarrier>(std::move(context_.writeExecutor));
}

auto rowType =
std::dynamic_pointer_cast<const velox::RowType>(type->type());

Expand Down Expand Up @@ -414,8 +420,27 @@ class RowFieldWriter : public FieldWriter {
[&](auto offset) { childRanges.add(offset, 1); });
context_.decodingPairPool().addPair(std::move(pair));
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);

if (barrier_) {
for (auto i = 0; i < fields_.size(); ++i) {
const auto& kind = fields_[i]->typeBuilder()->kind();
if (kind == Kind::Row || kind == Kind::FlatMap) {
// if row handle now to prevent deadlock
// if flatmap handle within due to fieldvaluewriter creation
fields_[i]->write(row->childAt(i), *childRangesPtr);
} else {
barrier_->add([&field = fields_[i],
&rowItem = row->childAt(i),
&childRanges = *childRangesPtr]() {
field->write(rowItem, childRanges);
});
}
}
barrier_->waitAll();
} else {
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
}
}
}

Expand All @@ -434,6 +459,7 @@ class RowFieldWriter : public FieldWriter {
}

private:
std::unique_ptr<ExecutorBarrier> barrier_;
std::vector<std::unique_ptr<FieldWriter>> fields_;
NullsStreamData& nullsStream_;
};
Expand Down Expand Up @@ -800,7 +826,12 @@ class FlatMapFieldWriter : public FieldWriter {
NimbleTypeTraits<K>::scalarKind)),
nullsStream_{context_.createNullsStreamData<bool>(
typeBuilder_->asFlatMap().nullsDescriptor())},
valueType_{type->childAt(1)} {}
valueType_{type->childAt(1)} {
if (context.writeExecutor) {
barrier_ =
std::make_unique<ExecutorBarrier>(std::move(context.writeExecutor));
}
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
Expand Down Expand Up @@ -884,8 +915,16 @@ class FlatMapFieldWriter : public FieldWriter {
// Now actually ingest the map values
if (nonNullCount > 0) {
auto& values = map->mapValues();
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);

if (barrier_) {
for (auto& pair : currentValueFields_) {
barrier_->add([&]() { pair.second->write(values, nonNullCount); });
}
barrier_->waitAll();
} else {
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);
}
}
}
nonNullCount_ += nonNullCount;
Expand Down Expand Up @@ -914,6 +953,7 @@ class FlatMapFieldWriter : public FieldWriter {
}

private:
std::unique_ptr<ExecutorBarrier> barrier_;
FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) {
auto it = currentValueFields_.find(key);
if (it != currentValueFields_.end()) {
Expand All @@ -926,6 +966,7 @@ class FlatMapFieldWriter : public FieldWriter {

// check whether the typebuilder for this key is already present
auto flatFieldIt = allValueFields_.find(key);

if (flatFieldIt == allValueFields_.end()) {
auto valueFieldWriter = FieldWriter::create(context_, valueType_);
const auto& inMapDescriptor = typeBuilder_->asFlatMap().addChild(
Expand All @@ -952,12 +993,13 @@ class FlatMapFieldWriter : public FieldWriter {

NullsStreamData& nullsStream_;
// This map store the FlatMapValue fields used in current flush unit.
folly::F14FastMap<KeyType, FlatMapValueFieldWriter*> currentValueFields_;
folly::ConcurrentHashMap<KeyType, FlatMapValueFieldWriter*>
currentValueFields_;
const std::shared_ptr<const velox::dwio::common::TypeWithId>& valueType_;
uint64_t nonNullCount_ = 0;
// This map store all FlatMapValue fields encountered by the VeloxWriter
// across the whole file.
folly::F14FastMap<KeyType, std::unique_ptr<FlatMapValueFieldWriter>>
folly::ConcurrentHashMap<KeyType, std::unique_ptr<FlatMapValueFieldWriter>>
allValueFields_;
};

Expand Down
5 changes: 5 additions & 0 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/StreamData.h"
#include "folly/concurrency/DynamicBoundedQueue.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/dwio/common/TypeWithId.h"
#include "velox/vector/DecodedVector.h"

Expand All @@ -35,6 +36,7 @@ struct InputBufferGrowthStats {
std::atomic<uint64_t> itemCount{0};
};

using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;
using DecodedVectorPtr = std::unique_ptr<velox::DecodedVector>;
using SelectivityVectorPtr = std::unique_ptr<velox::SelectivityVector>;
using DecodingPair = std::pair<DecodedVectorPtr, SelectivityVectorPtr>;
Expand Down Expand Up @@ -113,13 +115,15 @@ struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::shared_ptr<folly::Executor> executor = nullptr,
std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
size_t maxPoolSize = std::thread::hardware_concurrency(),
size_t initialBufferCount = 10)
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
writeExecutor{std::move(executor)},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
bufferPool_{std::make_unique<BufferPool>(
Expand All @@ -131,6 +135,7 @@ struct FieldWriterContext {
std::make_unique<DecodingPairPool>(timeout, maxPoolSize)} {}

std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
std::shared_ptr<folly::Executor> writeExecutor;
SchemaBuilder schemaBuilder;

folly::F14FastSet<uint32_t> flatMapNodeIds;
Expand Down
1 change: 1 addition & 0 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class WriterContext : public FieldWriterContext {
: FieldWriterContext{
memoryPool,
options.reclaimerFactory(),
options.writeExecutor,
options.poolTimeout,
options.maxPoolSize,
options.initialBufferCount
Expand Down
2 changes: 2 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ struct VeloxWriterOptions {
// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal write operations will happen in parallel
std::shared_ptr<folly::Executor> writeExecutor;

bool enableChunking = false;
};
Expand Down
11 changes: 8 additions & 3 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1740,6 +1740,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
Expand Down Expand Up @@ -1826,9 +1828,12 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
for (auto parallelismFactor :
{0U, 1U, 2U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
writerOptions.encodingExecutor = parallelismFactor > 0
? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
: nullptr;
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
Expand Down

0 comments on commit 6046064

Please sign in to comment.