diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java index 252e425d6246..551ff12b8062 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java @@ -23,15 +23,19 @@ import io.airlift.slice.Slice; import io.airlift.slice.SliceOutput; import io.airlift.slice.Slices; +import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder; +import io.trino.plugin.exchange.filesystem.MetricsBuilder.DistributionMetricBuilder; import io.trino.spi.TrinoException; import io.trino.spi.exchange.ExchangeSink; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; +import io.trino.spi.metrics.Metrics; import java.net.URI; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -80,6 +84,10 @@ public class FileSystemExchangeSink private final AtomicReference failure = new AtomicReference<>(); private volatile boolean closed; + private final MetricsBuilder metricsBuilder = new MetricsBuilder(); + private final CounterMetricBuilder totalFilesMetric = metricsBuilder.getCounterMetric("FileSystemExchangeSource.filesTotal"); + private final DistributionMetricBuilder fileSizeMetric = metricsBuilder.getDistributionMetric("FileSystemExchangeSource.fileSize"); + public FileSystemExchangeSink( FileSystemExchangeStorage exchangeStorage, FileSystemExchangeStats stats, @@ -154,7 +162,9 @@ private BufferedStorageWriter createWriter(int partitionId) bufferPool, failure, maxPageStorageSizeInBytes, - maxFileSizeInBytes); + maxFileSizeInBytes, + totalFilesMetric, + fileSizeMetric); } @Override @@ -215,6 +225,12 @@ public synchronized CompletableFuture abort() directExecutor()))); } + @Override + public Optional getMetrics() + { + return Optional.of(metricsBuilder.buildMetrics()); + } + private void throwIfFailed() { Throwable throwable = failure.get(); @@ -244,6 +260,8 @@ private static class BufferedStorageWriter private final AtomicReference failure; private final int maxPageStorageSizeInBytes; private final long maxFileSizeInBytes; + private final CounterMetricBuilder totalFilesMetric; + private final DistributionMetricBuilder fileSizeMetric; @GuardedBy("this") private ExchangeStorageWriter currentWriter; @@ -265,7 +283,9 @@ public BufferedStorageWriter( BufferPool bufferPool, AtomicReference failure, int maxPageStorageSizeInBytes, - long maxFileSizeInBytes) + long maxFileSizeInBytes, + CounterMetricBuilder totalFilesMetric, + DistributionMetricBuilder fileSizeMetric) { this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); this.stats = requireNonNull(stats, "stats is null"); @@ -276,6 +296,8 @@ public BufferedStorageWriter( this.failure = requireNonNull(failure, "failure is null"); this.maxPageStorageSizeInBytes = maxPageStorageSizeInBytes; this.maxFileSizeInBytes = maxFileSizeInBytes; + this.totalFilesMetric = requireNonNull(totalFilesMetric, "totalFilesMetric is null"); + this.fileSizeMetric = requireNonNull(fileSizeMetric, "fileSizeMetric is null"); setupWriterForNextPart(); } @@ -297,6 +319,8 @@ public synchronized void write(Slice data) setupWriterForNextPart(); currentFileSize = 0; currentBuffer = null; + fileSizeMetric.add(currentFileSize); + totalFilesMetric.increment(); } Slice sizeSlice = Slices.allocate(Integer.BYTES);