Skip to content

Commit

Permalink
Expose filesystem exchange sink stats
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Dec 12, 2024
1 parent 98d7c13 commit bfdffa3
Showing 1 changed file with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.DistributionMetricBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.metrics.Metrics;

import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -80,6 +84,10 @@ public class FileSystemExchangeSink
private final AtomicReference<Throwable> failure = new AtomicReference<>();
private volatile boolean closed;

private final MetricsBuilder metricsBuilder = new MetricsBuilder();
private final CounterMetricBuilder totalFilesMetric = metricsBuilder.getCounterMetric("FileSystemExchangeSource.filesTotal");
private final DistributionMetricBuilder fileSizeMetric = metricsBuilder.getDistributionMetric("FileSystemExchangeSource.fileSize");

public FileSystemExchangeSink(
FileSystemExchangeStorage exchangeStorage,
FileSystemExchangeStats stats,
Expand Down Expand Up @@ -154,7 +162,9 @@ private BufferedStorageWriter createWriter(int partitionId)
bufferPool,
failure,
maxPageStorageSizeInBytes,
maxFileSizeInBytes);
maxFileSizeInBytes,
totalFilesMetric,
fileSizeMetric);
}

@Override
Expand Down Expand Up @@ -215,6 +225,12 @@ public synchronized CompletableFuture<Void> abort()
directExecutor())));
}

@Override
public Optional<Metrics> getMetrics()
{
return Optional.of(metricsBuilder.buildMetrics());
}

private void throwIfFailed()
{
Throwable throwable = failure.get();
Expand Down Expand Up @@ -244,6 +260,8 @@ private static class BufferedStorageWriter
private final AtomicReference<Throwable> failure;
private final int maxPageStorageSizeInBytes;
private final long maxFileSizeInBytes;
private final CounterMetricBuilder totalFilesMetric;
private final DistributionMetricBuilder fileSizeMetric;

@GuardedBy("this")
private ExchangeStorageWriter currentWriter;
Expand All @@ -265,7 +283,9 @@ public BufferedStorageWriter(
BufferPool bufferPool,
AtomicReference<Throwable> failure,
int maxPageStorageSizeInBytes,
long maxFileSizeInBytes)
long maxFileSizeInBytes,
CounterMetricBuilder totalFilesMetric,
DistributionMetricBuilder fileSizeMetric)
{
this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null");
this.stats = requireNonNull(stats, "stats is null");
Expand All @@ -276,6 +296,8 @@ public BufferedStorageWriter(
this.failure = requireNonNull(failure, "failure is null");
this.maxPageStorageSizeInBytes = maxPageStorageSizeInBytes;
this.maxFileSizeInBytes = maxFileSizeInBytes;
this.totalFilesMetric = requireNonNull(totalFilesMetric, "totalFilesMetric is null");
this.fileSizeMetric = requireNonNull(fileSizeMetric, "fileSizeMetric is null");

setupWriterForNextPart();
}
Expand All @@ -297,6 +319,8 @@ public synchronized void write(Slice data)
setupWriterForNextPart();
currentFileSize = 0;
currentBuffer = null;
fileSizeMetric.add(currentFileSize);
totalFilesMetric.increment();
}

Slice sizeSlice = Slices.allocate(Integer.BYTES);
Expand Down

0 comments on commit bfdffa3

Please sign in to comment.