Skip to content

Commit

Permalink
Prevent too many concurrent query completed notifications
Browse files Browse the repository at this point in the history
queryCompleted notifications are handled by unbounded
executor, hence it could cause all sort of issues when
there is spike in query traffic and event listener
is slow to report events.
  • Loading branch information
sopel39 committed Aug 5, 2024
1 parent 8f4dc1c commit e700260
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.validation.FileExists;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import java.io.File;
Expand All @@ -26,6 +27,7 @@
public class EventListenerConfig
{
private List<File> eventListenerFiles = ImmutableList.of();
private int maxConcurrentQueryCompletedEvents = 100;

@NotNull
public List<@FileExists File> getEventListenerFiles()
Expand All @@ -41,4 +43,17 @@ public EventListenerConfig setEventListenerFiles(List<String> eventListenerFiles
.collect(toImmutableList());
return this;
}

@Min(1)
public int getMaxConcurrentQueryCompletedEvents()
{
return maxConcurrentQueryCompletedEvents;
}

@Config("event-listener.max-concurrent-query-completed-events")
public EventListenerConfig setMaxConcurrentQueryCompletedEvents(int maxConcurrentQueryCompletedEvents)
{
this.maxConcurrentQueryCompletedEvents = maxConcurrentQueryCompletedEvents;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.stats.TimeStat;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.eventlistener.EventListener;
Expand All @@ -38,6 +39,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -55,10 +57,13 @@ public class EventListenerManager
private static final File CONFIG_FILE = new File("etc/event-listener.properties");
private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name";
private final List<File> configFiles;
private final int maxConcurrentQueryCompletedEvents;
private final Map<String, EventListenerFactory> eventListenerFactories = new ConcurrentHashMap<>();
private final List<EventListener> providedEventListeners = Collections.synchronizedList(new ArrayList<>());
private final AtomicReference<List<EventListener>> configuredEventListeners = new AtomicReference<>(ImmutableList.of());
private final AtomicBoolean loading = new AtomicBoolean(false);
private final AtomicInteger concurrentQueryCompletedEvents = new AtomicInteger();
private final CounterStat skippedQueryCompletedEvents = new CounterStat();

private final TimeStat queryCreatedTime = new TimeStat(MILLISECONDS);
private final TimeStat queryCompletedTime = new TimeStat(MILLISECONDS);
Expand All @@ -68,6 +73,7 @@ public class EventListenerManager
public EventListenerManager(EventListenerConfig config)
{
this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles());
this.maxConcurrentQueryCompletedEvents = config.getMaxConcurrentQueryCompletedEvents();
}

public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
Expand Down Expand Up @@ -144,7 +150,13 @@ private static Map<String, String> loadEventListenerProperties(File configFile)
public void queryCompleted(Function<Boolean, QueryCompletedEvent> queryCompletedEventProvider)
{
try (TimeStat.BlockTimer _ = queryCompletedTime.time()) {
if (concurrentQueryCompletedEvents.incrementAndGet() > maxConcurrentQueryCompletedEvents) {
concurrentQueryCompletedEvents.decrementAndGet();
skippedQueryCompletedEvents.update(1);
return;
}
doQueryCompleted(queryCompletedEventProvider);
concurrentQueryCompletedEvents.decrementAndGet();
}
}

Expand Down Expand Up @@ -220,6 +232,19 @@ public TimeStat getSplitCompletedTime()
return splitCompletedTime;
}

@Managed
public int getConcurrentQueryCompletedEvents()
{
return concurrentQueryCompletedEvents.get();
}

@Managed
@Nested
public CounterStat getSkippedQueryCompletedEvents()
{
return skippedQueryCompletedEvents;
}

@PreDestroy
public void shutdown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class TestEventListenerConfig
public void testDefaults()
{
assertRecordedDefaults(ConfigAssertions.recordDefaults(EventListenerConfig.class)
.setMaxConcurrentQueryCompletedEvents(100)
.setEventListenerFiles(ImmutableList.of()));
}

Expand All @@ -42,10 +43,13 @@ public void testExplicitPropertyMappings()
Path config1 = Files.createTempFile(null, null);
Path config2 = Files.createTempFile(null, null);

Map<String, String> properties = ImmutableMap.of("event-listener.config-files", config1.toString() + "," + config2.toString());
Map<String, String> properties = ImmutableMap.of(
"event-listener.config-files", config1.toString() + "," + config2.toString(),
"event-listener.max-concurrent-query-completed-events", "1");

EventListenerConfig expected = new EventListenerConfig()
.setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath()));
.setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath()))
.setMaxConcurrentQueryCompletedEvents(1);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,139 @@
package io.trino.eventlistener;

import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryContext;
import io.trino.spi.eventlistener.QueryIOMetadata;
import io.trino.spi.eventlistener.QueryMetadata;
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.session.ResourceEstimates;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static java.time.Duration.ofMillis;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;

class TestEventListenerManager
{
private static final QueryMetadata QUERY_METADATA = new QueryMetadata(
"minimal_query",
Optional.empty(),
"query",
Optional.empty(),
Optional.empty(),
"queryState",
// not stored
List.of(),
// not stored
List.of(),
URI.create("http://localhost"),
Optional.empty(),
Optional.empty(),
Optional.empty());

private static final QueryStatistics QUERY_STATISTICS = new QueryStatistics(
ofMillis(101),
ofMillis(102),
ofMillis(103),
ofMillis(104),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
115L,
116L,
117L,
118L,
119L,
1191L,
1192L,
120L,
121L,
122L,
123L,
124L,
125L,
126L,
127L,
1271L,
128.0,
129.0,
// not stored
Collections.emptyList(),
130,
false,
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
// not stored
Optional.empty());

private static final QueryContext QUERY_CONTEXT = new QueryContext(
"user",
"originalUser",
Optional.empty(),
Set.of(),
Set.of(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Set.of(),
// not stored
Set.of(),
Optional.empty(),
UTC_KEY.getId(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Map.of(),
// not stored
new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.empty()),
"serverAddress",
"serverVersion",
"environment",
Optional.empty(),
"NONE");

private static final QueryIOMetadata QUERY_IO_METADATA = new QueryIOMetadata(List.of(), Optional.empty());

private static final QueryCompletedEvent QUERY_COMPLETED_EVENT = new QueryCompletedEvent(
QUERY_METADATA,
QUERY_STATISTICS,
QUERY_CONTEXT,
QUERY_IO_METADATA,
Optional.empty(),
List.of(),
Instant.now(),
Instant.now(),
Instant.now());

@Test
public void testShutdownIsForwardedToListeners()
{
Expand All @@ -42,4 +167,47 @@ public void shutdown()

assertThat(wasCalled.get()).isTrue();
}

@Test
public void testMaxConcurrentQueryCompletedEvents()
throws InterruptedException
{
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig().setMaxConcurrentQueryCompletedEvents(1));
eventListenerManager.addEventListener(new BlockingEventListener());
eventListenerManager.loadEventListeners();
ExecutorService executor = newFixedThreadPool(2);
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
Runnable queryCompletedEvent = () -> {
eventListenerManager.queryCompleted(_ -> QUERY_COMPLETED_EVENT);
countDownLatch.countDown();
};
executor.submit(queryCompletedEvent);
executor.submit(queryCompletedEvent);

countDownLatch.await();
assertThat(eventListenerManager.getSkippedQueryCompletedEvents().getTotalCount()).isEqualTo(1);
assertThat(eventListenerManager.getConcurrentQueryCompletedEvents()).isEqualTo(1);
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
}
}

private static final class BlockingEventListener
implements EventListener
{
@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
try {
// sleep forever
Thread.sleep(100_000);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
4 changes: 4 additions & 0 deletions docs/src/main/sphinx/develop/event-listener.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ custom-property1=custom-value1
custom-property2=custom-value2
```

Maximum number of concurrent query completed events
can be configured using `event-listener.max-concurrent-query-completed-events` property
(`100` by default). Excessive events are dropped.

(multiple-listeners)=
## Multiple event listeners

Expand Down

0 comments on commit e700260

Please sign in to comment.