Skip to content

Commit

Permalink
Optimize MergedFlatMapFieldReader 3 times faster
Browse files Browse the repository at this point in the history
Summary:
1. Copy map values in batch instead of row by row
2. Optimize CPU cache access pattern to avoid thrashing

Reviewed By: xiaoxmeng

Differential Revision: D54959863
  • Loading branch information
Yuhta authored and facebook-github-bot committed Mar 18, 2024
1 parent 83f3211 commit 7a5f5fe
Showing 1 changed file with 74 additions and 44 deletions.
118 changes: 74 additions & 44 deletions dwio/alpha/velox/FieldReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1817,76 +1817,106 @@ class MergedFlatMapFieldReader final
velox::BufferPtr lengths = vector->mutableSizes(rowCount);
uint32_t nonNullCount = this->loadNulls(rowCount, vector);

std::vector<FlatMapKeyNode<T>*> nodes;
nodes_.clear();
std::vector<const velox::BaseVector*> nodeValues;
size_t totalChildren = 0;
for (auto& node : this->keyNodes_) {
auto hasValues = node->load(nonNullCount);
if (hasValues) {
nodes.push_back(node.get());
nodes_.push_back(node.get());
const auto& nodeValueVector = node->values();
nodeValues.push_back(nodeValueVector.get());
totalChildren += nodeValueVector->size();
}
}

std::vector<uint32_t> nodeIndices(nodes.size(), 0);
auto& mapValueType = this->type_->asMap().valueType();
velox::VectorPtr valuesVector;
if (totalChildren > 0) {
keysVector->resize(totalChildren);
keysVector->resize(totalChildren, false);
initializeVector(valuesVector, mapValueType, this->pool_, nodeValues);
valuesVector->resize(totalChildren, false);
}

velox::vector_size_t offset = 0;
auto* offsetsPtr = offsets->asMutable<velox::vector_size_t>();
auto* lengthsPtr = lengths->asMutable<velox::vector_size_t>();

if constexpr (hasNull) {
uint32_t inMapIndex = 0;
for (uint32_t i = 0; i < rowCount; ++i) {
offsetsPtr[i] = offset;
if (this->boolBuffer_[i]) {
for (size_t j = 0; j < nodes.size(); ++j) {
if (nodes[j]->inMap(inMapIndex)) {
flatKeysVector->set(offset, nodes[j]->key().get());
copyOne(
mapValueType,
*valuesVector,
offset,
*nodeValues[j],
nodeIndices[j]);
++offset;
nodeIndices[j]++;
}
}
++inMapIndex;
}
lengthsPtr[i] = offset - offsetsPtr[i];
}
} else {
for (uint32_t i = 0; i < rowCount; ++i) {
offsetsPtr[i] = offset;
for (size_t j = 0; j < nodes.size(); ++j) {
if (nodes[j]->inMap(i)) {
flatKeysVector->set(offset, nodes[j]->key().get());
copyOne(
mapValueType,
*valuesVector,
offset,
*nodeValues[j],
nodeIndices[j]);
++offset;
nodeIndices[j]++;
}
initRowWiseInMap(rowCount);
initOffsets(rowCount, offsetsPtr, lengthsPtr);

// Always access inMap and value streams node-wise to avoid large striding
// through the memory and destroying CPU cache performance.
//
// Index symbology used in this class:
// i : Row index
// j : Node index
for (size_t j = 0; j < nodes_.size(); ++j) {
copyRanges_.clear();
for (velox::vector_size_t i = 0; i < rowCount; ++i) {
if (!velox::bits::isBitSet(
rowWiseInMap_.data(), j + i * nodes_.size())) {
continue;
}
lengthsPtr[i] = offset - offsetsPtr[i];
const velox::vector_size_t sourceIndex = copyRanges_.size();
copyRanges_.push_back({sourceIndex, offsetsPtr[i], 1});
flatKeysVector->set(offsetsPtr[i], nodes_[j]->key().get());
++offsetsPtr[i];
}
valuesVector->copyRanges(nodeValues[j], copyRanges_);
}
if (rowCount > 0) {
ALPHA_ASSERT(
offsetsPtr[rowCount - 1] == totalChildren,
"Total map entry size mismatch");
// We updated `offsetsPtr' during the copy process, so that now it was
// shifted to the left by 1 element (i.e. offsetsPtr[i] is really
// offsetsPtr[i+1]). Need to restore the values back to their correct
// positions.
std::copy_backward(
offsetsPtr, offsetsPtr + rowCount - 1, offsetsPtr + rowCount);
offsetsPtr[0] = 0;
}

// Reset the updated value vector to result
vector->setKeysAndValues(std::move(keysVector), std::move(valuesVector));
}

private:
void initRowWiseInMap(velox::vector_size_t rowCount) {
rowWiseInMap_.resize(velox::bits::nwords(nodes_.size() * rowCount));
std::fill(rowWiseInMap_.begin(), rowWiseInMap_.end(), 0);
for (size_t j = 0; j < nodes_.size(); ++j) {
uint32_t inMapIndex = 0;
for (velox::vector_size_t i = 0; i < rowCount; ++i) {
const bool isNull = hasNull && !this->boolBuffer_[i];
if (!isNull && nodes_[j]->inMap(inMapIndex)) {
velox::bits::setBit(rowWiseInMap_.data(), j + i * nodes_.size());
}
inMapIndex += !isNull;
}
}
}

void initOffsets(
velox::vector_size_t rowCount,
velox::vector_size_t* offsets,
velox::vector_size_t* lengths) {
velox::vector_size_t offset = 0;
for (velox::vector_size_t i = 0; i < rowCount; ++i) {
offsets[i] = offset;
lengths[i] = velox::bits::countBits(
rowWiseInMap_.data(), i * nodes_.size(), (i + 1) * nodes_.size());
offset += lengths[i];
}
}

// All the nodes that is selected to be read.
std::vector<FlatMapKeyNode<T>*> nodes_;

// In-map mask (1 bit per value), organized in row first layout.
std::vector<uint64_t> rowWiseInMap_;

// Copy ranges from one node values into the merged values.
std::vector<velox::BaseVector::CopyRange> copyRanges_;
};

template <typename T>
Expand Down

0 comments on commit 7a5f5fe

Please sign in to comment.