From e8f8cbb5d7d6420e4a52907d03f039fb6407fc38 Mon Sep 17 00:00:00 2001
From: yingsu00
Date: Tue, 3 Sep 2024 20:04:25 -0700
Subject: [PATCH] misc: Improve ExchangeBenchmark

Add detailed operator-level statistics to the output. Sample output:

============================================================================
[...]exec/benchmarks/ExchangeBenchmark.cpp     relative  time/iter   iters/s
============================================================================
exchangeFlat10k                                             5.46s    183.19m
exchangeFlat50                                             13.07s     76.51m
exchangeDeep10k                                            15.17s     65.92m
exchangeDeep50                                             30.18s     33.14m
exchangeStruct1K                                            3.58s    278.96m
localFlat10k                                               46.66s     21.43m
----------------------------------Flat10K----------------------------------
PartitionOutput: Output: 6400064 rows (611.35MB, 1152 batches), Cpu time: 14.42s, Wall time: 30.38s, Blocked wall time: 0ns, Peak memory: 296.00MB, Memory allocations: 102400, Threads: 128, CPU breakdown: B/I/O/F (1.14ms/0ns/14.42s/1.15ms)
Exchange: Output: 6400000 rows (1.73GB, 150 batches), Cpu time: 1.67s, Wall time: 3.49s, Blocked wall time: 22.87s, Peak memory: 248.45MB, Memory allocations: 13617, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (34.60ms/4.78ms/1.59s/102.30us)
----------------------------------Flat50----------------------------------
PartitionOutput: Output: 6400064 rows (600.85MB, 1389 batches), Cpu time: 19.88s, Wall time: 38.52s, Blocked wall time: 22.82s, Peak memory: 267.19MB, Memory allocations: 93261, Threads: 127, CPU breakdown: B/I/O/F (47.28ms/0ns/19.73s/50.24ms)
Exchange: Output: 6400000 rows (1.73GB, 150 batches), Cpu time: 1.67s, Wall time: 3.49s, Blocked wall time: 22.87s, Peak memory: 248.45MB, Memory allocations: 13617, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (34.60ms/4.78ms/1.59s/102.30us)
---------------------------------Deep10K----------------------------------
PartitionOutput: Output: 6400064 rows (2.98GB, 4538 batches), Cpu time: 45.89s, Wall time: 1m 30s, Blocked wall time: 7m 12s, Peak memory: 560.78MB, Memory allocations: 335952, Threads: 124, CPU breakdown: B/I/O/F (2.27ms/0ns/45.89s/1.77ms)
Exchange: Output: 6400000 rows (11.23GB, 760 batches), Cpu time: 10.47s, Wall time: 19.78s, Blocked wall time: 2m 24s, Peak memory: 299.35MB, Memory allocations: 27401, Threads: 60, Splits: 256, CPU breakdown: B/I/O/F (179.38ms/14.82ms/10.09s/3.05ms)
----------------------------------Deep50----------------------------------
PartitionOutput: Output: 6400064 rows (3.09GB, 5217 batches), Cpu time: 1m 2s, Wall time: 2m 3s, Blocked wall time: 13m 40s, Peak memory: 491.98MB, Memory allocations: 339131, Threads: 128, CPU breakdown: B/I/O/F (63.06ms/0ns/1m 1s/65.47ms)
Exchange: Output: 6400000 rows (10.45GB, 698 batches), Cpu time: 10.31s, Wall time: 21.25s, Blocked wall time: 7m 32s, Peak memory: 292.26MB, Memory allocations: 25737, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (281.67ms/19.99ms/9.72s/516.54us)
---------------------------------Struct1K---------------------------------
PartitionOutput: Output: 6400064 rows (361.27MB, 1216 batches), Cpu time: 9.69s, Wall time: 20.13s, Blocked wall time: 0ns, Peak memory: 182.81MB, Memory allocations: 66624, Threads: 128, CPU breakdown: B/I/O/F (3.39ms/0ns/9.68s/3.96ms)
Exchange: Output: 6400000 rows (982.86MB, 90 batches), Cpu time: 1.88s, Wall time: 3.79s, Blocked wall time: 9.67s, Peak memory: 247.59MB, Memory allocations: 10133, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (14.36ms/3.00ms/1.84s/122.45us)
-------------------------------LocalFlat10K-------------------------------
LocalPartition: Output: 204802048 rows (27.16GB, 176128 batches), Cpu time: 2m 20s, Wall time: 21m 34s, Blocked wall time: 33m 27s, Peak memory: 10.44MB, Memory allocations: 163840, CPU breakdown: B/I/O/F (217.83ms/2m 18s/1.86s/16.75ms)
Producer Total 0ns Max: sum:6511604000, count:1, min:6511604000, max:6511604000 Median: sum:15436246000, count:16, min:355929000, max:1589746000 Min: sum:8482011000, count:16, min:318334000, max:741137000
Consumer Total 0ns Max: sum:15329630000, count:16, min:268246000, max:1666110000 Median: sum:0, count:0, min:9223372036854775807, max:-9223372036854775808 Min: sum:0, count:0, min:9223372036854775807, max:-9223372036854775808
Wall ms: 6522 / 5792 / 5091
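Note: in the lines above, the CPU breakdown B/I/O/F is the per-operator isBlocked/addInput/getOutput/finish timing split, and each stats line is rendered by PlanNodeStats::toString(). A minimal sketch of how a caller can produce such a line from a finished task, mirroring what the benchmark does below (the task and nodeId parameters are illustrative):

    #include <iostream>
    #include <memory>

    #include "velox/exec/PlanNodeStats.h"
    #include "velox/exec/Task.h"

    using namespace facebook::velox;

    // Convert a finished task's stats to per-plan-node stats and print one node.
    void printNodeStats(
        const std::shared_ptr<exec::Task>& task,
        const core::PlanNodeId& nodeId) {
      auto planStats = exec::toPlanStats(task->taskStats());
      std::cout << planStats.at(nodeId).toString() << std::endl;
    }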
---
 velox/exec/PlanNodeStats.cpp                |  70 ++-
 velox/exec/PlanNodeStats.h                  |   2 +
 velox/exec/benchmarks/ExchangeBenchmark.cpp | 472 ++++++++++++--------
 3 files changed, 358 insertions(+), 186 deletions(-)

diff --git a/velox/exec/PlanNodeStats.cpp b/velox/exec/PlanNodeStats.cpp
index 1b14d27c6b9df..43e6c466a0f01 100644
--- a/velox/exec/PlanNodeStats.cpp
+++ b/velox/exec/PlanNodeStats.cpp
@@ -19,6 +19,67 @@

 namespace facebook::velox::exec {

+PlanNodeStats& PlanNodeStats::operator+=(const PlanNodeStats& another) {
+  inputRows += another.inputRows;
+  inputBytes += another.inputBytes;
+  inputVectors += another.inputVectors;
+
+  rawInputRows += another.rawInputRows;
+  rawInputBytes += another.rawInputBytes;
+
+  dynamicFilterStats.add(another.dynamicFilterStats);
+
+  outputRows += another.outputRows;
+  outputBytes += another.outputBytes;
+  outputVectors += another.outputVectors;
+
+  isBlockedTiming.add(another.isBlockedTiming);
+  addInputTiming.add(another.addInputTiming);
+  getOutputTiming.add(another.getOutputTiming);
+  finishTiming.add(another.finishTiming);
+  cpuWallTiming.add(another.isBlockedTiming);
+  cpuWallTiming.add(another.addInputTiming);
+  cpuWallTiming.add(another.getOutputTiming);
+  cpuWallTiming.add(another.finishTiming);
+
+  backgroundTiming.add(another.backgroundTiming);
+
+  blockedWallNanos += another.blockedWallNanos;
+
+  peakMemoryBytes += another.peakMemoryBytes;
+  numMemoryAllocations += another.numMemoryAllocations;
+
+  physicalWrittenBytes += another.physicalWrittenBytes;
+
+  for (const auto& [name, customStats] : another.customStats) {
+    if (UNLIKELY(this->customStats.count(name) == 0)) {
+      this->customStats.insert(std::make_pair(name, customStats));
+    } else {
+      this->customStats.at(name).merge(customStats);
+    }
+  }
+
+  // Populating the number of drivers for plan nodes with multiple operators
+  // is not useful. Each operator could have been executed in different
+  // pipelines with different numbers of drivers.
+  if (!isMultiOperatorTypeNode()) {
+    numDrivers += another.numDrivers;
+  } else {
+    numDrivers = 0;
+  }
+
+  numSplits += another.numSplits;
+
+  spilledInputBytes += another.spilledInputBytes;
+  spilledBytes += another.spilledBytes;
+  spilledRows += another.spilledRows;
+  spilledPartitions += another.spilledPartitions;
+  spilledFiles += another.spilledFiles;
+
+  return *this;
+}
+
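Note: the new operator+= is what lets the benchmark below merge a single plan node's stats across many tasks into one printed line. A minimal usage sketch (the tasks vector and nodeId are illustrative placeholders):

    // Merge one plan node's stats across all tasks, then print the total.
    facebook::velox::exec::PlanNodeStats total;
    for (const auto& task : tasks) {
      auto planStats = facebook::velox::exec::toPlanStats(task->taskStats());
      total += planStats.at(nodeId);
    }
    std::cout << total.toString() << std::endl;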
 void PlanNodeStats::add(const OperatorStats& stats) {
   auto it = operatorStats.find(stats.operatorType);
   if (it != operatorStats.end()) {
@@ -63,11 +124,11 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {

   physicalWrittenBytes += stats.physicalWrittenBytes;

-  for (const auto& [name, runtimeStats] : stats.runtimeStats) {
-    if (UNLIKELY(customStats.count(name) == 0)) {
-      customStats.insert(std::make_pair(name, runtimeStats));
+  for (const auto& [name, customStats] : stats.runtimeStats) {
+    if (UNLIKELY(this->customStats.count(name) == 0)) {
+      this->customStats.insert(std::make_pair(name, customStats));
     } else {
-      customStats.at(name).merge(runtimeStats);
+      this->customStats.at(name).merge(customStats);
     }
   }

@@ -105,6 +166,7 @@ std::string PlanNodeStats::toString(bool includeInputStats) const {
     out << ", Physical written output: " << succinctBytes(physicalWrittenBytes);
   }
   out << ", Cpu time: " << succinctNanos(cpuWallTiming.cpuNanos)
+      << ", Wall time: " << succinctNanos(cpuWallTiming.wallNanos)
       << ", Blocked wall time: " << succinctNanos(blockedWallNanos)
       << ", Peak memory: " << succinctBytes(peakMemoryBytes)
       << ", Memory allocations: " << numMemoryAllocations;
diff --git a/velox/exec/PlanNodeStats.h b/velox/exec/PlanNodeStats.h
index 544c4ea8e08bb..7f8346feb7b4e 100644
--- a/velox/exec/PlanNodeStats.h
+++ b/velox/exec/PlanNodeStats.h
@@ -44,6 +44,8 @@ struct PlanNodeStats {
   PlanNodeStats(PlanNodeStats&&) = default;
   PlanNodeStats& operator=(PlanNodeStats&&) = default;

+  PlanNodeStats& operator+=(const PlanNodeStats&);
+
   /// Sum of input rows for all corresponding operators. Useful primarily for
   /// leaf plan nodes or plan nodes that correspond to a single operator type.
   uint64_t inputRows{0};
diff --git a/velox/exec/benchmarks/ExchangeBenchmark.cpp b/velox/exec/benchmarks/ExchangeBenchmark.cpp
index 0baf9594ba9de..6c54f494c8485 100644
--- a/velox/exec/benchmarks/ExchangeBenchmark.cpp
+++ b/velox/exec/benchmarks/ExchangeBenchmark.cpp
@@ -41,6 +41,9 @@ DEFINE_int64(
     "task-wide buffer in local exchange");
 DEFINE_int64(exchange_buffer_mb, 32, "task-wide buffer in remote exchange");
 DEFINE_int32(dict_pct, 0, "Percentage of columns wrapped in dictionary");
+// Define these gtest flags so the benchmark binary also runs under CLion.
+DEFINE_bool(gtest_color, false, "");
+DEFINE_string(gtest_filter, "*", "");

 /// Benchmarks repartition/exchange with different batch sizes,
 /// numbers of destinations and data type mixes. Generates a plan
@@ -56,28 +59,35 @@ using namespace facebook::velox::test;

 namespace {

-struct Counters {
-  int64_t bytes{0};
-  int64_t rows{0};
-  int64_t usec{0};
-  int64_t repartitionNanos{0};
-  int64_t exchangeNanos{0};
-  int64_t exchangeRows{0};
-  int64_t exchangeBatches{0};
-
-  std::string toString() {
-    if (exchangeBatches == 0) {
-      return "N/A";
-    }
-    return fmt::format(
-        "{}/s repartition={} exchange={} exchange batch={}",
-        succinctBytes(bytes / (usec / 1.0e6)),
-        succinctNanos(repartitionNanos),
-        succinctNanos(exchangeNanos),
-        exchangeRows / exchangeBatches);
-  }
+struct LocalPartitionWaitStats {
+  int64_t totalProducerWaitMs = 0;
+  int64_t totalConsumerWaitMs = 0;
+  std::vector<RuntimeMetric> consumerWaitMs;
+  std::vector<RuntimeMetric> producerWaitMs;
+  std::vector<int64_t> wallMs;
 };

+void sortByMax(std::vector<RuntimeMetric>& metrics) {
+  std::sort(
+      metrics.begin(),
+      metrics.end(),
+      [](const RuntimeMetric& left, const RuntimeMetric& right) {
+        return left.max > right.max;
+      });
+}
+
+void printMax(
+    const char* title,
+    int64_t total,
+    std::vector<RuntimeMetric>& metrics) {
+  sortByMax(metrics);
+  assert(!metrics.empty());
+  std::cout << title << " Total " << succinctNanos(total)
+            << " Max: " << metrics.front().toString()
+            << " Median: " << metrics[metrics.size() / 2].toString()
+            << " Min: " << metrics.back().toString() << std::endl;
+}
+
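Note: printMax() reports the spread of a per-task metric; it sorts the RuntimeMetrics by each metric's max and prints the total alongside the max/median/min entries. A hypothetical call, assuming one metric per task has already been collected into waits:

    std::vector<RuntimeMetric> waits; // one entry per task, filled elsewhere
    int64_t totalNanos = 0;
    for (const auto& metric : waits) {
      totalNanos += metric.sum;
    }
    printMax("Producer", totalNanos, waits);
    // Prints: Producer Total <nanos> Max: <metric> Median: <metric> Min: <metric>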
 class ExchangeBenchmark : public VectorTestBase {
  public:
  std::vector<RowVectorPtr> makeRows(
@@ -90,6 +100,7 @@ class ExchangeBenchmark : public VectorTestBase {
     for (int32_t i = 0; i < numVectors; ++i) {
       auto vector = std::dynamic_pointer_cast<RowVector>(
           BatchMaker::createBatch(type, rowsPerVector, *pool_));
+
       auto width = vector->childrenSize();
       for (auto child = 0; child < width; ++child) {
         if (100 * child / width > dictPct) {
@@ -109,126 +120,152 @@ class ExchangeBenchmark : public VectorTestBase {
       std::vector<RowVectorPtr>& vectors,
       int32_t width,
       int32_t taskWidth,
-      Counters& counters) {
-    assert(!vectors.empty());
-    configSettings_[core::QueryConfig::kMaxPartitionedOutputBufferSize] =
-        fmt::format("{}", FLAGS_exchange_buffer_mb << 20);
-    auto iteration = ++iteration_;
-    std::vector<std::shared_ptr<Task>> tasks;
-    std::vector<std::string> leafTaskIds;
-    auto leafPlan = exec::test::PlanBuilder()
-                        .values(vectors, true)
-                        .partitionedOutput({"c0"}, width)
-                        .planNode();
+      PlanNodeStats& partitionedOutputStats,
+      PlanNodeStats& exchangeStats) {
+    core::PlanNodePtr plan;
+    core::PlanNodeId exchangeId;
+    core::PlanNodeId leafPartitionedOutputId;
+    core::PlanNodeId finalAggPartitionedOutputId;

-    auto startMicros = getCurrentTimeMicro();
-    for (int32_t counter = 0; counter < width; ++counter) {
-      auto leafTaskId = makeTaskId(iteration, "leaf", counter);
-      leafTaskIds.push_back(leafTaskId);
-      auto leafTask = makeTask(leafTaskId, leafPlan, counter);
-      tasks.push_back(leafTask);
-      leafTask->start(taskWidth);
-    }
+    std::vector<std::shared_ptr<Task>> leafTasks;
+    std::vector<std::shared_ptr<Task>> finalAggTasks;
+    std::vector<exec::Split> finalAggSplits;

-    core::PlanNodePtr finalAggPlan;
-    std::vector<std::string> finalAggTaskIds;
-    finalAggPlan =
-        exec::test::PlanBuilder()
-            .exchange(leafPlan->outputType(), VectorSerde::Kind::kPresto)
-            .singleAggregation({}, {"count(1)"})
-            .partitionedOutput({}, 1)
-            .planNode();
+    RowVectorPtr expected;

-    std::vector<exec::Split> finalAggSplits;
-    for (int i = 0; i < width; i++) {
-      auto taskId = makeTaskId(iteration, "final-agg", i);
-      finalAggSplits.push_back(
-          exec::Split(std::make_shared<RemoteConnectorSplit>(taskId)));
-      auto task = makeTask(taskId, finalAggPlan, i);
-      tasks.push_back(task);
-      task->start(taskWidth);
-      addRemoteSplits(task, leafTaskIds);
-    }
+    auto startMicros = getCurrentTimeMicro();
+    BENCHMARK_SUSPEND {
+      assert(!vectors.empty());
+      configSettings_[core::QueryConfig::kMaxPartitionedOutputBufferSize] =
+          fmt::format("{}", FLAGS_exchange_buffer_mb << 20);
+      auto iteration = ++iteration_;
+
+      // leafPlan: PartitionedOutput/kPartitioned(1) <-- Values(0)
+      std::vector<std::string> leafTaskIds;
+      auto leafPlan = exec::test::PlanBuilder()
+                          .values(vectors, true)
+                          .partitionedOutput({"c0"}, width)
+                          .capturePlanNodeId(leafPartitionedOutputId)
+                          .planNode();
+
+      for (int32_t counter = 0; counter < width; ++counter) {
+        auto leafTaskId = makeTaskId(iteration, "leaf", counter);
+        leafTaskIds.push_back(leafTaskId);
+        auto leafTask = makeTask(leafTaskId, leafPlan, counter);
+        leafTasks.push_back(leafTask);
+        leafTask->start(taskWidth);
+      }

-    auto plan =
-        exec::test::PlanBuilder()
-            .exchange(finalAggPlan->outputType(), VectorSerde::Kind::kPresto)
-            .singleAggregation({}, {"sum(a0)"})
-            .planNode();
+      // finalAggPlan: PartitionedOutput/kPartitioned(2) <-- Agg/kSingle(1) <--
+      // Exchange(0)
+      std::vector<std::string> finalAggTaskIds;
+      core::PlanNodePtr finalAggPlan =
+          exec::test::PlanBuilder()
+              .exchange(leafPlan->outputType(), VectorSerde::Kind::kPresto)
+              .capturePlanNodeId(exchangeId)
+              .singleAggregation({}, {"count(1)"})
+              .partitionedOutput({}, 1)
+              .capturePlanNodeId(finalAggPartitionedOutputId)
+              .planNode();
+
+      for (int i = 0; i < width; i++) {
+        auto taskId = makeTaskId(iteration, "final-agg", i);
+        finalAggSplits.push_back(
+            exec::Split(std::make_shared<RemoteConnectorSplit>(taskId)));
+        auto finalAggTask = makeTask(taskId, finalAggPlan, i);
+        finalAggTasks.push_back(finalAggTask);
+        finalAggTask->start(taskWidth);
+        addRemoteSplits(finalAggTask, leafTaskIds);
+      }

-    auto expected =
-        makeRowVector({makeFlatVector<int64_t>(1, [&](auto /*row*/) {
-          return vectors.size() * vectors[0]->size() * width * taskWidth;
-        })});
+      expected = makeRowVector({makeFlatVector<int64_t>(1, [&](auto /*row*/) {
+        return vectors.size() * vectors[0]->size() * width * taskWidth;
+      })});
+
+      // plan: Agg/kSingle(1) <-- Exchange(0)
+      plan =
+          exec::test::PlanBuilder()
+              .exchange(finalAggPlan->outputType(), VectorSerde::Kind::kPresto)
+              .singleAggregation({}, {"sum(a0)"})
+              .planNode();
+    };

     exec::test::AssertQueryBuilder(plan)
         .splits(finalAggSplits)
         .assertResults(expected);
-    auto elapsed = getCurrentTimeMicro() - startMicros;
-    int64_t bytes = 0;
-    int64_t repartitionNanos = 0;
-    int64_t exchangeNanos = 0;
-    int64_t exchangeBatches = 0;
-    int64_t exchangeRows = 0;
-    for (auto& task : tasks) {
-      auto stats = task->taskStats();
-      for (auto& pipeline : stats.pipelineStats) {
-        for (auto& op : pipeline.operatorStats) {
-          if (op.operatorType == "PartitionedOutput") {
-            repartitionNanos +=
-                op.addInputTiming.cpuNanos + op.getOutputTiming.cpuNanos;
-          } else if (op.operatorType == "Exchange") {
-            bytes += op.rawInputBytes;
-            exchangeRows += op.outputPositions;
-            exchangeBatches += op.outputVectors;
-            exchangeNanos +=
-                op.addInputTiming.cpuNanos + op.getOutputTiming.cpuNanos;
-          }
-        }
+
+    BENCHMARK_SUSPEND {
+      auto elapsed = getCurrentTimeMicro() - startMicros;
+      std::vector<int64_t> taskWallMs;
+
+      for (const auto& task : leafTasks) {
+        const auto& taskStats = task->taskStats();
+        taskWallMs.push_back(
+            taskStats.executionEndTimeMs - taskStats.executionStartTimeMs);
+        const auto& planStats = toPlanStats(taskStats);
+        auto& taskPartitionedOutputStats =
+            planStats.at(leafPartitionedOutputId);
+        partitionedOutputStats += taskPartitionedOutputStats;
       }
-    }
-    counters.bytes += bytes;
-    counters.rows += width * vectors.size() * vectors[0]->size();
-    counters.usec += elapsed;
-    counters.repartitionNanos += repartitionNanos;
-    counters.exchangeNanos += exchangeNanos;
-    counters.exchangeRows += exchangeRows;
-    counters.exchangeBatches += exchangeBatches;
+
+      for (const auto& task : finalAggTasks) {
+        const auto& taskStats = task->taskStats();
+        taskWallMs.push_back(
+            taskStats.executionEndTimeMs - taskStats.executionStartTimeMs);
+        const auto& planStats = toPlanStats(taskStats);
+
+        auto& taskPartitionedOutputStats =
+            planStats.at(finalAggPartitionedOutputId);
+        partitionedOutputStats += taskPartitionedOutputStats;
+
+        auto& taskExchangeStats = planStats.at(exchangeId);
+        exchangeStats += taskExchangeStats;
+      }
+    };
   }
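Note: run() and runLocal() wrap plan construction and stats collection in BENCHMARK_SUSPEND so that only query execution is charged to the folly benchmark clock. A minimal self-contained sketch of that pattern (doSetup and doMeasuredWork are illustrative placeholders):

    #include <folly/Benchmark.h>

    void doSetup();        // illustrative
    void doMeasuredWork(); // illustrative

    // Only the work outside BENCHMARK_SUSPEND is timed.
    BENCHMARK(suspendExample) {
      BENCHMARK_SUSPEND {
        doSetup(); // excluded from the measured time
      };
      doMeasuredWork(); // measured
    }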

   void runLocal(
       std::vector<RowVectorPtr>& vectors,
       int32_t taskWidth,
       int32_t numTasks,
-      Counters& counters) {
+      PlanNodeStats& partitionedOutputStats,
+      LocalPartitionWaitStats& localPartitionWaitStats) {
     assert(!vectors.empty());
-    std::vector<std::shared_ptr<Task>> tasks;
-    counters.bytes = vectors[0]->retainedSize() * vectors.size() * numTasks *
-        FLAGS_num_local_repeat;
-    std::vector<std::string> aggregates = {"count(1)"};
-    auto& rowType = vectors[0]->type()->as<RowType>();
-    for (auto i = 1; i < rowType.size(); ++i) {
-      aggregates.push_back(fmt::format("checksum({})", rowType.nameOf(i)));
-    }
-    core::PlanNodeId exchangeId;
-    auto plan = exec::test::PlanBuilder()
-                    .values(vectors, true)
-                    .localPartition({"c0"})
-                    .capturePlanNodeId(exchangeId)
-                    .singleAggregation({}, aggregates)
-                    .localPartition(std::vector<std::string>{})
-                    .singleAggregation({}, {"sum(a0)"})
-                    .planNode();
-    auto startMicros = getCurrentTimeMicro();
+    core::PlanNodePtr plan;
+    core::PlanNodeId localPartitionId1;
+    core::PlanNodeId localPartitionId2;
     std::vector<std::shared_ptr<Task>> tasks;
     std::vector<std::thread> threads;
-    threads.reserve(numTasks);
-    auto expected =
-        makeRowVector({makeFlatVector<int64_t>(1, [&](auto /*row*/) {
-          return vectors.size() * vectors[0]->size() * taskWidth;
-        })});
+    RowVectorPtr expected;
+
+    BENCHMARK_SUSPEND {
+      std::vector<std::string> aggregates = {"count(1)"};
+      auto& rowType = vectors[0]->type()->as<RowType>();
+      for (auto i = 1; i < rowType.size(); ++i) {
+        aggregates.push_back(fmt::format("checksum({})", rowType.nameOf(i)));
+      }
+
+      // plan: Agg/kSingle(4) <-- LocalPartition/Gather(3) <-- Agg/kGather(2)
+      // <-- LocalPartition/kRepartition(1) <-- Values(0)
+      plan = exec::test::PlanBuilder()
+                 .values(vectors, true)
+                 .localPartition({"c0"})
+                 .capturePlanNodeId(localPartitionId1)
+                 .singleAggregation({}, aggregates)
+                 .localPartition(std::vector<std::string>{})
+                 .capturePlanNodeId(localPartitionId2)
+                 .singleAggregation({}, {"sum(a0)"})
+                 .planNode();
+
+      threads.reserve(numTasks);
+      expected = makeRowVector({makeFlatVector<int64_t>(1, [&](auto /*row*/) {
+        return vectors.size() * vectors[0]->size() * taskWidth;
+      })});
+    };
+
+    auto startMicros = getCurrentTimeMicro();
     std::mutex mutex;
     for (int32_t i = 0; i < numTasks; ++i) {
       threads.push_back(std::thread([&]() {
@@ -250,30 +287,33 @@ class ExchangeBenchmark : public VectorTestBase {
     for (auto& thread : threads) {
       thread.join();
     }
-    counters.usec = getCurrentTimeMicro() - startMicros;
-    int64_t totalProducer = 0;
-    int64_t totalConsumer = 0;
-    std::vector<RuntimeMetric> waitConsumer;
-    std::vector<RuntimeMetric> waitProducer;
-    std::vector<int64_t> wallMs;
-    for (auto& task : tasks) {
-      auto taskStats = task->taskStats();
-      wallMs.push_back(
-          taskStats.executionEndTimeMs - taskStats.executionStartTimeMs);
-      auto planStats = toPlanStats(taskStats);
-      auto runtimeStats = planStats.at(exchangeId).customStats;
-      waitProducer.push_back(runtimeStats["blockedWaitForProducerWallNanos"]);
-      waitConsumer.push_back(runtimeStats["blockedWaitForConsumerWallNanos"]);
-      totalConsumer += waitConsumer.back().sum;
-      totalProducer += waitProducer.back().sum;
-    }
-    printMax("Producer", totalProducer, waitProducer);
-    printMax("Consumer", totalConsumer, waitConsumer);
-    std::sort(wallMs.begin(), wallMs.end());
-    assert(!wallMs.empty());
-    std::cout << "Wall ms: " << wallMs.back() << " / "
-              << wallMs[wallMs.size() / 2] << " / " << wallMs.front()
-              << std::endl;
+
+    BENCHMARK_SUSPEND {
+      auto elapsed = getCurrentTimeMicro() - startMicros;
+
+      std::vector<core::PlanNodeId> localPartitionNodeIds{
+          localPartitionId1, localPartitionId2};
+      for (const auto& task : tasks) {
+        auto taskStats = task->taskStats();
+        localPartitionWaitStats.wallMs.push_back(
+            taskStats.executionEndTimeMs - taskStats.executionStartTimeMs);
+        auto planStats = toPlanStats(taskStats);
+
+        for (const auto& nodeId : localPartitionNodeIds) {
+          auto& taskLocalPartitionStats = planStats.at(nodeId);
+          partitionedOutputStats += taskLocalPartitionStats;
+
+          auto& taskLocalPartitionRuntimeStats =
+              taskLocalPartitionStats.customStats;
+          localPartitionWaitStats.producerWaitMs.push_back(
+              taskLocalPartitionRuntimeStats
+                  ["blockedWaitForProducerWallNanos"]);
+          localPartitionWaitStats.consumerWaitMs.push_back(
+              taskLocalPartitionRuntimeStats
+                  ["blockedWaitForConsumerWallNanos"]);
+          localPartitionWaitStats.totalProducerWaitMs +=
+              localPartitionWaitStats.producerWaitMs.back().sum;
+          localPartitionWaitStats.totalConsumerWaitMs +=
+              localPartitionWaitStats.consumerWaitMs.back().sum;
+        }
+      }
+    };
   }
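Note: the wait metrics collected above are RuntimeMetrics, which track sum/count/min/max; this is also why an empty metric prints min:9223372036854775807, max:-9223372036854775808 in the sample output (the int64 limits used as initial values). A small illustration, assuming RuntimeMetric from velox/common/base/RuntimeMetrics.h:

    facebook::velox::RuntimeMetric metric;
    metric.addValue(355929000);
    metric.addValue(1589746000);
    // metric.sum == 1945675000, metric.count == 2,
    // metric.min == 355929000, metric.max == 1589746000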

  private:
@@ -308,7 +348,7 @@ class ExchangeBenchmark : public VectorTestBase {
   void addRemoteSplits(
       std::shared_ptr<Task> task,
       const std::vector<std::string>& remoteTaskIds) {
-    for (auto& taskId : remoteTaskIds) {
+    for (const auto& taskId : remoteTaskIds) {
       auto split =
           exec::Split(std::make_shared<RemoteConnectorSplit>(taskId), -1);
       task->addSplit("0", std::move(split));
@@ -316,27 +356,6 @@ class ExchangeBenchmark : public VectorTestBase {
     task->noMoreSplits("0");
   }

-  void sortByMax(std::vector<RuntimeMetric>& metrics) {
-    std::sort(
-        metrics.begin(),
-        metrics.end(),
-        [](const RuntimeMetric& left, const RuntimeMetric& right) {
-          return left.max > right.max;
-        });
-  }
-
-  void printMax(
-      const char* title,
-      int64_t total,
-      std::vector<RuntimeMetric>& metrics) {
-    sortByMax(metrics);
-    assert(!metrics.empty());
-    std::cout << title << " Total " << succinctNanos(total)
-              << " Max: " << metrics.front().toString()
-              << " Median: " << metrics[metrics.size() / 2].toString()
-              << " Min: " << metrics.back().toString() << std::endl;
-  }
-
   std::unordered_map<std::string, std::string> configSettings_;
   // Serial number to differentiate consecutive benchmark repeats.
   static int32_t iteration_;
@@ -353,12 +372,20 @@ void runBenchmarks() {
   std::vector<RowVectorPtr> deep50;
   std::vector<RowVectorPtr> struct1k;

-  Counters flat10kCounters;
-  Counters deep10kCounters;
-  Counters flat50Counters;
-  Counters deep50Counters;
-  Counters localFlat10kCounters;
-  Counters struct1kCounters;
+  PlanNodeStats partitionedOutputStatsFlat10K;
+  PlanNodeStats exchangeStatsFlat10K;
+  PlanNodeStats partitionedOutputStatsFlat50;
+  PlanNodeStats exchangeStatsFlat50;
+  PlanNodeStats partitionedOutputStatsDeep10K;
+  PlanNodeStats exchangeStatsDeep10K;
+  PlanNodeStats partitionedOutputStatsDeep50;
+  PlanNodeStats exchangeStatsDeep50;
+  PlanNodeStats partitionedOutputStatsStruct1K;
+  PlanNodeStats exchangeStatsStruct1K;
+
+  PlanNodeStats localPartitionStatsFlat10K;
+  LocalPartitionWaitStats localPartitionWaitStats;

   std::vector<std::string> flatNames = {"c0"};
   std::vector<TypePtr> flatTypes = {BIGINT()};
@@ -420,42 +447,123 @@ void runBenchmarks() {
   struct1k = bm->makeRows(structType, 100, 1000, FLAGS_dict_pct);

   folly::addBenchmark(__FILE__, "exchangeFlat10k", [&]() {
-    bm->run(flat10k, FLAGS_width, FLAGS_task_width, flat10kCounters);
+    bm->run(
+        flat10k,
+        FLAGS_width,
+        FLAGS_task_width,
+        partitionedOutputStatsFlat10K,
+        exchangeStatsFlat10K);
     return 1;
   });

   folly::addBenchmark(__FILE__, "exchangeFlat50", [&]() {
-    bm->run(flat50, FLAGS_width, FLAGS_task_width, flat50Counters);
+    bm->run(
+        flat50,
+        FLAGS_width,
+        FLAGS_task_width,
+        partitionedOutputStatsFlat50,
+        exchangeStatsFlat50);
     return 1;
   });

   folly::addBenchmark(__FILE__, "exchangeDeep10k", [&]() {
-    bm->run(deep10k, FLAGS_width, FLAGS_task_width, deep10kCounters);
+    bm->run(
+        deep10k,
+        FLAGS_width,
+        FLAGS_task_width,
+        partitionedOutputStatsDeep10K,
+        exchangeStatsDeep10K);
     return 1;
   });

   folly::addBenchmark(__FILE__, "exchangeDeep50", [&]() {
-    bm->run(deep50, FLAGS_width, FLAGS_task_width, deep50Counters);
+    bm->run(
+        deep50,
+        FLAGS_width,
+        FLAGS_task_width,
+        partitionedOutputStatsDeep50,
+        exchangeStatsDeep50);
     return 1;
   });

   folly::addBenchmark(__FILE__, "exchangeStruct1K", [&]() {
-    bm->run(struct1k, FLAGS_width, FLAGS_task_width, struct1kCounters);
+    bm->run(
+        struct1k,
+        FLAGS_width,
+        FLAGS_task_width,
+        partitionedOutputStatsStruct1K,
+        exchangeStatsStruct1K);
     return 1;
   });

   folly::addBenchmark(__FILE__, "localFlat10k", [&]() {
     bm->runLocal(
-        flat10k, FLAGS_width, FLAGS_num_local_tasks, localFlat10kCounters);
+        flat10k,
+        FLAGS_width,
+        FLAGS_num_local_tasks,
+        localPartitionStatsFlat10K,
+        localPartitionWaitStats);
     return 1;
   });

   folly::runBenchmarks();
-  std::cout << "flat10k: " << flat10kCounters.toString() << std::endl
-            << "flat50: " << flat50Counters.toString() << std::endl
-            << "deep10k: " << deep10kCounters.toString() << std::endl
-            << "deep50: " << deep50Counters.toString() << std::endl
-            << "struct1k: " << struct1kCounters.toString() << std::endl;
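Note: folly::addBenchmark callbacks return the number of iterations they performed; returning 1 above makes each callback invocation count as one iteration in the time/iter column. A minimal sketch of the registration pattern (doWork is illustrative):

    #include <folly/Benchmark.h>

    void doWork(); // illustrative

    void registerAndRun() {
      folly::addBenchmark(__FILE__, "example", [&]() {
        doWork();
        return 1; // iterations performed by this invocation
      });
      folly::runBenchmarks();
    }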
"---------------------------------Deep10K----------------------------------" + << std::endl; + std::cout << "PartitionOutput: " << partitionedOutputStatsDeep10K.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsDeep10K.toString() << std::endl; + + std::cout + << "----------------------------------Deep50----------------------------------" + << std::endl; + std::cout << "PartitionOutput: " << partitionedOutputStatsDeep50.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsDeep50.toString() << std::endl; + + std::cout + << "---------------------------------Struct1K---------------------------------" + << std::endl; + std::cout << "PartitionOutput: " << partitionedOutputStatsStruct1K.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsStruct1K.toString() << std::endl; + + std::cout + << "-------------------------------LocalFlat10K-------------------------------" + << std::endl; + std::cout << "LocalPartition: " << localPartitionStatsFlat10K.toString() + << std::endl; + printMax( + "Producer", + localPartitionWaitStats.totalProducerWaitMs, + localPartitionWaitStats.producerWaitMs); + printMax( + "Consumer", + localPartitionWaitStats.totalConsumerWaitMs, + localPartitionWaitStats.consumerWaitMs); + std::sort( + localPartitionWaitStats.wallMs.begin(), + localPartitionWaitStats.wallMs.end()); + assert(!localPartitionWaitStats.wallMs.empty()); + std::cout << "Wall ms: " << localPartitionWaitStats.wallMs.back() << " / " + << localPartitionWaitStats + .wallMs[localPartitionWaitStats.wallMs.size() / 2] + << " / " << localPartitionWaitStats.wallMs.front() << std::endl; } } // namespace