diff --git a/velox/exec/PlanNodeStats.cpp b/velox/exec/PlanNodeStats.cpp index 1b14d27c6b9d..43e6c466a0f0 100644 --- a/velox/exec/PlanNodeStats.cpp +++ b/velox/exec/PlanNodeStats.cpp @@ -19,6 +19,67 @@ namespace facebook::velox::exec { +PlanNodeStats& PlanNodeStats::operator+=(const PlanNodeStats& another) { + inputRows += another.inputRows; + inputBytes += another.inputBytes; + inputVectors += another.inputVectors; + + rawInputRows += another.rawInputRows; + rawInputBytes += another.rawInputBytes; + + dynamicFilterStats.add(another.dynamicFilterStats); + + outputRows += another.outputRows; + outputBytes += another.outputBytes; + outputVectors += another.outputVectors; + + isBlockedTiming.add(another.isBlockedTiming); + addInputTiming.add(another.addInputTiming); + getOutputTiming.add(another.getOutputTiming); + finishTiming.add(another.finishTiming); + // Fold each per-method timing into cpuWallTiming exactly once. + cpuWallTiming.add(another.addInputTiming); + cpuWallTiming.add(another.getOutputTiming); + cpuWallTiming.add(another.finishTiming); + cpuWallTiming.add(another.isBlockedTiming); + + backgroundTiming.add(another.backgroundTiming); + + blockedWallNanos += another.blockedWallNanos; + + peakMemoryBytes += another.peakMemoryBytes; + numMemoryAllocations += another.numMemoryAllocations; + + physicalWrittenBytes += another.physicalWrittenBytes; + + for (const auto& [name, customStats] : another.customStats) { + if (UNLIKELY(this->customStats.count(name) == 0)) { + this->customStats.insert(std::make_pair(name, customStats)); + } else { + this->customStats.at(name).merge(customStats); + } + } + + // Populating number of drivers for plan nodes with multiple operators is not + // useful. Each operator could have been executed in different pipelines with + // different number of drivers. 
+ if (!isMultiOperatorTypeNode()) { + numDrivers += another.numDrivers; + } else { + numDrivers = 0; + } + + numSplits += another.numSplits; + + spilledInputBytes += another.spilledInputBytes; + spilledBytes += another.spilledBytes; + spilledRows += another.spilledRows; + spilledPartitions += another.spilledPartitions; + spilledFiles += another.spilledFiles; + + return *this; +} + void PlanNodeStats::add(const OperatorStats& stats) { auto it = operatorStats.find(stats.operatorType); if (it != operatorStats.end()) { @@ -63,11 +124,11 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) { physicalWrittenBytes += stats.physicalWrittenBytes; - for (const auto& [name, runtimeStats] : stats.runtimeStats) { - if (UNLIKELY(customStats.count(name) == 0)) { - customStats.insert(std::make_pair(name, runtimeStats)); + for (const auto& [name, customStats] : stats.runtimeStats) { + if (UNLIKELY(this->customStats.count(name) == 0)) { + this->customStats.insert(std::make_pair(name, customStats)); } else { - customStats.at(name).merge(runtimeStats); + this->customStats.at(name).merge(customStats); } } @@ -105,6 +166,7 @@ std::string PlanNodeStats::toString(bool includeInputStats) const { out << ", Physical written output: " << succinctBytes(physicalWrittenBytes); } out << ", Cpu time: " << succinctNanos(cpuWallTiming.cpuNanos) + << ", Wall time: " << succinctNanos(cpuWallTiming.wallNanos) << ", Blocked wall time: " << succinctNanos(blockedWallNanos) << ", Peak memory: " << succinctBytes(peakMemoryBytes) << ", Memory allocations: " << numMemoryAllocations; diff --git a/velox/exec/PlanNodeStats.h b/velox/exec/PlanNodeStats.h index 544c4ea8e08b..7f8346feb7b4 100644 --- a/velox/exec/PlanNodeStats.h +++ b/velox/exec/PlanNodeStats.h @@ -44,6 +44,11 @@ struct PlanNodeStats { PlanNodeStats(PlanNodeStats&&) = default; PlanNodeStats& operator=(PlanNodeStats&&) = default; + /// Adds the counters, timings, custom stats and spill stats of 'another' to + /// this object and returns *this. numDrivers is reset to zero when + /// isMultiOperatorTypeNode() is true. + PlanNodeStats& operator+=(const PlanNodeStats&); + /// Sum of input rows for all corresponding operators. 
Useful primarily for /// leaf plan nodes or plan nodes that correspond to a single operator type. uint64_t inputRows{0}; diff --git a/velox/exec/benchmarks/ExchangeBenchmark.cpp b/velox/exec/benchmarks/ExchangeBenchmark.cpp index 0baf9594ba9d..2fca7d45ce4c 100644 --- a/velox/exec/benchmarks/ExchangeBenchmark.cpp +++ b/velox/exec/benchmarks/ExchangeBenchmark.cpp @@ -41,6 +41,9 @@ DEFINE_int64( "task-wide buffer in local exchange"); DEFINE_int64(exchange_buffer_mb, 32, "task-wide buffer in remote exchange"); DEFINE_int32(dict_pct, 0, "Percentage of columns wrapped in dictionary"); +// Add the following definitions to allow Clion runs +DEFINE_bool(gtest_color, false, ""); +DEFINE_string(gtest_filter, "*", ""); /// Benchmarks repartition/exchange with different batch sizes, /// numbers of destinations and data type mixes. Generates a plan @@ -56,28 +59,35 @@ using namespace facebook::velox::test; namespace { -struct Counters { - int64_t bytes{0}; - int64_t rows{0}; - int64_t usec{0}; - int64_t repartitionNanos{0}; - int64_t exchangeNanos{0}; - int64_t exchangeRows{0}; - int64_t exchangeBatches{0}; - - std::string toString() { - if (exchangeBatches == 0) { - return "N/A"; - } - return fmt::format( - "{}/s repartition={} exchange={} exchange batch={}", - succinctBytes(bytes / (usec / 1.0e6)), - succinctNanos(repartitionNanos), - succinctNanos(exchangeNanos), - exchangeRows / exchangeBatches); - } +struct LocalPartitionWaitStats { + int64_t totalProducerWaitMs = 0; + int64_t totalConsumerWaitMs = 0; + std::vector consumerWaitMs; + std::vector producerWaitMs; + std::vector wallMs; }; +void sortByMax(std::vector& metrics) { + std::sort( + metrics.begin(), + metrics.end(), + [](const RuntimeMetric& left, const RuntimeMetric& right) { + return left.max > right.max; + }); +} + +void sortByAndPrintMax( + const char* title, + int64_t total, + std::vector& metrics) { + sortByMax(metrics); + VELOX_CHECK(!metrics.empty()); + std::cout << title << "\n Total " << succinctNanos(total) 
+ << "\n Max: " << metrics.front().toString() + << "\n Median: " << metrics[metrics.size() / 2].toString() + << "\n Min: " << metrics.back().toString() << std::endl; +} + class ExchangeBenchmark : public VectorTestBase { public: std::vector makeRows( @@ -90,6 +100,7 @@ class ExchangeBenchmark : public VectorTestBase { for (int32_t i = 0; i < numVectors; ++i) { auto vector = std::dynamic_pointer_cast( BatchMaker::createBatch(type, rowsPerVector, *pool_)); + auto width = vector->childrenSize(); for (auto child = 0; child < width; ++child) { if (100 * child / width > dictPct) { @@ -109,126 +120,154 @@ class ExchangeBenchmark : public VectorTestBase { std::vector& vectors, int32_t width, int32_t taskWidth, - Counters& counters) { - assert(!vectors.empty()); - configSettings_[core::QueryConfig::kMaxPartitionedOutputBufferSize] = - fmt::format("{}", FLAGS_exchange_buffer_mb << 20); - auto iteration = ++iteration_; - std::vector> tasks; - std::vector leafTaskIds; - auto leafPlan = exec::test::PlanBuilder() - .values(vectors, true) - .partitionedOutput({"c0"}, width) - .planNode(); + int64_t& wallUs, + PlanNodeStats& partitionedOutputStats, + PlanNodeStats& exchangeStats) { + core::PlanNodePtr plan; + core::PlanNodeId exchangeId; + core::PlanNodeId leafPartitionedOutputId; + core::PlanNodeId finalAggPartitionedOutputId; - auto startMicros = getCurrentTimeMicro(); - for (int32_t counter = 0; counter < width; ++counter) { - auto leafTaskId = makeTaskId(iteration, "leaf", counter); - leafTaskIds.push_back(leafTaskId); - auto leafTask = makeTask(leafTaskId, leafPlan, counter); - tasks.push_back(leafTask); - leafTask->start(taskWidth); - } + std::vector> leafTasks; + std::vector> finalAggTasks; + std::vector finalAggSplits; - core::PlanNodePtr finalAggPlan; - std::vector finalAggTaskIds; - finalAggPlan = - exec::test::PlanBuilder() - .exchange(leafPlan->outputType(), VectorSerde::Kind::kPresto) - .singleAggregation({}, {"count(1)"}) - .partitionedOutput({}, 1) - .planNode(); + 
RowVectorPtr expected; + + const auto startUs = getCurrentTimeMicro(); + BENCHMARK_SUSPEND { + VELOX_CHECK(!vectors.empty()); + configSettings_[core::QueryConfig::kMaxPartitionedOutputBufferSize] = + fmt::format("{}", FLAGS_exchange_buffer_mb << 20); + const auto iteration = ++iteration_; + + // leafPlan: PartitionedOutput/kPartitioned(1) <-- Values(0) + std::vector leafTaskIds; + auto leafPlan = exec::test::PlanBuilder() + .values(vectors, true) + .partitionedOutput({"c0"}, width) + .capturePlanNodeId(leafPartitionedOutputId) + .planNode(); + + for (int32_t counter = 0; counter < width; ++counter) { + auto leafTaskId = makeTaskId(iteration, "leaf", counter); + leafTaskIds.push_back(leafTaskId); + auto leafTask = makeTask(leafTaskId, leafPlan, counter); + leafTasks.push_back(leafTask); + leafTask->start(taskWidth); + } - std::vector finalAggSplits; - for (int i = 0; i < width; i++) { - auto taskId = makeTaskId(iteration, "final-agg", i); - finalAggSplits.push_back( - exec::Split(std::make_shared(taskId))); - auto task = makeTask(taskId, finalAggPlan, i); - tasks.push_back(task); - task->start(taskWidth); - addRemoteSplits(task, leafTaskIds); - } + // finalAggPlan: PartitionedOutput/kPartitioned(2) <-- Agg/kSingle(1) <-- + // Exchange(0) + std::vector finalAggTaskIds; + core::PlanNodePtr finalAggPlan = + exec::test::PlanBuilder() + .exchange(leafPlan->outputType(), VectorSerde::Kind::kPresto) + .capturePlanNodeId(exchangeId) + .singleAggregation({}, {"count(1)"}) + .partitionedOutput({}, 1) + .capturePlanNodeId(finalAggPartitionedOutputId) + .planNode(); + + for (int i = 0; i < width; i++) { + auto taskId = makeTaskId(iteration, "final-agg", i); + finalAggSplits.push_back( + exec::Split(std::make_shared(taskId))); + auto finalAggTask = makeTask(taskId, finalAggPlan, i); + finalAggTasks.push_back(finalAggTask); + finalAggTask->start(taskWidth); + addRemoteSplits(finalAggTask, leafTaskIds); + } - auto plan = - exec::test::PlanBuilder() - 
.exchange(finalAggPlan->outputType(), VectorSerde::Kind::kPresto) - .singleAggregation({}, {"sum(a0)"}) - .planNode(); + expected = makeRowVector({makeFlatVector(1, [&](auto /*row*/) { + return vectors.size() * vectors[0]->size() * width * taskWidth; + })}); - auto expected = - makeRowVector({makeFlatVector(1, [&](auto /*row*/) { - return vectors.size() * vectors[0]->size() * width * taskWidth; - })}); + // plan: Agg/kSingle(1) <-- Exchange (0) + plan = + exec::test::PlanBuilder() + .exchange(finalAggPlan->outputType(), VectorSerde::Kind::kPresto) + .singleAggregation({}, {"sum(a0)"}) + .planNode(); + }; exec::test::AssertQueryBuilder(plan) .splits(finalAggSplits) .assertResults(expected); - auto elapsed = getCurrentTimeMicro() - startMicros; - int64_t bytes = 0; - int64_t repartitionNanos = 0; - int64_t exchangeNanos = 0; - int64_t exchangeBatches = 0; - int64_t exchangeRows = 0; - for (auto& task : tasks) { - auto stats = task->taskStats(); - for (auto& pipeline : stats.pipelineStats) { - for (auto& op : pipeline.operatorStats) { - if (op.operatorType == "PartitionedOutput") { - repartitionNanos += - op.addInputTiming.cpuNanos + op.getOutputTiming.cpuNanos; - } else if (op.operatorType == "Exchange") { - bytes += op.rawInputBytes; - exchangeRows += op.outputPositions; - exchangeBatches += op.outputVectors; - exchangeNanos += - op.addInputTiming.cpuNanos + op.getOutputTiming.cpuNanos; - } - } + + BENCHMARK_SUSPEND { + wallUs = getCurrentTimeMicro() - startUs; + std::vector taskWallMs; + + for (const auto& task : leafTasks) { + const auto& taskStats = task->taskStats(); + taskWallMs.push_back( + taskStats.executionEndTimeMs - taskStats.executionStartTimeMs); + const auto& planStats = toPlanStats(taskStats); + auto& taskPartitionedOutputStats = + planStats.at(leafPartitionedOutputId); + partitionedOutputStats += taskPartitionedOutputStats; } - } - counters.bytes += bytes; - counters.rows += width * vectors.size() * vectors[0]->size(); - counters.usec += elapsed; - 
counters.repartitionNanos += repartitionNanos; - counters.exchangeNanos += exchangeNanos; - counters.exchangeRows += exchangeRows; - counters.exchangeBatches += exchangeBatches; + for (const auto& task : finalAggTasks) { + const auto& taskStats = task->taskStats(); + taskWallMs.push_back( + taskStats.executionEndTimeMs - taskStats.executionStartTimeMs); + const auto& planStats = toPlanStats(taskStats); + + auto& taskPartitionedOutputStats = + planStats.at(finalAggPartitionedOutputId); + partitionedOutputStats += taskPartitionedOutputStats; + + auto& taskExchangeStats = planStats.at(exchangeId); + exchangeStats += taskExchangeStats; + } + }; } void runLocal( std::vector& vectors, int32_t taskWidth, int32_t numTasks, - Counters& counters) { + int64_t& localPartitionWallUs, + PlanNodeStats& partitionedOutputStats, + LocalPartitionWaitStats& localPartitionWaitStats) { assert(!vectors.empty()); - std::vector> tasks; - counters.bytes = vectors[0]->retainedSize() * vectors.size() * numTasks * - FLAGS_num_local_repeat; - std::vector aggregates = {"count(1)"}; - auto& rowType = vectors[0]->type()->as(); - for (auto i = 1; i < rowType.size(); ++i) { - aggregates.push_back(fmt::format("checksum({})", rowType.nameOf(i))); - } - core::PlanNodeId exchangeId; - auto plan = exec::test::PlanBuilder() - .values(vectors, true) - .localPartition({"c0"}) - .capturePlanNodeId(exchangeId) - .singleAggregation({}, aggregates) - .localPartition(std::vector{}) - .singleAggregation({}, {"sum(a0)"}) - .planNode(); - auto startMicros = getCurrentTimeMicro(); + core::PlanNodePtr plan; + core::PlanNodeId localPartitionId1; + core::PlanNodeId localPartitionId2; + std::vector> tasks; std::vector threads; - threads.reserve(numTasks); - auto expected = - makeRowVector({makeFlatVector(1, [&](auto /*row*/) { - return vectors.size() * vectors[0]->size() * taskWidth; - })}); + RowVectorPtr expected; + + BENCHMARK_SUSPEND { + std::vector aggregates = {"count(1)"}; + auto& rowType = 
vectors[0]->type()->as(); + for (auto i = 1; i < rowType.size(); ++i) { + aggregates.push_back(fmt::format("checksum({})", rowType.nameOf(i))); + } + + // plan: Agg/kSingle(4) <-- LocalPartition/Gather(3) <-- Agg/kGather(2) + // <-- LocalPartition/kRepartition(1) <-- Values(0) + plan = exec::test::PlanBuilder() + .values(vectors, true) + .localPartition({"c0"}) + .capturePlanNodeId(localPartitionId1) + .singleAggregation({}, aggregates) + .localPartition(std::vector{}) + .capturePlanNodeId(localPartitionId2) + .singleAggregation({}, {"sum(a0)"}) + .planNode(); + + threads.reserve(numTasks); + expected = makeRowVector({makeFlatVector(1, [&](auto /*row*/) { + return vectors.size() * vectors[0]->size() * taskWidth; + })}); + }; + + auto startMicros = getCurrentTimeMicro(); std::mutex mutex; for (int32_t i = 0; i < numTasks; ++i) { threads.push_back(std::thread([&]() { @@ -250,30 +289,40 @@ class ExchangeBenchmark : public VectorTestBase { for (auto& thread : threads) { thread.join(); } - counters.usec = getCurrentTimeMicro() - startMicros; - int64_t totalProducer = 0; - int64_t totalConsumer = 0; - std::vector waitConsumer; - std::vector waitProducer; - std::vector wallMs; - for (auto& task : tasks) { - auto taskStats = task->taskStats(); - wallMs.push_back( - taskStats.executionEndTimeMs - taskStats.executionStartTimeMs); - auto planStats = toPlanStats(taskStats); - auto runtimeStats = planStats.at(exchangeId).customStats; - waitProducer.push_back(runtimeStats["blockedWaitForProducerWallNanos"]); - waitConsumer.push_back(runtimeStats["blockedWaitForConsumerWallNanos"]); - totalConsumer += waitConsumer.back().sum; - totalProducer += waitProducer.back().sum; - } - printMax("Producer", totalProducer, waitProducer); - printMax("Consumer", totalConsumer, waitConsumer); - std::sort(wallMs.begin(), wallMs.end()); - assert(!wallMs.empty()); - std::cout << "Wall ms: " << wallMs.back() << " / " - << wallMs[wallMs.size() / 2] << " / " << wallMs.front() - << std::endl; + + 
BENCHMARK_SUSPEND { + localPartitionWallUs = getCurrentTimeMicro() - startMicros; + + std::vector localPartitionNodeIds{ + localPartitionId1, localPartitionId2}; + + localPartitionWaitStats.totalProducerWaitMs = 0; + localPartitionWaitStats.totalConsumerWaitMs = 0; + for (const auto& task : tasks) { + auto taskStats = task->taskStats(); + localPartitionWaitStats.wallMs.push_back( + taskStats.executionEndTimeMs - taskStats.executionStartTimeMs); + auto planStats = toPlanStats(taskStats); + + for (const auto& nodeId : localPartitionNodeIds) { + auto& taskLocalPartition1Stats = planStats.at(nodeId); + partitionedOutputStats += taskLocalPartition1Stats; + + auto& taskLocalPartition1RuntimeStats = + taskLocalPartition1Stats.customStats; + localPartitionWaitStats.producerWaitMs.push_back( + taskLocalPartition1RuntimeStats + ["blockedWaitForProducerWallNanos"]); + localPartitionWaitStats.consumerWaitMs.push_back( + taskLocalPartition1RuntimeStats + ["blockedWaitForConsumerWallNanos"]); + localPartitionWaitStats.totalProducerWaitMs += + localPartitionWaitStats.producerWaitMs.back().sum; + localPartitionWaitStats.totalConsumerWaitMs += + localPartitionWaitStats.consumerWaitMs.back().sum; + } + } + }; } private: @@ -308,7 +357,7 @@ class ExchangeBenchmark : public VectorTestBase { void addRemoteSplits( std::shared_ptr task, const std::vector& remoteTaskIds) { - for (auto& taskId : remoteTaskIds) { + for (const auto& taskId : remoteTaskIds) { auto split = exec::Split(std::make_shared(taskId), -1); task->addSplit("0", std::move(split)); @@ -316,27 +365,6 @@ class ExchangeBenchmark : public VectorTestBase { task->noMoreSplits("0"); } - void sortByMax(std::vector& metrics) { - std::sort( - metrics.begin(), - metrics.end(), - [](const RuntimeMetric& left, const RuntimeMetric& right) { - return left.max > right.max; - }); - } - - void printMax( - const char* title, - int64_t total, - std::vector& metrics) { - sortByMax(metrics); - assert(!metrics.empty()); - std::cout << title << 
" Total " << succinctNanos(total) - << " Max: " << metrics.front().toString() - << " Median: " << metrics[metrics.size() / 2].toString() - << " Min: " << metrics.back().toString() << std::endl; - } - std::unordered_map configSettings_; // Serial number to differentiate consecutive benchmark repeats. static int32_t iteration_; @@ -347,19 +375,6 @@ int32_t ExchangeBenchmark::iteration_; std::unique_ptr bm; void runBenchmarks() { - std::vector flat10k; - std::vector deep10k; - std::vector flat50; - std::vector deep50; - std::vector struct1k; - - Counters flat10kCounters; - Counters deep10kCounters; - Counters flat50Counters; - Counters deep50Counters; - Counters localFlat10kCounters; - Counters struct1kCounters; - std::vector flatNames = {"c0"}; std::vector flatTypes = {BIGINT()}; std::vector typeSelection = { @@ -413,49 +428,168 @@ void runBenchmarks() { MAP(BIGINT(), ROW({{"s2_int", INTEGER()}, {"s2_string", VARCHAR()}})))}}); - flat10k = bm->makeRows(flatType, 10, 10000, FLAGS_dict_pct); - deep10k = bm->makeRows(deepType, 10, 10000, FLAGS_dict_pct); - flat50 = bm->makeRows(flatType, 2000, 50, FLAGS_dict_pct); - deep50 = bm->makeRows(deepType, 2000, 50, FLAGS_dict_pct); - struct1k = bm->makeRows(structType, 100, 1000, FLAGS_dict_pct); - + std::vector flat10k( + bm->makeRows(flatType, 10, 10000, FLAGS_dict_pct)); + std::vector deep10k( + bm->makeRows(deepType, 10, 10000, FLAGS_dict_pct)); + std::vector flat50( + bm->makeRows(flatType, 2000, 50, FLAGS_dict_pct)); + std::vector deep50( + bm->makeRows(deepType, 2000, 50, FLAGS_dict_pct)); + std::vector struct1k( + bm->makeRows(structType, 100, 1000, FLAGS_dict_pct)); + + int64_t flat10KWallUs{0}; + PlanNodeStats partitionedOutputStatsFlat10K; + PlanNodeStats exchangeStatsFlat10K; folly::addBenchmark(__FILE__, "exchangeFlat10k", [&]() { - bm->run(flat10k, FLAGS_width, FLAGS_task_width, flat10kCounters); + bm->run( + flat10k, + FLAGS_width, + FLAGS_task_width, + flat10KWallUs, + partitionedOutputStatsFlat10K, + 
exchangeStatsFlat10K); return 1; }); + int64_t flat50KWallUs{0}; + PlanNodeStats partitionedOutputStatsFlat50; + PlanNodeStats exchangeStatsFlat50; folly::addBenchmark(__FILE__, "exchangeFlat50", [&]() { - bm->run(flat50, FLAGS_width, FLAGS_task_width, flat50Counters); + bm->run( + flat50, + FLAGS_width, + FLAGS_task_width, + flat50KWallUs, + partitionedOutputStatsFlat50, + exchangeStatsFlat50); return 1; }); + int64_t deep10KWallUs{0}; + PlanNodeStats partitionedOutputStatsDeep10K; + PlanNodeStats exchangeStatsDeep10K; folly::addBenchmark(__FILE__, "exchangeDeep10k", [&]() { - bm->run(deep10k, FLAGS_width, FLAGS_task_width, deep10kCounters); + bm->run( + deep10k, + FLAGS_width, + FLAGS_task_width, + deep10KWallUs, + partitionedOutputStatsDeep10K, + exchangeStatsDeep10K); return 1; }); + int64_t deep50KWallUs{0}; + PlanNodeStats partitionedOutputStatsDeep50; + PlanNodeStats exchangeStatsDeep50; folly::addBenchmark(__FILE__, "exchangeDeep50", [&]() { - bm->run(deep50, FLAGS_width, FLAGS_task_width, deep50Counters); + bm->run( + deep50, + FLAGS_width, + FLAGS_task_width, + deep50KWallUs, + partitionedOutputStatsDeep50, + exchangeStatsDeep50); return 1; }); + int64_t stuct1KWallUs{0}; + PlanNodeStats partitionedOutputStatsStruct1K; + PlanNodeStats exchangeStatsStruct1K; folly::addBenchmark(__FILE__, "exchangeStruct1K", [&]() { - bm->run(struct1k, FLAGS_width, FLAGS_task_width, struct1kCounters); + bm->run( + struct1k, + FLAGS_width, + FLAGS_task_width, + stuct1KWallUs, + partitionedOutputStatsStruct1K, + exchangeStatsStruct1K); return 1; }); + int64_t localPartitionWallUs{0}; + PlanNodeStats localPartitionStatsFlat10K; + LocalPartitionWaitStats localPartitionWaitStats; folly::addBenchmark(__FILE__, "localFlat10k", [&]() { bm->runLocal( - flat10k, FLAGS_width, FLAGS_num_local_tasks, localFlat10kCounters); + flat10k, + FLAGS_width, + FLAGS_num_local_tasks, + localPartitionWallUs, + localPartitionStatsFlat10K, + localPartitionWaitStats); return 1; }); folly::runBenchmarks(); - 
std::cout << "flat10k: " << flat10kCounters.toString() << std::endl - << "flat50: " << flat50Counters.toString() << std::endl - << "deep10k: " << deep10kCounters.toString() << std::endl - << "deep50: " << deep50Counters.toString() << std::endl - << "struct1k: " << struct1kCounters.toString() << std::endl; + + std::cout + << "----------------------------------Flat10K----------------------------------" + << std::endl; + std::cout << "Wall Time (ms): " << succinctMicros(flat10KWallUs) << std::endl; + std::cout << "PartitionOutput: " << partitionedOutputStatsFlat10K.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsFlat10K.toString() << std::endl; + + std::cout + << "----------------------------------Flat50K----------------------------------" + << std::endl; + std::cout << "Wall Time (ms): " << succinctMicros(flat50KWallUs) << std::endl; + std::cout << "PartitionOutput: " << partitionedOutputStatsFlat50.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsFlat50.toString() << std::endl; + + std::cout + << "----------------------------------Deep10K----------------------------------" + << std::endl; + std::cout << "Wall Time (ms): " << succinctMicros(deep10KWallUs) << std::endl; + std::cout << "PartitionOutput: " << partitionedOutputStatsDeep10K.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsDeep10K.toString() << std::endl; + + std::cout + << "----------------------------------Deep50K----------------------------------" + << std::endl; + std::cout << "Wall Time (ms): " << succinctMicros(deep50KWallUs) << std::endl; + std::cout << "PartitionOutput: " << partitionedOutputStatsDeep50.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsDeep50.toString() << std::endl; + + std::cout + << "----------------------------------Struct1K---------------------------------" + << std::endl; + std::cout << "Wall Time (ms): " << succinctMicros(stuct1KWallUs) << std::endl; + std::cout << "PartitionOutput: " << 
+ partitionedOutputStatsStruct1K.toString() + << std::endl; + std::cout << "Exchange: " << exchangeStatsStruct1K.toString() << std::endl; + + std::cout + << "--------------------------------LocalFlat10K-------------------------------" + << std::endl; + // Sort the per-task wall times first: Max/Median/Min below are read by + // position and need ascending order. The emptiness check comes before the + // first element access. + auto& wallMs = localPartitionWaitStats.wallMs; + VELOX_CHECK(!wallMs.empty()); + std::sort(wallMs.begin(), wallMs.end()); + + std::cout << "Wall Time (ms): " << "\n Total: " + << succinctMicros(localPartitionWallUs) + << "\n Max: " << wallMs.back() + << "\n Median: " << wallMs[wallMs.size() / 2] + << "\n Min: " << wallMs.front() << std::endl; + std::cout << "LocalPartition: " << localPartitionStatsFlat10K.toString() + << std::endl; + sortByAndPrintMax( + "Producer Wait Time (ms)", + localPartitionWaitStats.totalProducerWaitMs, + localPartitionWaitStats.producerWaitMs); + sortByAndPrintMax( + "Consumer Wait Time (ms)", + localPartitionWaitStats.totalConsumerWaitMs, + localPartitionWaitStats.consumerWaitMs); } } // namespace