Skip to content

Commit

Permalink
misc: Improve ExchangeBenchmark
Browse files Browse the repository at this point in the history
Add detailed operator level statistics to the output.

Sample output:
============================================================================
[...]exec/benchmarks/ExchangeBenchmark.cpp     relative  time/iter   iters/s
============================================================================
exchangeFlat10k                                              5.46s   183.19m
exchangeFlat50                                              13.07s    76.51m
exchangeDeep10k                                             15.17s    65.92m
exchangeDeep50                                              30.18s    33.14m
exchangeStruct1K                                             3.58s   278.96m
localFlat10k                                                46.66s    21.43m
----------------------------------Flat10K----------------------------------
PartitionOutput: Output: 6400064 rows (611.35MB, 1152 batches), Cpu time: 14.42s, Wall time: 30.38s, Blocked wall time: 0ns, Peak memory: 296.00MB, Memory allocations: 102400, Threads: 128, CPU breakdown: B/I/O/F (1.14ms/0ns/14.42s/1.15ms)
Exchange: Output: 6400000 rows (1.73GB, 150 batches), Cpu time: 1.67s, Wall time: 3.49s, Blocked wall time: 22.87s, Peak memory: 248.45MB, Memory allocations: 13617, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (34.60ms/4.78ms/1.59s/102.30us)
----------------------------------Flat50----------------------------------
PartitionOutput: Output: 6400064 rows (600.85MB, 1389 batches), Cpu time: 19.88s, Wall time: 38.52s, Blocked wall time: 22.82s, Peak memory: 267.19MB, Memory allocations: 93261, Threads: 127, CPU breakdown: B/I/O/F (47.28ms/0ns/19.73s/50.24ms)
Exchange: Output: 6400000 rows (1.73GB, 150 batches), Cpu time: 1.67s, Wall time: 3.49s, Blocked wall time: 22.87s, Peak memory: 248.45MB, Memory allocations: 13617, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (34.60ms/4.78ms/1.59s/102.30us)
---------------------------------Deep10K----------------------------------
PartitionOutput: Output: 6400064 rows (2.98GB, 4538 batches), Cpu time: 45.89s, Wall time: 1m 30s, Blocked wall time: 7m 12s, Peak memory: 560.78MB, Memory allocations: 335952, Threads: 124, CPU breakdown: B/I/O/F (2.27ms/0ns/45.89s/1.77ms)
Exchange: Output: 6400000 rows (11.23GB, 760 batches), Cpu time: 10.47s, Wall time: 19.78s, Blocked wall time: 2m 24s, Peak memory: 299.35MB, Memory allocations: 27401, Threads: 60, Splits: 256, CPU breakdown: B/I/O/F (179.38ms/14.82ms/10.09s/3.05ms)
----------------------------------Deep50----------------------------------
PartitionOutput: Output: 6400064 rows (3.09GB, 5217 batches), Cpu time: 1m 2s, Wall time: 2m 3s, Blocked wall time: 13m 40s, Peak memory: 491.98MB, Memory allocations: 339131, Threads: 128, CPU breakdown: B/I/O/F (63.06ms/0ns/1m 1s/65.47ms)
Exchange: Output: 6400000 rows (10.45GB, 698 batches), Cpu time: 10.31s, Wall time: 21.25s, Blocked wall time: 7m 32s, Peak memory: 292.26MB, Memory allocations: 25737, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (281.67ms/19.99ms/9.72s/516.54us)
---------------------------------Struct1K---------------------------------
PartitionOutput: Output: 6400064 rows (361.27MB, 1216 batches), Cpu time: 9.69s, Wall time: 20.13s, Blocked wall time: 0ns, Peak memory: 182.81MB, Memory allocations: 66624, Threads: 128, CPU breakdown: B/I/O/F (3.39ms/0ns/9.68s/3.96ms)
Exchange: Output: 6400000 rows (982.86MB, 90 batches), Cpu time: 1.88s, Wall time: 3.79s, Blocked wall time: 9.67s, Peak memory: 247.59MB, Memory allocations: 10133, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (14.36ms/3.00ms/1.84s/122.45us)
-------------------------------LocalFlat10K-------------------------------
LocalPartition: Output: 204802048 rows (27.16GB, 176128 batches), Cpu time: 2m 20s, Wall time: 21m 34s, Blocked wall time: 33m 27s, Peak memory: 10.44MB, Memory allocations: 163840, CPU breakdown: B/I/O/F (217.83ms/2m 18s/1.86s/16.75ms)
Producer Total 0ns Max: sum:6511604000, count:1, min:6511604000, max:6511604000 Median: sum:15436246000, count:16, min:355929000, max:1589746000 Min: sum:8482011000, count:16, min:318334000, max:741137000
Consumer Total 0ns Max: sum:15329630000, count:16, min:268246000, max:1666110000 Median: sum:0, count:0, min:9223372036854775807, max:-9223372036854775808 Min: sum:0, count:0, min:9223372036854775807, max:-9223372036854775808
Wall ms: 6522 / 5792 / 5091
  • Loading branch information
yingsu00 committed Nov 27, 2024
1 parent 84c3a8e commit e8f8cbb
Show file tree
Hide file tree
Showing 3 changed files with 358 additions and 186 deletions.
70 changes: 66 additions & 4 deletions velox/exec/PlanNodeStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,67 @@

namespace facebook::velox::exec {

PlanNodeStats& PlanNodeStats::operator+=(const PlanNodeStats& another) {
inputRows += another.inputRows;
inputBytes += another.inputBytes;
inputVectors += another.inputVectors;

rawInputRows += another.inputRows;
rawInputBytes += another.rawInputBytes;

dynamicFilterStats.add(another.dynamicFilterStats);

outputRows += another.outputRows;
outputBytes += another.outputBytes;
outputVectors += another.outputVectors;

isBlockedTiming.add(another.isBlockedTiming);
addInputTiming.add(another.addInputTiming);
getOutputTiming.add(another.getOutputTiming);
finishTiming.add(another.finishTiming);
cpuWallTiming.add(another.isBlockedTiming);
cpuWallTiming.add(another.addInputTiming);
cpuWallTiming.add(another.getOutputTiming);
cpuWallTiming.add(another.finishTiming);
cpuWallTiming.add(another.isBlockedTiming);

backgroundTiming.add(another.backgroundTiming);

blockedWallNanos += another.blockedWallNanos;

peakMemoryBytes += another.peakMemoryBytes;
numMemoryAllocations += another.numMemoryAllocations;

physicalWrittenBytes += another.physicalWrittenBytes;

for (const auto& [name, customStats] : another.customStats) {
if (UNLIKELY(this->customStats.count(name) == 0)) {
this->customStats.insert(std::make_pair(name, customStats));
} else {
this->customStats.at(name).merge(customStats);
}
}

// Populating number of drivers for plan nodes with multiple operators is not
// useful. Each operator could have been executed in different pipelines with
// different number of drivers.
if (!isMultiOperatorTypeNode()) {
numDrivers += another.numDrivers;
} else {
numDrivers = 0;
}

numSplits += another.numSplits;

spilledInputBytes += another.spilledInputBytes;
spilledBytes += another.spilledBytes;
spilledRows += another.spilledRows;
spilledPartitions += another.spilledPartitions;
spilledFiles += another.spilledFiles;

return *this;
}

void PlanNodeStats::add(const OperatorStats& stats) {
auto it = operatorStats.find(stats.operatorType);
if (it != operatorStats.end()) {
Expand Down Expand Up @@ -63,11 +124,11 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {

physicalWrittenBytes += stats.physicalWrittenBytes;

for (const auto& [name, runtimeStats] : stats.runtimeStats) {
if (UNLIKELY(customStats.count(name) == 0)) {
customStats.insert(std::make_pair(name, runtimeStats));
for (const auto& [name, customStats] : stats.runtimeStats) {
if (UNLIKELY(this->customStats.count(name) == 0)) {
this->customStats.insert(std::make_pair(name, customStats));
} else {
customStats.at(name).merge(runtimeStats);
this->customStats.at(name).merge(customStats);
}
}

Expand Down Expand Up @@ -105,6 +166,7 @@ std::string PlanNodeStats::toString(bool includeInputStats) const {
out << ", Physical written output: " << succinctBytes(physicalWrittenBytes);
}
out << ", Cpu time: " << succinctNanos(cpuWallTiming.cpuNanos)
<< ", Wall time: " << succinctNanos(cpuWallTiming.wallNanos)
<< ", Blocked wall time: " << succinctNanos(blockedWallNanos)
<< ", Peak memory: " << succinctBytes(peakMemoryBytes)
<< ", Memory allocations: " << numMemoryAllocations;
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/PlanNodeStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ struct PlanNodeStats {
PlanNodeStats(PlanNodeStats&&) = default;
PlanNodeStats& operator=(PlanNodeStats&&) = default;

PlanNodeStats& operator+=(const PlanNodeStats&);

/// Sum of input rows for all corresponding operators. Useful primarily for
/// leaf plan nodes or plan nodes that correspond to a single operator type.
uint64_t inputRows{0};
Expand Down
Loading

0 comments on commit e8f8cbb

Please sign in to comment.