add executorBarrier to nimble parallel writer (#102)
Summary:

Adds an executor barrier to the FieldWriter write path so that child fields can be written in parallel.

Reviewed By: helfman

Differential Revision: D64775045
Scott Young authored and facebook-github-bot committed Dec 4, 2024
1 parent 91b96b3 commit a33242d
Showing 5 changed files with 89 additions and 27 deletions.
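
Note: the mechanism behind this change is a fan-out-and-wait pattern. FieldWriter::write now accepts a folly::Executor*, child writes are scheduled on it, and VeloxWriter wraps the caller-supplied writeExecutor in a velox::dwio::common::ExecutorBarrier whose waitAll() blocks until every scheduled write has finished. The sketch below is not the velox ExecutorBarrier; it is a minimal, hypothetical illustration of the same idea (the MiniBarrier name is invented and exception propagation is omitted).

#include <folly/Executor.h>

#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical MiniBarrier: forwards tasks to an underlying folly::Executor
// while counting them, so the caller can block until all of them complete.
class MiniBarrier : public folly::Executor {
 public:
  explicit MiniBarrier(folly::Executor& delegate) : delegate_{delegate} {}

  void add(folly::Func task) override {
    {
      std::lock_guard<std::mutex> lock{mutex_};
      ++pending_;
    }
    delegate_.add([this, task = std::move(task)]() mutable {
      task();
      std::lock_guard<std::mutex> lock{mutex_};
      if (--pending_ == 0) {
        done_.notify_all();
      }
    });
  }

  // Blocks until every task added through this barrier has run.
  void waitAll() {
    std::unique_lock<std::mutex> lock{mutex_};
    done_.wait(lock, [this] { return pending_ == 0; });
  }

 private:
  folly::Executor& delegate_;
  std::mutex mutex_;
  std::condition_variable done_;
  std::size_t pending_{0};
};

A writer-side caller then does roughly what VeloxWriter::write does in this diff: construct the barrier over the configured executor, pass it down the write call, and call waitAll() before continuing.
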
78 changes: 58 additions & 20 deletions dwio/nimble/velox/FieldWriter.cpp
@@ -284,8 +284,10 @@ class SimpleFieldWriter : public FieldWriter {
         valuesStream_{context.createNullableContentStreamData<TargetType>(
             typeBuilder_->asScalar().scalarDescriptor())} {}

-  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
-      override {
+  void write(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor*) override {
     auto size = ranges.size();
     auto& buffer = context_.stringBuffer();
     auto& data = valuesStream_.mutableData();
@@ -375,8 +377,10 @@ class RowFieldWriter : public FieldWriter {
     }
   }

-  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
-      override {
+  void write(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor* executor = nullptr) override {
     auto size = ranges.size();
     OrderedRanges childRanges;
     const OrderedRanges* childRangesPtr;
@@ -409,8 +413,19 @@ class RowFieldWriter : public FieldWriter {
           Decoded{decoded},
           [&](auto offset) { childRanges.add(offset, 1); });
     }
-    for (auto i = 0; i < fields_.size(); ++i) {
-      fields_[i]->write(row->childAt(i), *childRangesPtr);
+
+    if (executor) {
+      for (auto i = 0; i < fields_.size(); ++i) {
+        executor->add([&field = fields_[i],
+                       &childVector = row->childAt(i),
+                       childRanges = *childRangesPtr]() {
+          field->write(childVector, childRanges);
+        });
+      }
+    } else {
+      for (auto i = 0; i < fields_.size(); ++i) {
+        fields_[i]->write(row->childAt(i), *childRangesPtr);
+      }
     }
   }

@@ -510,8 +525,10 @@ class ArrayFieldWriter : public MultiValueFieldWriter {
     typeBuilder_->asArray().setChildren(elements_->typeBuilder());
   }

-  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
-      override {
+  void write(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor* executor) override {
     OrderedRanges childRanges;
     auto array = ingestLengths<velox::ArrayVector>(vector, ranges, childRanges);
     if (childRanges.size() > 0) {
@@ -550,8 +567,10 @@ class MapFieldWriter : public MultiValueFieldWriter {
         keys_->typeBuilder(), values_->typeBuilder());
   }

-  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
-      override {
+  void write(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor* executor = nullptr) override {
     OrderedRanges childRanges;
     auto map = ingestLengths<velox::MapVector>(vector, ranges, childRanges);
     if (childRanges.size() > 0) {
@@ -598,8 +617,10 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
         type->type(), 1, context.bufferMemoryPool.get());
   }

-  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
-      override {
+  void write(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor* executor = nullptr) override {
     OrderedRanges childFilteredRanges;
     auto map = ingestOffsetsAndLengthsDeduplicated(
         vector, ranges, childFilteredRanges);
@@ -824,11 +845,14 @@ class FlatMapFieldWriter : public FieldWriter {
             typeBuilder_->asFlatMap().nullsDescriptor())},
         valueType_{type->childAt(1)} {}

-  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
-      override {
+  void write(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor* executor = nullptr) override {
     // Check if the vector received is already flattened
     const auto isFlatMap = vector->type()->kind() == velox::TypeKind::ROW;
-    isFlatMap ? ingestFlattenedMap(vector, ranges) : ingestMap(vector, ranges);
+    isFlatMap ? ingestFlattenedMap(vector, ranges)
+              : ingestMap(vector, ranges, executor);
   }

   FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter(
@@ -897,7 +921,10 @@ class FlatMapFieldWriter : public FieldWriter {
     }
   }

-  void ingestMap(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
+  void ingestMap(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor* executor = nullptr) {
     NIMBLE_ASSERT(
         currentPassthroughFields_.empty(),
         "Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
@@ -979,8 +1006,15 @@ class FlatMapFieldWriter : public FieldWriter {
     // Now actually ingest the map values
     if (nonNullCount > 0) {
       auto& values = map->mapValues();
-      for (auto& pair : currentValueFields_) {
-        pair.second->write(values, nonNullCount);
+
+      if (executor) {
+        for (auto& pair : currentValueFields_) {
+          executor->add([&]() { pair.second->write(values, nonNullCount); });
+        }
+      } else {
+        for (auto& pair : currentValueFields_) {
+          pair.second->write(values, nonNullCount);
+        }
       }
     }
     nonNullCount_ += nonNullCount;
@@ -1030,6 +1064,8 @@ class FlatMapFieldWriter : public FieldWriter {
     // check whether the typebuilder for this key is already present
     auto flatFieldIt = allValueFields_.find(key);
     if (flatFieldIt == allValueFields_.end()) {
+      std::scoped_lock<std::mutex> lock{context_.flatMapSchemaMutex};
+
       auto valueFieldWriter = FieldWriter::create(context_, valueType_);
       const auto& inMapDescriptor = typeBuilder_->asFlatMap().addChild(
           stringKey, valueFieldWriter->typeBuilder());
@@ -1129,8 +1165,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
         type->type(), 1, context.bufferMemoryPool.get());
   }

-  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
-      override {
+  void write(
+      const velox::VectorPtr& vector,
+      const OrderedRanges& ranges,
+      folly::Executor*) override {
     OrderedRanges childFilteredRanges;
     const velox::ArrayVector* array;
     // To unwrap the dictionaryVector we need to cast into ComplexType before
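
Note on the FieldWriter.cpp changes above: tasks queued on the executor can run after the scheduling scope has exited (they are only joined by the barrier's waitAll() in VeloxWriter::write), so the lambdas copy the stack-local OrderedRanges while capturing the long-lived field writers and child vectors by reference, and the flat-map schema mutation (typeBuilder_->asFlatMap().addChild) is now serialized with context_.flatMapSchemaMutex. The toy program below (invented names, a plain folly::CPUThreadPoolExecutor instead of the barrier) illustrates the same capture rule: per-task inputs copied, long-lived outputs referenced, and a join before those outputs are read.

#include <folly/executors/CPUThreadPoolExecutor.h>

#include <atomic>
#include <cstdio>
#include <vector>

// Toy illustration, not nimble code: stack-local inputs are captured by value,
// long-lived outputs by reference, and the pool is joined before reading them.
int main() {
  folly::CPUThreadPoolExecutor pool{4};
  std::vector<std::atomic<int>> sums(8); // long-lived: captured by reference
  for (int i = 0; i < 8; ++i) {
    std::vector<int> ranges{i, i + 1, i + 2}; // stack-local: copied into the task
    pool.add([&sum = sums[i], ranges]() {
      int total = 0;
      for (int r : ranges) {
        total += r;
      }
      sum.store(total);
    });
  }
  pool.join(); // analogous to ExecutorBarrier::waitAll() in VeloxWriter::write
  for (int i = 0; i < 8; ++i) {
    std::printf("sum[%d] = %d\n", i, sums[i].load());
  }
  return 0;
}
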
4 changes: 3 additions & 1 deletion dwio/nimble/velox/FieldWriter.h
@@ -92,6 +92,7 @@ struct FieldWriterContext {
   }

   std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
+  std::mutex flatMapSchemaMutex;
   SchemaBuilder schemaBuilder;

   folly::F14FastSet<uint32_t> flatMapNodeIds;
@@ -166,7 +167,8 @@ class FieldWriter {
   // Writes the vector to internal buffers.
   virtual void write(
       const velox::VectorPtr& vector,
-      const OrderedRanges& ranges) = 0;
+      const OrderedRanges& ranges,
+      folly::Executor* executor = nullptr) = 0;

   // Clears interanl state and any accumulated data in internal buffers.
   virtual void reset() = 0;
11 changes: 9 additions & 2 deletions dwio/nimble/velox/VeloxWriter.cpp
@@ -505,7 +505,14 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
   NIMBLE_CHECK(file_, "Writer is already closed");
   try {
     auto size = vector->size();
-    root_->write(vector, OrderedRanges::of(0, size));
+    if (context_->options.writeExecutor) {
+      velox::dwio::common::ExecutorBarrier barrier{
+          *context_->options.writeExecutor};
+      root_->write(vector, OrderedRanges::of(0, size), &barrier);
+      barrier.waitAll();
+    } else {
+      root_->write(vector, OrderedRanges::of(0, size));
+    }

     uint64_t memoryUsed = 0;
     for (const auto& stream : context_->streams()) {
@@ -705,7 +712,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {

   if (context_->options.encodingExecutor) {
     velox::dwio::common::ExecutorBarrier barrier{
-        context_->options.encodingExecutor};
+        *context_->options.encodingExecutor};
     for (auto& streamData : context_->streams()) {
       processStream(
           *streamData, [&](StreamData& innerStreamData, bool isNullStream) {
2 changes: 2 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
@@ -129,6 +129,8 @@ struct VeloxWriterOptions {
   // If provided, internal encoding operations will happen in parallel using
   // this executor.
   std::shared_ptr<folly::Executor> encodingExecutor;
+  // If provided, internal write operations will happen in parallel
+  std::shared_ptr<folly::Executor> writeExecutor;

   bool enableChunking = false;

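
Note: writeExecutor is consumed in VeloxWriter::write above, where it is wrapped in an ExecutorBarrier for each write call; the updated fuzz tests below share one pool between encodingExecutor and writeExecutor. A minimal configuration sketch along the same lines (the helper name is invented, the include path and nimble namespace alias follow the test file as an assumption, and writer construction itself is elided):

#include <folly/executors/CPUThreadPoolExecutor.h>

#include <cstdint>
#include <memory>

#include "dwio/nimble/velox/VeloxWriterOptions.h"

namespace nimble = facebook::nimble; // alias as used in the tests (assumption)

// Share one CPU thread pool between encoding and write parallelism, mirroring
// the updated FuzzSimple/FuzzComplex tests; threads == 0 leaves both unset.
nimble::VeloxWriterOptions makeParallelWriterOptions(uint32_t threads) {
  nimble::VeloxWriterOptions options;
  if (threads > 0) {
    auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(threads);
    options.encodingExecutor = executor;
    options.writeExecutor = executor;
  }
  return options;
}
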
21 changes: 17 additions & 4 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -2430,8 +2430,10 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
     LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
     nimble::VeloxWriterOptions writerOptions;
     if (parallelismFactor > 0) {
-      writerOptions.encodingExecutor =
+      auto executor =
           std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
+      writerOptions.encodingExecutor = executor;
+      writerOptions.writeExecutor = executor;
     }

     for (auto i = 0; i < iterations; ++i) {
@@ -2465,6 +2467,14 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
           {"a", velox::REAL()},
           {"b", velox::INTEGER()},
       })},
+      {"row",
+       velox::ROW(
+           {{"nested_row",
+             velox::ROW(
+                 {{"nested_nested_row", velox::ROW({{"a", velox::INTEGER()}})},
+                  {"b", velox::INTEGER()}})}})},
+      {"map",
+       velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))},
       {"nested",
        velox::ARRAY(velox::ROW({
            {"a", velox::INTEGER()},
@@ -2517,9 +2527,12 @@ TEST_F(VeloxReaderTests, FuzzComplex) {

   for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
     LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
-    writerOptions.encodingExecutor = parallelismFactor > 0
-        ? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
-        : nullptr;
+    if (parallelismFactor > 0) {
+      auto executor =
+          std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
+      writerOptions.encodingExecutor = executor;
+      writerOptions.writeExecutor = executor;
+    }

     for (auto i = 0; i < iterations; ++i) {
       writeAndVerify(
