Skip to content

Commit

Permalink
add decodingContextPool to nimble parallel writer (#101)
Browse files Browse the repository at this point in the history
Summary:

Switch to a pool model for parallelism. Instead of a single shared DecodedVector and a single SelectivityVector, each caller now reserves a decoding context (a DecodedVector paired with a SelectivityVector) that is recycled back into the pool when the caller is done with it.

Reviewed By: helfman

Differential Revision: D65379001
  • Loading branch information
Scott Young authored and facebook-github-bot committed Dec 4, 2024
1 parent d66edc1 commit d4da823
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 93 deletions.
143 changes: 66 additions & 77 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,6 @@

namespace facebook::nimble {

// Scoped handle over a DecodedVector obtained from a FieldWriterContext.
//
// The vector is taken from the context via getDecodedVector() on construction
// and handed back via releaseDecodedVector() on destruction. The handle can be
// move-constructed (the source is left without a vector and therefore returns
// nothing to the context); move assignment is disabled.
class FieldWriterContext::LocalDecodedVector {
 public:
  // Obtains a decoded vector from `context` for the lifetime of this handle.
  explicit LocalDecodedVector(FieldWriterContext& context)
      : owner_(context), decoded_(context.getDecodedVector()) {}

  // Takes over the vector held by `other`; both handles refer to the same
  // context.
  LocalDecodedVector(LocalDecodedVector&& other) noexcept
      : owner_(other.owner_), decoded_(std::move(other.decoded_)) {}

  LocalDecodedVector& operator=(LocalDecodedVector&& other) = delete;

  // Gives the vector back to the context, unless it was moved out of this
  // handle.
  ~LocalDecodedVector() {
    if (!decoded_) {
      return;
    }
    owner_.releaseDecodedVector(std::move(decoded_));
  }

  // Accessor for the held vector. Must not be called on a moved-from handle,
  // since the held pointer is null in that state.
  velox::DecodedVector& get() {
    return *decoded_;
  }

 private:
  FieldWriterContext& owner_;
  std::unique_ptr<velox::DecodedVector> decoded_;
};

namespace {

template <velox::TypeKind KIND>
Expand Down Expand Up @@ -359,8 +334,8 @@ class SimpleFieldWriter : public FieldWriter {
});
}
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto decodingContext = context_.getDecodingContext();
auto& decoded = decodingContext.decode(vector, ranges);
valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
iterateNonNullValues(
ranges,
Expand Down Expand Up @@ -421,8 +396,8 @@ class RowFieldWriter : public FieldWriter {
childRangesPtr = &ranges;
}
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto decodingContext = context_.getDecodingContext();
auto& decoded = decodingContext.decode(vector, ranges);
row = decoded.base()->as<velox::RowVector>();
NIMBLE_ASSERT(row, "Unexpected vector type");
NIMBLE_CHECK(fields_.size() == row->childrenSize(), "schema mismatch");
Expand Down Expand Up @@ -500,8 +475,8 @@ class MultiValueFieldWriter : public FieldWriter {
iterateNonNullIndices<true>(
ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto decodingContext = context_.getDecodingContext();
auto& decoded = decodingContext.decode(vector, ranges);
casted = decoded.base()->as<T>();
NIMBLE_ASSERT(casted, "Unexpected vector type");
offsets = casted->rawOffsets();
Expand Down Expand Up @@ -715,8 +690,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
iterableVector,
processMapIndex);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto decodingContext = context_.getDecodingContext();
auto& decoded = decodingContext.decode(vector, ranges);
mapVector = decoded.base()->template as<velox::MapVector>();
NIMBLE_ASSERT(mapVector, "Unexpected vector type");
rawOffsets = mapVector->rawOffsets();
Expand Down Expand Up @@ -965,8 +940,8 @@ class FlatMapFieldWriter : public FieldWriter {
// Keys are encoded. Decode.
iterateNonNullIndices<false>(
ranges, nullsStream_.mutableNonNulls(), vector, computeKeyRanges);
auto localDecodedKeys = decode(mapKeys, keyRanges);
auto& decodedKeys = localDecodedKeys.get();
auto decodingContext = context_.getDecodingContext();
auto& decodedKeys = decodingContext.decode(mapKeys, keyRanges);
Decoded<KeyType> keysVector{decodedKeys};
iterateNonNullIndices<true>(
ranges, nullsStream_.mutableNonNulls(), vector, [&](auto offset) {
Expand All @@ -990,8 +965,8 @@ class FlatMapFieldWriter : public FieldWriter {
processVector(map, Flat{vector});
} else {
// Map is encoded. Decode.
auto localDecodedMap = decode(vector, ranges);
auto& decodedMap = localDecodedMap.get();
auto decodingContext = context_.getDecodingContext();
auto& decodedMap = decodingContext.decode(vector, ranges);
map = decodedMap.base()->template as<velox::MapVector>();
NIMBLE_ASSERT(map, "Unexpected vector type");
offsets = map->rawOffsets();
Expand Down Expand Up @@ -1375,8 +1350,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {

iterateNonNullIndices<false>(ranges, nonNulls, iterableVector, dedupProc);
} else {
auto localDecoded = decode(vectorElements, childRanges);
auto& decoded = localDecoded.get();
auto decodingContext = context_.getDecodingContext();
auto& decoded = decodingContext.decode(vectorElements, childRanges);
/** compare array at index and prevIndex to be equal */
compareConsecutive = [&](velox::vector_size_t index,
velox::vector_size_t prevIndex) {
Expand Down Expand Up @@ -1455,8 +1430,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
ingestLengthsOffsetsByElements(
arrayVector, iterableVector, ranges, childRanges, filteredRanges);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto decodingContext = context_.getDecodingContext();
auto& decoded = decodingContext.decode(vector, ranges);
arrayVector = decoded.base()->template as<velox::ArrayVector>();
NIMBLE_ASSERT(arrayVector, "Unexpected vector type");
rawOffsets = arrayVector->rawOffsets();
Expand Down Expand Up @@ -1510,51 +1485,65 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(

} // namespace

FieldWriterContext::LocalDecodedVector
FieldWriterContext::getLocalDecodedVector() {
NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing");
vectorDecoderVisitor();
return LocalDecodedVector{*this};
// Builds a decoding context bound to `pool`.
//
// Takes ownership of `decodedVector` and `selectivityVector`; a reference to
// `pool` is stored so the destructor can hand both objects back to it.
DecodingContextPool::DecodingContext::DecodingContext(
DecodingContextPool& pool,
std::unique_ptr<velox::DecodedVector> decodedVector,
std::unique_ptr<velox::SelectivityVector> selectivityVector)
: pool_{pool},
decodedVector_{std::move(decodedVector)},
selectivityVector_{std::move(selectivityVector)} {}

// Recycles this context: both owned vectors are moved back into the pool this
// context was created with, via DecodingContextPool::addContext().
DecodingContextPool::DecodingContext::~DecodingContext() {
pool_.addContext(std::move(decodedVector_), std::move(selectivityVector_));
}

velox::SelectivityVector& FieldWriterContext::getSelectivityVector(
velox::vector_size_t size) {
if (LIKELY(selectivity_.get() != nullptr)) {
selectivity_->resize(size);
} else {
selectivity_ = std::make_unique<velox::SelectivityVector>(size);
}
return *selectivity_;
// Decodes `vector` using only the rows described by `ranges`.
//
// The context's selectivity vector is resized to the input vector's size,
// cleared, and then marked valid for every (offset, size) span reported by
// `ranges`. The context's decoded vector is then populated from `vector` with
// that selection.
//
// Returns a reference to the decoded vector owned by this context; the
// reference must not be used after the context is destroyed.
velox::DecodedVector& DecodingContextPool::DecodingContext::decode(
    const velox::VectorPtr& vector,
    const OrderedRanges& ranges) {
  auto& rows = *selectivityVector_;

  // Start from an empty selection sized to the input.
  rows.resize(vector->size());
  rows.clearAll();

  // Mark every requested span as selected, then refresh the cached bounds.
  ranges.apply([&rows](auto offset, auto size) {
    rows.setValidRange(offset, offset + size, true);
  });
  rows.updateBounds();

  auto& decoded = *decodedVector_;
  decoded.decode(*vector, rows);
  return decoded;
}

std::unique_ptr<velox::DecodedVector> FieldWriterContext::getDecodedVector() {
if (decodedVectorPool_.empty()) {
return std::make_unique<velox::DecodedVector>();
}
auto vector = std::move(decodedVectorPool_.back());
decodedVectorPool_.pop_back();
return vector;
// Creates an empty pool.
//
// `vectorDecoderVisitor` is stored and invoked on every reserveContext()
// call; it must be a non-empty callable (enforced by NIMBLE_CHECK). Storage
// for the pooled entries is reserved up front, sized by the value reported by
// std::thread::hardware_concurrency().
DecodingContextPool::DecodingContextPool(
    std::function<void(void)> vectorDecoderVisitor)
    : vectorDecoderVisitor_(std::move(vectorDecoderVisitor)) {
  NIMBLE_CHECK(vectorDecoderVisitor_, "vectorDecoderVisitor must be set");

  const auto initialCapacity = std::thread::hardware_concurrency();
  pool_.reserve(initialCapacity);
}

void FieldWriterContext::releaseDecodedVector(
std::unique_ptr<velox::DecodedVector>&& vector) {
decodedVectorPool_.push_back(std::move(vector));
// Puts a (decoded vector, selectivity vector) pair back into the pool so a
// later reserveContext() call can reuse it. The pool's mutex is held while the
// entry is appended.
void DecodingContextPool::addContext(
    std::unique_ptr<velox::DecodedVector> decodedVector,
    std::unique_ptr<velox::SelectivityVector> selectivityVector) {
  std::scoped_lock<std::mutex> guard{mutex_};
  pool_.emplace_back(std::move(decodedVector), std::move(selectivityVector));
}

FieldWriterContext::LocalDecodedVector FieldWriter::decode(
const velox::VectorPtr& vector,
const OrderedRanges& ranges) {
auto& selectivityVector = context_.getSelectivityVector(vector->size());
// initialize selectivity vector
selectivityVector.clearAll();
ranges.apply([&](auto offset, auto size) {
selectivityVector.setValidRange(offset, offset + size, true);
});
selectivityVector.updateBounds();
// Hands out a decoding context, reusing a pooled one when available.
//
// The vector decoder visitor is invoked first, once per call. If the pool
// holds a recycled (decoded vector, selectivity vector) pair, the most
// recently returned one is reused; otherwise a fresh pair is allocated.
//
// Only the pop from the pool happens under the mutex. Allocating a fresh pair
// is done after the lock is released, so a pool miss on one thread does not
// block other threads that are reserving or returning contexts.
//
// The returned context gives its vectors back to this pool when destroyed, so
// the pool must outlive every context it hands out.
DecodingContextPool::DecodingContext DecodingContextPool::reserveContext() {
  vectorDecoderVisitor_();

  std::unique_ptr<velox::DecodedVector> decodedVector;
  std::unique_ptr<velox::SelectivityVector> selectivityVector;
  bool reused = false;
  {
    std::scoped_lock<std::mutex> lock{mutex_};
    if (!pool_.empty()) {
      auto& entry = pool_.back();
      decodedVector = std::move(entry.first);
      selectivityVector = std::move(entry.second);
      pool_.pop_back();
      reused = true;
    }
  }

  if (!reused) {
    // Pool miss: allocate outside the critical section.
    decodedVector = std::make_unique<velox::DecodedVector>();
    selectivityVector = std::make_unique<velox::SelectivityVector>();
  }

  // Returned as a prvalue on purpose: the result is constructed directly in
  // the caller's storage, without going through a copy or move constructor.
  return DecodingContext{
      *this, std::move(decodedVector), std::move(selectivityVector)};
}

auto localDecoded = context_.getLocalDecodedVector();
localDecoded.get().decode(*vector, selectivityVector);
return localDecoded;
// Returns the number of recycled contexts currently stored in the pool.
// Contexts that are reserved and still in use are not counted, because they
// are removed from the pool while reserved.
//
// NOTE(review): this reads pool_ without taking mutex_, unlike
// reserveContext() and addContext(). The value may race with concurrent
// reserve/release calls — confirm callers only use it when no other thread is
// touching the pool.
size_t DecodingContextPool::size() const {
return pool_.size();
}

std::unique_ptr<FieldWriter> FieldWriter::create(
Expand Down
63 changes: 47 additions & 16 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,49 @@ struct InputBufferGrowthStats {
std::atomic<uint64_t> itemCount{0};
};

struct FieldWriterContext {
class LocalDecodedVector;
// A pool of decoding contexts. Decoding contexts are used to decode a vector
// and its associated selectivity vector. The pool is used to avoid
// repeated allocations of decoding context.
class DecodingContextPool {
public:
class DecodingContext {
public:
explicit DecodingContext(
DecodingContextPool& pool,
std::unique_ptr<velox::DecodedVector> decodedVector,
std::unique_ptr<velox::SelectivityVector> selectivityVector);

~DecodingContext();
velox::DecodedVector& decode(
const velox::VectorPtr& vector,
const range_helper::OrderedRanges<velox::vector_size_t>& ranges);

private:
DecodingContextPool& pool_;
std::unique_ptr<velox::DecodedVector> decodedVector_;
std::unique_ptr<velox::SelectivityVector> selectivityVector_;
};

explicit DecodingContextPool(
std::function<void(void)> vectorDecoderVisitor = []() {});

DecodingContext reserveContext();
size_t size() const;

private:
std::mutex mutex_;
std::vector<std::pair<
std::unique_ptr<velox::DecodedVector>,
std::unique_ptr<velox::SelectivityVector>>>
pool_;
std::function<void(void)> vectorDecoderVisitor_;

void addContext(
std::unique_ptr<velox::DecodedVector> decodedVector,
std::unique_ptr<velox::SelectivityVector> selectivityVector);
};

struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
Expand All @@ -47,7 +87,7 @@ struct FieldWriterContext {
std::move(reclaimer))},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
vectorDecoderVisitor(std::move(vectorDecoderVisitor)) {
decodingContextPool_{std::move(vectorDecoderVisitor)} {
resetStringBuffer();
}

Expand All @@ -67,10 +107,9 @@ struct FieldWriterContext {
std::function<void(const TypeBuilder&)> typeAddedHandler =
[](const TypeBuilder&) {};

std::function<void(void)> vectorDecoderVisitor;

LocalDecodedVector getLocalDecodedVector();
velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size);
DecodingContextPool::DecodingContext getDecodingContext() {
return decodingContextPool_.reserveContext();
}

Buffer& stringBuffer() {
return *buffer_;
Expand Down Expand Up @@ -108,12 +147,8 @@ struct FieldWriterContext {
}

private:
std::unique_ptr<velox::DecodedVector> getDecodedVector();
void releaseDecodedVector(std::unique_ptr<velox::DecodedVector>&& vector);

std::unique_ptr<Buffer> buffer_;
std::vector<std::unique_ptr<velox::DecodedVector>> decodedVectorPool_;
std::unique_ptr<velox::SelectivityVector> selectivity_;
DecodingContextPool decodingContextPool_;
std::vector<std::unique_ptr<StreamData>> streams_;
};

Expand Down Expand Up @@ -156,10 +191,6 @@ class FieldWriter {
protected:
FieldWriterContext& context_;
std::shared_ptr<TypeBuilder> typeBuilder_;

FieldWriterContext::LocalDecodedVector decode(
const velox::VectorPtr& vector,
const OrderedRanges& ranges);
};

} // namespace facebook::nimble
Loading

0 comments on commit d4da823

Please sign in to comment.