Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] fix distinct hash aggregation OOM #384

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,25 @@ class AggregationNode : public PlanNode {
// (https://github.com/facebookincubator/velox/issues/3263) and pre-grouped
// aggregation (https://github.com/facebookincubator/velox/issues/3264). We
// will add support later to re-enable.
return (isFinal() || isSingle()) && !(aggregates().empty()) &&
preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled();
if (!queryConfig.aggregationSpillEnabled()) {
return false;
}

if (!isFinal() && !isSingle()) {
return false;
}

if (!preGroupedKeys().empty()) {
return false;
}

// aggregates().empty() means distinct aggregate
if (aggregates().empty() &&
!queryConfig.distinctAggregationSpillEnabled()) {
return false;
}

return true;
}

bool isFinal() const {
Expand Down
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ class QueryConfig {
static constexpr const char* kAggregationSpillEnabled =
"aggregation_spill_enabled";

/// Distinct aggregation spilling flag
static constexpr const char* kDistinctAggregationSpillEnabled =
"distinct_aggregation_spill_enabled";

/// Join spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kJoinSpillEnabled = "join_spill_enabled";

Expand Down Expand Up @@ -329,6 +333,11 @@ class QueryConfig {
return get<bool>(kAggregationSpillEnabled, true);
}

/// Returns 'is distinct aggregation spilling enabled' flag.
bool distinctAggregationSpillEnabled() const {
return get<bool>(kDistinctAggregationSpillEnabled, true);
}

/// Returns 'is join spilling enabled' flag. Must also check the
/// spillEnabled()!
bool joinSpillEnabled() const {
Expand Down
91 changes: 86 additions & 5 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ HashAggregation::HashAggregation(
std::vector<AggregateInfo> aggregateInfos;
aggregateInfos.reserve(numAggregates);

printf("[zuochunwei] numHashers:%ld, numAggregates:%ld\n", (long)numHashers, (long)numAggregates);

for (auto i = 0; i < numAggregates; i++) {
const auto& aggregate = aggregationNode->aggregates()[i];

Expand Down Expand Up @@ -127,6 +129,7 @@ HashAggregation::HashAggregation(
"Aggregations over sorted inputs with masks are not supported yet");
}

printf("[zuochunwei] aggregate name:%s, numSortingKeys:%ld\n", aggregate.call->name().c_str(), (long)numSortingKeys);
aggregateInfos.emplace_back(std::move(info));
}

Expand Down Expand Up @@ -159,6 +162,53 @@ HashAggregation::HashAggregation(
spillConfig_.has_value() ? &spillConfig_.value() : nullptr,
&nonReclaimableSection_,
operatorCtx_.get());

distinctAggregationSpillEnabled_ =
driverCtx->queryConfig().distinctAggregationSpillEnabled();

debug("ctor");
}

void HashAggregation::debug(const std::string& str) {
printf("[zuochunwei] %s this=%p, "
"isPartialOutput_:%d, "
"isDistinct_:%d, "
"isGlobal_:%d, "
"isIntermediate_:%d, "
"maxExtendedPartialAggregationMemoryUsage_:%ld, "
"maxPartialAggregationMemoryUsage_:%ld, "
"distinctAggregationSpillEnabled_:%d, "
"partialFull_:%d, "
"newDistincts_:%d, "
"finished_:%d, "
"abandonedPartialAggregation_:%d, "
"abandonPartialAggregationMinRows_:%d, "
"abandonPartialAggregationMinPct_:%d, "
"pushdownChecked_:%d, "
"mayPushdown_:%d, "
"numInputRows_:%ld, "
"numInputVectors_:%ld, "
"numOutputRows_:%ld\n"
, str.c_str(), this
, (int)isPartialOutput_
, (int)isDistinct_
, (int)isGlobal_
, (int)isIntermediate_
, (long int)maxExtendedPartialAggregationMemoryUsage_
, (long int)maxPartialAggregationMemoryUsage_
, (int)distinctAggregationSpillEnabled_
, (int)partialFull_
, (int)newDistincts_
, (int)finished_
, (int)abandonedPartialAggregation_
, (int)abandonPartialAggregationMinRows_
, (int)abandonPartialAggregationMinPct_
, (int)pushdownChecked_
, (int)mayPushdown_
, (long int)numInputRows_
, (long int)numInputVectors_
, (long int)numOutputRows_
);
}

bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
Expand All @@ -175,6 +225,8 @@ void HashAggregation::addInput(RowVectorPtr input) {
if (abandonedPartialAggregation_) {
input_ = input;
numInputRows_ += input->size();
printf("[zuochunwei] abandonedPartialAggregation_, numInputRows_:%ld, numOutputRows_:%ld\n",
(long)numInputRows_, (long)numOutputRows_);
return;
}
groupingSet_->addInput(input, mayPushdown_);
Expand All @@ -188,17 +240,21 @@ void HashAggregation::addInput(RowVectorPtr input) {
if (isPartialOutput_ && !isGlobal_ && !isIntermediate_) {
if (groupingSet_->isPartialFull(maxPartialAggregationMemoryUsage_)) {
partialFull_ = true;
printf("[zuochunwei] addInput set partialFull_ = true, maxPartialAggregationMemoryUsage_:%ld\n", (long)maxPartialAggregationMemoryUsage_);
}
uint64_t kDefaultFlushMemory = 1L << 24;
if (groupingSet_->allocatedBytes() > kDefaultFlushMemory &&
abandonPartialAggregationEarly(groupingSet_->numDistinct())) {
partialFull_ = true;
printf("[zuochunwei] partialFull_ = true, allocatedBytes:%ld, numDistinct:%ld\n",
(long)groupingSet_->allocatedBytes(),
(long)groupingSet_->numDistinct());
}
}

const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ &&
abandonPartialAggregationEarly(groupingSet_->numDistinct());
if (isDistinct_) {
if (isDistinct_ && !distinctAggregationSpillEnabled_) {
newDistincts_ = !groupingSet_->hashLookup().newGroups.empty();

if (newDistincts_) {
Expand Down Expand Up @@ -269,6 +325,9 @@ void HashAggregation::resetPartialOutputIfNeed() {
VELOX_DCHECK(!isGlobal_);
const double aggregationPct =
numOutputRows_ == 0 ? 0 : (numOutputRows_ * 1.0) / numInputRows_ * 100;

char buf[512] = {};
sprintf(buf, " {aggregationPct:%f numOutputRows_:%ld numInputRows_:%ld} ", aggregationPct, numOutputRows_, numInputRows_);
{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
Expand All @@ -279,6 +338,9 @@ void HashAggregation::resetPartialOutputIfNeed() {
}
groupingSet_->resetPartial();
partialFull_ = false;

debug(buf);

if (!finished_) {
maybeIncreasePartialAggregationMemoryUsage(aggregationPct);
}
Expand All @@ -290,10 +352,19 @@ void HashAggregation::resetPartialOutputIfNeed() {
void HashAggregation::maybeIncreasePartialAggregationMemoryUsage(
double aggregationPct) {
// If more than this many are unique at full memory, give up on partial agg.
constexpr int32_t kPartialMinFinalPct = 40;
constexpr int32_t kPartialMinFinalPct = 20;
VELOX_DCHECK(isPartialOutput_);
// If size is at max and there still is not enough reduction, abandon partial
// aggregation.

char buf[1024];
sprintf(buf, "{numOutputRows_:%ld, aggregationPct:%f, kPartialMinFinalPct:%d, maxPartialAggregationMemoryUsage_:%ld, maxExtendedPartialAggregationMemoryUsage_:%ld}",
numOutputRows_,
aggregationPct,
kPartialMinFinalPct,
(long)maxPartialAggregationMemoryUsage_,
(long)maxExtendedPartialAggregationMemoryUsage_);

if (abandonPartialAggregationEarly(numOutputRows_) ||
(aggregationPct > kPartialMinFinalPct &&
maxPartialAggregationMemoryUsage_ >=
Expand All @@ -302,6 +373,7 @@ void HashAggregation::maybeIncreasePartialAggregationMemoryUsage(
pool()->release();
addRuntimeStat("abandonedPartialAggregation", RuntimeCounter(1));
abandonedPartialAggregation_ = true;
printf("[zuochunwei] set abandonedPartialAggregation_ = true, %s\n", buf);
return;
}
const int64_t extendedPartialAggregationMemoryUsage = std::min(
Expand All @@ -313,7 +385,9 @@ void HashAggregation::maybeIncreasePartialAggregationMemoryUsage(
const int64_t memoryToReserve = std::max<int64_t>(
0,
extendedPartialAggregationMemoryUsage - groupingSet_->allocatedBytes());

if (!pool()->maybeReserve(memoryToReserve)) {
printf("[zuochunwei] maybeReserve %ld return fasle %s\n", memoryToReserve, buf);
return;
}
// Update the aggregation memory usage size limit on memory reservation
Expand All @@ -323,6 +397,9 @@ void HashAggregation::maybeIncreasePartialAggregationMemoryUsage(
"maxExtendedPartialAggregationMemoryUsage",
RuntimeCounter(
maxPartialAggregationMemoryUsage_, RuntimeCounter::Unit::kBytes));

printf("[zuochunwei] maybeReserve %ld return true %s\n", memoryToReserve, buf);
debug("End");
}

RowVectorPtr HashAggregation::getOutput() {
Expand All @@ -340,6 +417,8 @@ RowVectorPtr HashAggregation::getOutput() {
prepareOutput(input_->size());
groupingSet_->toIntermediate(input_, output_);
numOutputRows_ += input_->size();
printf("[zuochunwei] abandonedPartialAggregation_ getOutput, numOutputRows_:%ld, numInputRows_:%ld, inputSize:%ld\n",
(long)numOutputRows_, (long)numInputRows_, (long)input_->size());
input_ = nullptr;
return output_;
}
Expand All @@ -355,7 +434,7 @@ RowVectorPtr HashAggregation::getOutput() {
return nullptr;
}

if (isDistinct_) {
if (isDistinct_ && !distinctAggregationSpillEnabled_) {
if (!newDistincts_) {
if (noMoreInput_) {
finished_ = true;
Expand All @@ -372,8 +451,8 @@ RowVectorPtr HashAggregation::getOutput() {
auto output = fillOutput(size, indices);
numOutputRows_ += size;

// Drop reference to input_ to make it singly-referenced at the producer and
// allow for memory reuse.
// Drop reference to input_ to make it singly-referenced at the producer
// and allow for memory reuse.
input_ = nullptr;

resetPartialOutputIfNeed();
Expand All @@ -396,6 +475,8 @@ RowVectorPtr HashAggregation::getOutput() {
return nullptr;
}
numOutputRows_ += output_->size();
printf("[zuochunwei] getOutput, numOutputRows_:%ld, numInputRows_:%ld, outputSize:%ld\n",
(long)numOutputRows_, (long)numInputRows_, (long)output_->size());
return output_;
}

Expand Down
4 changes: 4 additions & 0 deletions velox/exec/HashAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class HashAggregation : public Operator {

void close() override;

void debug(const std::string& str);

private:
void updateRuntimeStats();

Expand Down Expand Up @@ -80,6 +82,8 @@ class HashAggregation : public Operator {
int64_t maxPartialAggregationMemoryUsage_;
std::unique_ptr<GroupingSet> groupingSet_;

bool distinctAggregationSpillEnabled_{false};

bool partialFull_ = false;
bool newDistincts_ = false;
bool finished_ = false;
Expand Down