diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 9adbb398bac10..bb3be12f29830 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,8 +237,7 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); + return preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled(); } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/tests/PlanFragmentTest.cpp b/velox/core/tests/PlanFragmentTest.cpp index daf88bb8a4831..4c17e3f44db74 100644 --- a/velox/core/tests/PlanFragmentTest.cpp +++ b/velox/core/tests/PlanFragmentTest.cpp @@ -159,14 +159,14 @@ TEST_F(PlanFragmentTest, aggregationCanSpill) { {AggregationNode::Step::kSingle, true, true, false, false, true}, {AggregationNode::Step::kIntermediate, false, true, false, false, false}, {AggregationNode::Step::kIntermediate, true, false, false, false, false}, - {AggregationNode::Step::kIntermediate, true, true, true, false, false}, + {AggregationNode::Step::kIntermediate, true, true, true, false, true}, {AggregationNode::Step::kIntermediate, true, true, false, true, false}, - {AggregationNode::Step::kIntermediate, true, true, false, false, false}, + {AggregationNode::Step::kIntermediate, true, true, false, false, true}, {AggregationNode::Step::kPartial, false, true, false, false, false}, {AggregationNode::Step::kPartial, true, false, false, false, false}, - {AggregationNode::Step::kPartial, true, true, true, false, false}, + {AggregationNode::Step::kPartial, true, true, true, false, true}, {AggregationNode::Step::kPartial, true, true, false, true, false}, - {AggregationNode::Step::kPartial, true, true, false, false, false}, + {AggregationNode::Step::kPartial, true, true, false, false, true}, {AggregationNode::Step::kFinal, false, true, false, false, false}, {AggregationNode::Step::kFinal, true, false, false, false, false}, {AggregationNode::Step::kFinal, true, true, true, false, true}, diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index fc55c8429a163..b2cf20ba76c9f 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -830,7 +830,7 @@ const HashLookup& GroupingSet::hashLookup() const { void GroupingSet::ensureInputFits(const RowVectorPtr& input) { // Spilling is considered if this is a final or single aggregation and // spillPath is set. - if (isPartial_ || spillConfig_ == nullptr) { + if (spillConfig_ == nullptr) { return; } @@ -913,7 +913,7 @@ void GroupingSet::ensureOutputFits() { // to reserve memory for the output as we can't reclaim much memory from this // operator itself. The output processing can reclaim memory from the other // operator or query through memory arbitration. - if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) { + if (spillConfig_ == nullptr || hasSpilled()) { return; } @@ -960,6 +960,10 @@ void GroupingSet::spill() { return; } + if (hasSpilled() && spiller_->finalized()) { + return; + } + if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index d91c5c6dea778..441326e610b7b 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -20,6 +20,7 @@ #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Aggregate.h" @@ -397,6 +398,33 @@ class AggregationTest : public OperatorTestBase { VARCHAR()})}; folly::Random::DefaultGenerator rng_; memory::MemoryReclaimer::Stats reclaimerStats_; + + std::shared_ptr newQueryCtx( + int64_t memoryCapacity = memory::kMaxMemory) { + std::unordered_map> configs; + std::shared_ptr pool = memoryManager_->addRootPool( + "", memoryCapacity, MemoryReclaimer::create()); + auto queryCtx = std::make_shared( + executor_.get(), + core::QueryConfig({}), + configs, + cache::AsyncDataCache::getInstance(), + std::move(pool)); + return queryCtx; + } + + void setupMemory() { + memory::MemoryManagerOptions options; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + memoryAllocator_ = memory::MemoryAllocator::createDefaultInstance(); + options.allocator = memoryAllocator_.get(); + memoryManager_ = std::make_unique(options); + } + + private: + std::shared_ptr memoryAllocator_; + std::unique_ptr memoryManager_; }; template <> @@ -847,6 +875,104 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) { .customStats.count("flushRowCount")); } +// TODO move to arbitrator test +TEST_F(AggregationTest, partialAggregationSpill) { + VectorFuzzer::Options fuzzerOpts; + fuzzerOpts.vectorSize = 128; + RowTypePtr rowType = ROW( + {{"c0", INTEGER()}, + {"c1", INTEGER()}, + {"c2", INTEGER()}, + {"c3", INTEGER()}, + {"c4", INTEGER()}, + {"c5", INTEGER()}, + {"c6", INTEGER()}, + {"c7", INTEGER()}, + {"c8", INTEGER()}, + {"c9", INTEGER()}, + {"c10", INTEGER()}}); + VectorFuzzer fuzzer(std::move(fuzzerOpts), pool()); + + std::vector vectors; + + const int32_t numVectors = 2000; + for (int i = 0; i < numVectors; i++) { + vectors.push_back(fuzzer.fuzzRow(rowType)); + } + + createDuckDbTable(vectors); + + setupMemory(); + + core::PlanNodeId partialAggNodeId; + core::PlanNodeId finalAggNodeId; + // Set an artificially low limit on the amount of data to accumulate in + // the partial aggregation. + + // Distinct aggregation. + auto spillDirectory1 = exec::test::TempDirectoryPath::create(); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(newQueryCtx(10LL << 10 << 10)) + .spillDirectory(spillDirectory1->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config( + QueryConfig::kAggregationSpillMemoryThreshold, + std::to_string(0)) // always spill on final agg + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT distinct c0 FROM tmp"); + + checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); + checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); + + // Count aggregation. + auto spillDirectory2 = exec::test::TempDirectoryPath::create(); + task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(newQueryCtx(10LL << 10 << 10)) + .spillDirectory(spillDirectory2->path) + .config(QueryConfig::kSpillEnabled, "true") + .config(QueryConfig::kAggregationSpillEnabled, "true") + .config( + QueryConfig::kAggregationSpillMemoryThreshold, + std::to_string(0)) // always spill on final agg + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT c0, count(1) FROM tmp GROUP BY 1"); + + checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); + checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); + + // Global aggregation. + task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(newQueryCtx(10LL << 10 << 10)) + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({}, {"sum(c0)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT sum(c0) FROM tmp"); + EXPECT_EQ( + 0, + toPlanStats(task->taskStats()) + .at(partialAggNodeId) + .customStats.count("flushRowCount")); + checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), false); + checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), false); +} + TEST_F(AggregationTest, partialDistinctWithAbandon) { auto vectors = { // 1st batch will produce 100 distinct groups from 10 rows. diff --git a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp index e618115867297..16d8fe1051020 100644 --- a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp @@ -639,6 +639,19 @@ class ApproxPercentileAggregate : public exec::Aggregate { DecodedVector decodedDigest_; private: + bool isConstantVector(const VectorPtr& vec) { + if (vec->isConstantEncoding()) { + return true; + } + VELOX_USER_CHECK(vec->size() > 0); + for (vector_size_t i = 1; i < vec->size(); ++i) { + if (!vec->equalValueAt(vec.get(), i, 0)) { + return false; + } + } + return true; + } + template void addIntermediateImpl( std::conditional_t group, @@ -650,7 +663,8 @@ class ApproxPercentileAggregate : public exec::Aggregate { if constexpr (checkIntermediateInputs) { VELOX_USER_CHECK(rowVec); for (int i = kPercentiles; i <= kAccuracy; ++i) { - VELOX_USER_CHECK(rowVec->childAt(i)->isConstantEncoding()); + VELOX_USER_CHECK(isConstantVector( + rowVec->childAt(i))); // spilling flats constant encoding } for (int i = kK; i <= kMaxValue; ++i) { VELOX_USER_CHECK(rowVec->childAt(i)->isFlatEncoding()); @@ -677,10 +691,9 @@ class ApproxPercentileAggregate : public exec::Aggregate { } DecodedVector percentiles(*rowVec->childAt(kPercentiles), *baseRows); - auto percentileIsArray = - rowVec->childAt(kPercentilesIsArray)->asUnchecked>(); - auto accuracy = - rowVec->childAt(kAccuracy)->asUnchecked>(); + DecodedVector percentileIsArray( + *rowVec->childAt(kPercentilesIsArray), *baseRows); + DecodedVector accuracy(*rowVec->childAt(kAccuracy), *baseRows); auto k = rowVec->childAt(kK)->asUnchecked>(); auto n = rowVec->childAt(kN)->asUnchecked>(); auto minValue = rowVec->childAt(kMinValue)->asUnchecked>(); @@ -710,7 +723,7 @@ class ApproxPercentileAggregate : public exec::Aggregate { return; } int i = decoded.index(row); - if (percentileIsArray->isNullAt(i)) { + if (percentileIsArray.isNullAt(i)) { return; } if (!accumulator) { @@ -720,10 +733,10 @@ class ApproxPercentileAggregate : public exec::Aggregate { percentilesBase->elements()->asFlatVector(); if constexpr (checkIntermediateInputs) { VELOX_USER_CHECK(percentileBaseElements); - VELOX_USER_CHECK(!percentilesBase->isNullAt(indexInBaseVector)); + VELOX_USER_CHECK(!percentiles.isNullAt(indexInBaseVector)); } - bool isArray = percentileIsArray->valueAt(i); + bool isArray = percentileIsArray.valueAt(i); const double* data; vector_size_t len; std::vector isNull; @@ -731,8 +744,8 @@ class ApproxPercentileAggregate : public exec::Aggregate { percentilesBase, indexInBaseVector, data, len, isNull); checkSetPercentile(isArray, data, len, isNull); - if (!accuracy->isNullAt(i)) { - checkSetAccuracy(accuracy->valueAt(i)); + if (!accuracy.isNullAt(i)) { + checkSetAccuracy(accuracy.valueAt(i)); } } if constexpr (kSingleGroup) {