Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose filesystem exchange sink stats #24416

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.trino.spi.eventlistener.QueryOutputMetadata;
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.eventlistener.StageCpuDistribution;
import io.trino.spi.eventlistener.StageOutputBufferMetrics;
import io.trino.spi.eventlistener.StageOutputBufferUtilization;
import io.trino.spi.eventlistener.StageTaskStatistics;
import io.trino.spi.metrics.Metrics;
Expand Down Expand Up @@ -238,6 +239,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
Optional.empty()),
createQueryContext(
queryInfo.getSession(),
Expand Down Expand Up @@ -351,6 +353,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
queryInfo.isFinalQueryInfo(),
getCpuDistributions(queryInfo),
getStageOutputBufferUtilizations(queryInfo),
getStageOutputBufferMetrics(queryInfo),
getStageTaskStatistics(queryInfo),
memoize(() -> operatorStats.stream().map(operatorStatsCodec::toJson).toList()),
ImmutableList.copyOf(queryInfo.getQueryStats().getOptimizerRulesSummaries()),
Expand Down Expand Up @@ -780,6 +783,29 @@ private static void populateStageOutputBufferUtilization(StageInfo stageInfo, Im
}
}

/**
 * Builds the list of per-stage output buffer metrics for a query by walking the
 * stage tree from the output stage downwards. A query without an output stage
 * yields an empty list.
 */
private static List<StageOutputBufferMetrics> getStageOutputBufferMetrics(QueryInfo queryInfo)
{
    if (queryInfo.getOutputStage().isPresent()) {
        ImmutableList.Builder<StageOutputBufferMetrics> stageMetrics = ImmutableList.builder();
        populateStageOutputBufferMetrics(queryInfo.getOutputStage().get(), stageMetrics);
        return stageMetrics.build();
    }
    return ImmutableList.of();
}

/**
 * Adds the output buffer metrics of {@code stageInfo} to {@code accumulator} and then
 * recurses into each of its sub-stages. A stage whose metrics map is empty contributes
 * no entry of its own, but its sub-stages are still visited.
 */
private static void populateStageOutputBufferMetrics(StageInfo stageInfo, ImmutableList.Builder<StageOutputBufferMetrics> accumulator)
{
    Metrics bufferMetrics = stageInfo.getStageStats().getOutputBufferMetrics();
    boolean hasMetrics = !bufferMetrics.getMetrics().isEmpty();
    if (hasMetrics) {
        accumulator.add(new StageOutputBufferMetrics(stageInfo.getStageId().getId(), bufferMetrics));
    }

    // parent entry first, then children in the order getSubStages() returns them
    stageInfo.getSubStages().forEach(subStage -> populateStageOutputBufferMetrics(subStage, accumulator));
}

private List<StageTaskStatistics> getStageTaskStatistics(QueryInfo queryInfo)
{
if (queryInfo.getOutputStage().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.trino.operator.TaskStats;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.tracing.TrinoAttributes;
Expand Down Expand Up @@ -473,6 +474,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
long failedOutputDataSize = 0;
long outputPositions = 0;
long failedOutputPositions = 0;
Metrics.Accumulator outputBufferMetrics = Metrics.accumulator();

long outputBlockedTime = 0;
long failedOutputBlockedTime = 0;
Expand Down Expand Up @@ -545,9 +547,13 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
inputBlockedTime += taskStats.getInputBlockedTime().roundTo(NANOSECONDS);

bufferedDataSize += taskInfo.outputBuffers().getTotalBufferedBytes();

Optional<Metrics> bufferMetrics = taskInfo.outputBuffers().getMetrics();

taskInfo.outputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add);
outputDataSize += taskStats.getOutputDataSize().toBytes();
outputPositions += taskStats.getOutputPositions();
bufferMetrics.ifPresent(outputBufferMetrics::add);

outputBlockedTime += taskStats.getOutputBlockedTime().roundTo(NANOSECONDS);

Expand Down Expand Up @@ -655,6 +661,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
succinctBytes(failedOutputDataSize),
outputPositions,
failedOutputPositions,
outputBufferMetrics.get(),
succinctDuration(outputBlockedTime, NANOSECONDS),
succinctDuration(failedOutputBlockedTime, NANOSECONDS),
succinctBytes(physicalWrittenDataSize),
Expand Down
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.operator.BlockedReason;
import io.trino.operator.OperatorStats;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.metrics.Metrics;
import org.joda.time.DateTime;

import java.util.List;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class StageStats
private final DataSize failedOutputDataSize;
private final long outputPositions;
private final long failedOutputPositions;
private final Metrics outputBufferMetrics;

private final Duration outputBlockedTime;
private final Duration failedOutputBlockedTime;
Expand Down Expand Up @@ -181,6 +183,7 @@ public StageStats(
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
@JsonProperty("outputPositions") long outputPositions,
@JsonProperty("failedOutputPositions") long failedOutputPositions,
@JsonProperty("outputBufferMetrics") Metrics outputBufferMetrics,

@JsonProperty("outputBlockedTime") Duration outputBlockedTime,
@JsonProperty("failedOutputBlockedTime") Duration failedOutputBlockedTime,
Expand Down Expand Up @@ -272,6 +275,7 @@ public StageStats(
this.outputPositions = outputPositions;
checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative");
this.failedOutputPositions = failedOutputPositions;
this.outputBufferMetrics = requireNonNull(outputBufferMetrics, "outputBufferMetrics is null");

this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null");
this.failedOutputBlockedTime = requireNonNull(failedOutputBlockedTime, "failedOutputBlockedTime is null");
Expand Down Expand Up @@ -590,6 +594,12 @@ public long getFailedOutputPositions()
return failedOutputPositions;
}

/**
 * Returns the output buffer metrics recorded for this stage.
 * The value is never null: the constructor rejects a null {@code outputBufferMetrics}.
 */
@JsonProperty
public Metrics getOutputBufferMetrics()
{
return outputBufferMetrics;
}

@JsonProperty
public Duration getOutputBlockedTime()
{
Expand Down Expand Up @@ -726,6 +736,7 @@ public static StageStats createInitial()
zeroBytes,
0,
0,
Metrics.EMPTY,
zeroSeconds,
zeroSeconds,
zeroBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static TaskInfo createInitialTask(TaskId taskId, URI location, String nod
0,
pipelinedBufferStates,
Optional.empty(),
Optional.empty(),
Optional.empty()),
ImmutableSet.of(),
taskStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public OutputBufferInfo getInfo()
totalPagesAdded.get(),
Optional.of(infos.build()),
Optional.of(new TDigestHistogram(memoryManager.getUtilizationHistogram())),
Optional.empty(),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public OutputBufferInfo getInfo()
.map(ClientBuffer::getInfo)
.collect(toImmutableList())),
Optional.of(new TDigestHistogram(memoryManager.getUtilizationHistogram())),
Optional.empty(),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public OutputBufferInfo getInfo()
0,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
return outputBuffer.getInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.metrics.Metrics;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -38,6 +38,7 @@ public final class OutputBufferInfo
private final Optional<List<PipelinedBufferInfo>> pipelinedBufferStates;
private final Optional<TDigestHistogram> utilization;
private final Optional<SpoolingOutputStats.Snapshot> spoolingOutputStats;
private final Optional<Metrics> metrics;

@JsonCreator
public OutputBufferInfo(
Expand All @@ -51,7 +52,8 @@ public OutputBufferInfo(
@JsonProperty("totalPagesSent") long totalPagesSent,
@JsonProperty("pipelinedBufferStates") Optional<List<PipelinedBufferInfo>> pipelinedBufferStates,
@JsonProperty("utilization") Optional<TDigestHistogram> utilization,
@JsonProperty("spoolingOutputStats") Optional<SpoolingOutputStats.Snapshot> spoolingOutputStats)
@JsonProperty("spoolingOutputStats") Optional<SpoolingOutputStats.Snapshot> spoolingOutputStats,
@JsonProperty("metrics") Optional<Metrics> metrics)
{
this.type = type;
this.state = state;
Expand All @@ -64,6 +66,7 @@ public OutputBufferInfo(
this.pipelinedBufferStates = requireNonNull(pipelinedBufferStates, "pipelinedBufferStates is null").map(ImmutableList::copyOf);
this.utilization = utilization;
this.spoolingOutputStats = requireNonNull(spoolingOutputStats, "spoolingOutputStats is null");
this.metrics = requireNonNull(metrics, "metrics is null");
}

@JsonProperty
Expand Down Expand Up @@ -132,6 +135,12 @@ public Optional<SpoolingOutputStats.Snapshot> getSpoolingOutputStats()
return spoolingOutputStats;
}

/**
 * Returns the metrics attached to this output buffer info, or an empty Optional
 * when none were supplied. The Optional itself is never null: the constructor
 * rejects a null {@code metrics} argument.
 */
@JsonProperty
public Optional<Metrics> getMetrics()
{
return metrics;
}

public OutputBufferInfo summarize()
{
return new OutputBufferInfo(
Expand All @@ -145,6 +154,7 @@ public OutputBufferInfo summarize()
totalPagesSent,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}

Expand All @@ -161,7 +171,8 @@ public OutputBufferInfo summarizeFinal()
totalPagesSent,
Optional.empty(),
utilization,
spoolingOutputStats);
spoolingOutputStats,
metrics);
}

public OutputBufferInfo pruneSpoolingOutputStats()
Expand All @@ -177,35 +188,8 @@ public OutputBufferInfo pruneSpoolingOutputStats()
totalPagesSent,
pipelinedBufferStates,
utilization,
Optional.empty());
}

/**
 * Equality is based on type, state, the two "can add" flags, the buffered/sent
 * counters, the pipelined buffer states and the utilization histogram.
 * {@code spoolingOutputStats} does not take part in the comparison.
 */
@Override
public boolean equals(Object o)
{
    if (this == o) {
        return true;
    }
    if (o == null) {
        return false;
    }
    if (getClass() != o.getClass()) {
        return false;
    }

    OutputBufferInfo other = (OutputBufferInfo) o;
    if (!Objects.equals(type, other.type)) {
        return false;
    }
    if (canAddBuffers != other.canAddBuffers || canAddPages != other.canAddPages) {
        return false;
    }
    if (totalBufferedBytes != other.totalBufferedBytes || totalBufferedPages != other.totalBufferedPages) {
        return false;
    }
    if (totalRowsSent != other.totalRowsSent || totalPagesSent != other.totalPagesSent) {
        return false;
    }
    if (state != other.state) {
        return false;
    }
    if (!Objects.equals(pipelinedBufferStates, other.pipelinedBufferStates)) {
        return false;
    }
    return Objects.equals(utilization, other.utilization);
}

@Override
public int hashCode()
{
return Objects.hash(state, canAddBuffers, canAddPages, totalBufferedBytes, totalBufferedPages, totalRowsSent, totalPagesSent, pipelinedBufferStates, utilization);
Optional.empty(),
metrics);
}

@Override
Expand All @@ -222,6 +206,7 @@ public String toString()
.add("totalPagesSent", totalPagesSent)
.add("pipelinedBufferStates", pipelinedBufferStates)
.add("bufferUtilization", utilization)
.add("metrics", metrics)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public OutputBufferInfo getInfo()
totalPagesAdded.get(),
Optional.of(infos.build()),
Optional.of(new TDigestHistogram(memoryManager.getUtilizationHistogram())),
Optional.empty(),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.execution.StateMachine;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.metrics.Metrics;

import java.util.List;
import java.util.Optional;
Expand All @@ -47,6 +48,7 @@ public class SpoolingExchangeOutputBuffer
// It doesn't have to be declared as volatile as the nullification of this variable doesn't have to be immediately visible to other threads.
// However since the abort can be triggered at any moment of time this variable has to be accessed in a safe way (avoiding "check-then-use").
private ExchangeSink exchangeSink;
private Optional<Metrics> finalSinkMetrics;
private final Supplier<LocalMemoryContext> memoryContextSupplier;

private final AtomicLong peakMemoryUsage = new AtomicLong();
Expand Down Expand Up @@ -88,7 +90,8 @@ public OutputBufferInfo getInfo()
totalPagesAdded.get(),
Optional.empty(),
Optional.empty(),
outputStats.getFinalSnapshot());
outputStats.getFinalSnapshot(),
getMetrics());
}

@Override
Expand Down Expand Up @@ -226,7 +229,9 @@ public void setNoMorePages()
else {
stateMachine.finish();
}
finalSinkMetrics = sink.getMetrics();
exchangeSink = null;

forceFreeMemory();
});
}
Expand Down Expand Up @@ -257,6 +262,7 @@ public void abort()
if (failure != null) {
log.warn(failure, "Error aborting exchange sink");
}
finalSinkMetrics = sink.getMetrics();
exchangeSink = null;
forceFreeMemory();
});
Expand Down Expand Up @@ -315,4 +321,13 @@ private LocalMemoryContext getSystemMemoryContextOrNull()
return null;
}
}

/**
 * Returns the metrics of the exchange sink backing this buffer.
 * <p>
 * While the sink is still referenced, its live metrics are returned. Once the sink
 * reference has been released, the snapshot stored in {@code finalSinkMetrics} is
 * returned instead.
 *
 * @return the sink metrics; never null, and empty when no final snapshot is visible yet
 */
private Optional<Metrics> getMetrics()
{
    // Read the field once into a local to avoid "check-then-use": it may be nullified concurrently.
    ExchangeSink sink = exchangeSink;
    if (sink != null) {
        return sink.getMetrics();
    }

    // Neither exchangeSink nor finalSinkMetrics is volatile, and finalSinkMetrics has no
    // initializer, so this thread may observe the released sink before it observes the
    // final snapshot. Returning null here would break the Optional contract and make
    // OutputBufferInfo's null check fail in getInfo(), so fall back to an empty Optional.
    Optional<Metrics> finalMetrics = finalSinkMetrics;
    return finalMetrics == null ? Optional.empty() : finalMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.WarningCode;
import io.trino.spi.connector.CatalogHandle.CatalogVersion;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.resourcegroups.QueryType;
import io.trino.spi.resourcegroups.ResourceGroupId;
import io.trino.spi.security.SelectedRole;
Expand Down Expand Up @@ -317,6 +318,7 @@ private static StageStats createStageStats(int value)
succinctBytes(value),
value,
value,
Metrics.EMPTY,
Duration.succinctDuration(value, NANOSECONDS),
Duration.succinctDuration(value, NANOSECONDS),
succinctBytes(value),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airlift.units.Duration;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.metrics.Metrics;
import org.joda.time.DateTime;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -96,6 +97,7 @@ public class TestStageStats
DataSize.ofBytes(36),
37,
38,
Metrics.EMPTY,

new Duration(203, NANOSECONDS),
new Duration(204, NANOSECONDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public TaskInfo getTaskInfo()
0,
Optional.empty(),
Optional.of(new TDigestHistogram(new TDigest())),
Optional.empty(),
Optional.empty()),
ImmutableSet.copyOf(noMoreSplits),
new TaskStats(DateTime.now(), null),
Expand Down
Loading
Loading