From e700260d33c1d210cb3c258f51ea281283db9927 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Thu, 1 Aug 2024 15:44:42 +0200 Subject: [PATCH] Prevent too many concurrent query completed notifications queryCompleted notifications are handled by unbounded executor, hence it could cause all sort of issues when there is spike in query traffic and event listener is slow to report events. --- .../eventlistener/EventListenerConfig.java | 15 ++ .../eventlistener/EventListenerManager.java | 25 +++ .../TestEventListenerConfig.java | 8 +- .../TestEventListenerManager.java | 168 ++++++++++++++++++ .../src/main/sphinx/develop/event-listener.md | 4 + 5 files changed, 218 insertions(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java index 5f9ef30ef294..374ea9f45d1a 100644 --- a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java +++ b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; import io.airlift.configuration.validation.FileExists; +import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import java.io.File; @@ -26,6 +27,7 @@ public class EventListenerConfig { private List eventListenerFiles = ImmutableList.of(); + private int maxConcurrentQueryCompletedEvents = 100; @NotNull public List<@FileExists File> getEventListenerFiles() @@ -41,4 +43,17 @@ public EventListenerConfig setEventListenerFiles(List eventListenerFiles .collect(toImmutableList()); return this; } + + @Min(1) + public int getMaxConcurrentQueryCompletedEvents() + { + return maxConcurrentQueryCompletedEvents; + } + + @Config("event-listener.max-concurrent-query-completed-events") + public EventListenerConfig setMaxConcurrentQueryCompletedEvents(int maxConcurrentQueryCompletedEvents) + { + this.maxConcurrentQueryCompletedEvents = maxConcurrentQueryCompletedEvents; + return this; + } } diff --git a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java index a8bc8c551ea2..88ec185b2663 100644 --- a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java +++ b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.airlift.log.Logger; +import io.airlift.stats.CounterStat; import io.airlift.stats.TimeStat; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.eventlistener.EventListener; @@ -38,6 +39,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkArgument; @@ -55,10 +57,13 @@ public class EventListenerManager private static final File CONFIG_FILE = new File("etc/event-listener.properties"); private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name"; private final List configFiles; + private final int maxConcurrentQueryCompletedEvents; private final Map eventListenerFactories = new ConcurrentHashMap<>(); private final List providedEventListeners = Collections.synchronizedList(new ArrayList<>()); private final AtomicReference> configuredEventListeners = new AtomicReference<>(ImmutableList.of()); private final AtomicBoolean loading = new AtomicBoolean(false); + private final AtomicInteger concurrentQueryCompletedEvents = new AtomicInteger(); + private final CounterStat skippedQueryCompletedEvents = new CounterStat(); private final TimeStat queryCreatedTime = new TimeStat(MILLISECONDS); private final TimeStat queryCompletedTime = new TimeStat(MILLISECONDS); @@ -68,6 +73,7 @@ public class EventListenerManager public EventListenerManager(EventListenerConfig config) { this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles()); + this.maxConcurrentQueryCompletedEvents = config.getMaxConcurrentQueryCompletedEvents(); } public void addEventListenerFactory(EventListenerFactory eventListenerFactory) @@ -144,7 +150,13 @@ private static Map loadEventListenerProperties(File configFile) public void queryCompleted(Function queryCompletedEventProvider) { try (TimeStat.BlockTimer _ = queryCompletedTime.time()) { + if (concurrentQueryCompletedEvents.incrementAndGet() > maxConcurrentQueryCompletedEvents) { + concurrentQueryCompletedEvents.decrementAndGet(); + skippedQueryCompletedEvents.update(1); + return; + } doQueryCompleted(queryCompletedEventProvider); + concurrentQueryCompletedEvents.decrementAndGet(); } } @@ -220,6 +232,19 @@ public TimeStat getSplitCompletedTime() return splitCompletedTime; } + @Managed + public int getConcurrentQueryCompletedEvents() + { + return concurrentQueryCompletedEvents.get(); + } + + @Managed + @Nested + public CounterStat getSkippedQueryCompletedEvents() + { + return skippedQueryCompletedEvents; + } + @PreDestroy public void shutdown() { diff --git a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java index 20ccadc9df1a..32180cdcf0c3 100644 --- a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java +++ b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java @@ -32,6 +32,7 @@ public class TestEventListenerConfig public void testDefaults() { assertRecordedDefaults(ConfigAssertions.recordDefaults(EventListenerConfig.class) + .setMaxConcurrentQueryCompletedEvents(100) .setEventListenerFiles(ImmutableList.of())); } @@ -42,10 +43,13 @@ public void testExplicitPropertyMappings() Path config1 = Files.createTempFile(null, null); Path config2 = Files.createTempFile(null, null); - Map properties = ImmutableMap.of("event-listener.config-files", config1.toString() + "," + config2.toString()); + Map properties = ImmutableMap.of( + "event-listener.config-files", config1.toString() + "," + config2.toString(), + "event-listener.max-concurrent-query-completed-events", "1"); EventListenerConfig expected = new EventListenerConfig() - .setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath())); + .setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath())) + .setMaxConcurrentQueryCompletedEvents(1); assertFullMapping(properties, expected); } diff --git a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java index d31353539930..f3fbcd1f0006 100644 --- a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java +++ b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java @@ -14,14 +14,139 @@ package io.trino.eventlistener; import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.QueryCompletedEvent; +import io.trino.spi.eventlistener.QueryContext; +import io.trino.spi.eventlistener.QueryIOMetadata; +import io.trino.spi.eventlistener.QueryMetadata; +import io.trino.spi.eventlistener.QueryStatistics; +import io.trino.spi.session.ResourceEstimates; import org.junit.jupiter.api.Test; +import java.net.URI; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static java.time.Duration.ofMillis; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.assertj.core.api.Assertions.assertThat; class TestEventListenerManager { + private static final QueryMetadata QUERY_METADATA = new QueryMetadata( + "minimal_query", + Optional.empty(), + "query", + Optional.empty(), + Optional.empty(), + "queryState", + // not stored + List.of(), + // not stored + List.of(), + URI.create("http://localhost"), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + private static final QueryStatistics QUERY_STATISTICS = new QueryStatistics( + ofMillis(101), + ofMillis(102), + ofMillis(103), + ofMillis(104), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + 115L, + 116L, + 117L, + 118L, + 119L, + 1191L, + 1192L, + 120L, + 121L, + 122L, + 123L, + 124L, + 125L, + 126L, + 127L, + 1271L, + 128.0, + 129.0, + // not stored + Collections.emptyList(), + 130, + false, + // not stored + Collections.emptyList(), + // not stored + Collections.emptyList(), + // not stored + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + // not stored + Optional.empty()); + + private static final QueryContext QUERY_CONTEXT = new QueryContext( + "user", + "originalUser", + Optional.empty(), + Set.of(), + Set.of(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Set.of(), + // not stored + Set.of(), + Optional.empty(), + UTC_KEY.getId(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Map.of(), + // not stored + new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.empty()), + "serverAddress", + "serverVersion", + "environment", + Optional.empty(), + "NONE"); + + private static final QueryIOMetadata QUERY_IO_METADATA = new QueryIOMetadata(List.of(), Optional.empty()); + + private static final QueryCompletedEvent QUERY_COMPLETED_EVENT = new QueryCompletedEvent( + QUERY_METADATA, + QUERY_STATISTICS, + QUERY_CONTEXT, + QUERY_IO_METADATA, + Optional.empty(), + List.of(), + Instant.now(), + Instant.now(), + Instant.now()); + @Test public void testShutdownIsForwardedToListeners() { @@ -42,4 +167,47 @@ public void shutdown() assertThat(wasCalled.get()).isTrue(); } + + @Test + public void testMaxConcurrentQueryCompletedEvents() + throws InterruptedException + { + EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig().setMaxConcurrentQueryCompletedEvents(1)); + eventListenerManager.addEventListener(new BlockingEventListener()); + eventListenerManager.loadEventListeners(); + ExecutorService executor = newFixedThreadPool(2); + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + Runnable queryCompletedEvent = () -> { + eventListenerManager.queryCompleted(_ -> QUERY_COMPLETED_EVENT); + countDownLatch.countDown(); + }; + executor.submit(queryCompletedEvent); + executor.submit(queryCompletedEvent); + + countDownLatch.await(); + assertThat(eventListenerManager.getSkippedQueryCompletedEvents().getTotalCount()).isEqualTo(1); + assertThat(eventListenerManager.getConcurrentQueryCompletedEvents()).isEqualTo(1); + } + finally { + executor.shutdownNow(); + assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + } + } + + private static final class BlockingEventListener + implements EventListener + { + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + try { + // sleep forever + Thread.sleep(100_000); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } } diff --git a/docs/src/main/sphinx/develop/event-listener.md b/docs/src/main/sphinx/develop/event-listener.md index 3c81931b34de..8d365b605704 100644 --- a/docs/src/main/sphinx/develop/event-listener.md +++ b/docs/src/main/sphinx/develop/event-listener.md @@ -45,6 +45,10 @@ custom-property1=custom-value1 custom-property2=custom-value2 ``` +Maximum number of concurrent query completed events +can be configured using `event-listener.max-concurrent-query-completed-events` property +(`100` by default). Excessive events are dropped. + (multiple-listeners)= ## Multiple event listeners