Skip to content

Commit

Permalink
Remove support for connector event listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Jan 2, 2025
1 parent ac73163 commit ad43316
Show file tree
Hide file tree
Showing 9 changed files with 6 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.connector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.airlift.log.Logger;
Expand All @@ -37,7 +36,6 @@
import io.trino.spi.connector.SchemaRoutineName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.function.FunctionKind;
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.table.ArgumentSpecification;
Expand Down Expand Up @@ -83,7 +81,6 @@ public class ConnectorServices
private final Optional<ConnectorIndexProvider> indexProvider;
private final Optional<ConnectorNodePartitioningProvider> partitioningProvider;
private final Optional<ConnectorAccessControl> accessControl;
private final List<EventListener> eventListeners;
private final Map<String, PropertyMetadata<?>> sessionProperties;
private final Map<String, PropertyMetadata<?>> tableProperties;
private final Map<String, PropertyMetadata<?>> viewProperties;
Expand Down Expand Up @@ -184,10 +181,6 @@ public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector c
verifyAccessControl(accessControl);
this.accessControl = Optional.ofNullable(accessControl);

Iterable<EventListener> eventListeners = connector.getEventListeners();
requireNonNull(eventListeners, format("Connector '%s' returned a null event listeners iterable", eventListeners));
this.eventListeners = ImmutableList.copyOf(eventListeners);

List<PropertyMetadata<?>> sessionProperties = connector.getSessionProperties();
requireNonNull(sessionProperties, format("Connector '%s' returned a null system properties set", catalogHandle));
this.sessionProperties = Maps.uniqueIndex(sessionProperties, PropertyMetadata::getName);
Expand Down Expand Up @@ -297,11 +290,6 @@ public Optional<ConnectorAccessControl> getAccessControl()
return accessControl;
}

public List<EventListener> getEventListeners()
{
return eventListeners;
}

public Map<String, PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
Expand Down
27 changes: 0 additions & 27 deletions core/trino-main/src/main/java/io/trino/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
Expand Down Expand Up @@ -44,7 +43,6 @@
import io.trino.connector.CatalogManagerConfig.CatalogMangerKind;
import io.trino.connector.CatalogManagerModule;
import io.trino.connector.CatalogStoreManager;
import io.trino.connector.ConnectorServices;
import io.trino.connector.ConnectorServicesProvider;
import io.trino.eventlistener.EventListenerManager;
import io.trino.eventlistener.EventListenerModule;
Expand Down Expand Up @@ -72,7 +70,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
Expand All @@ -84,7 +81,6 @@
import static io.trino.server.TrinoSystemRequirements.verifySystemRequirements;
import static java.lang.String.format;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.joining;

public class Server
Expand Down Expand Up @@ -157,14 +153,8 @@ private void doStart(String trinoVersion)

// Only static catalog manager announces catalogs
// Connector event listeners are only supported for statically loaded catalogs
// TODO: remove connector event listeners or add support for dynamic loading from connector
if (injector.getInstance(CatalogManagerConfig.class).getCatalogMangerKind() == CatalogMangerKind.STATIC) {
CatalogManager catalogManager = injector.getInstance(CatalogManager.class);
addConnectorEventListeners(
catalogManager,
injector.getInstance(ConnectorServicesProvider.class),
injector.getInstance(EventListenerManager.class));

// TODO: remove this huge hack
updateConnectorIds(injector.getInstance(Announcer.class), catalogManager);
}
Expand Down Expand Up @@ -211,23 +201,6 @@ private void doStart(String trinoVersion)
}
}

@VisibleForTesting
public static void addConnectorEventListeners(
CatalogManager catalogManager,
ConnectorServicesProvider connectorServicesProvider,
EventListenerManager eventListenerManager)
{
catalogManager.getCatalogNames().stream()
.map(catalogManager::getCatalog)
.flatMap(Optional::stream)
.filter(not(Catalog::isFailed))
.map(Catalog::getCatalogHandle)
.map(connectorServicesProvider::getConnectorServices)
.map(ConnectorServices::getEventListeners)
.flatMap(Collection::stream)
.forEach(eventListenerManager::addEventListener);
}

private static void addMessages(StringBuilder output, String type, List<Object> messages)
{
if (messages.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import io.trino.server.PluginInstaller;
import io.trino.server.PrefixObjectNameGeneratorModule;
import io.trino.server.QuerySessionSupplier;
import io.trino.server.Server;
import io.trino.server.ServerMainModule;
import io.trino.server.SessionContext;
import io.trino.server.SessionPropertyDefaults;
Expand Down Expand Up @@ -529,20 +528,6 @@ public void loadSpoolingManager(String name, Map<String, String> properties)
spoolingManagerRegistry.loadSpoolingManager(name, properties);
}

/**
* Add the event listeners from connectors. Connector event listeners are
* only supported for statically loaded catalogs, and this doesn't match up
* with the model of the testing Trino server. This method should only be
* called once after all catalogs are added.
*/
public void addConnectorEventListeners()
{
Server.addConnectorEventListeners(
injector.getInstance(CatalogManager.class),
injector.getInstance(ConnectorServicesProvider.class),
injector.getInstance(EventListenerManager.class));
}

public Path getBaseDataDir()
{
return baseDataDir;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.function.BoundSignature;
import io.trino.spi.function.FunctionDependencyDeclaration;
Expand Down Expand Up @@ -174,7 +173,6 @@ public class MockConnector
private final BiFunction<ConnectorSession, Type, Optional<Type>> getSupportedType;
private final BiFunction<ConnectorSession, ConnectorTableHandle, ConnectorTableProperties> getTableProperties;
private final BiFunction<ConnectorSession, SchemaTablePrefix, List<GrantInfo>> listTablePrivileges;
private final Supplier<Iterable<EventListener>> eventListeners;
private final Collection<FunctionMetadata> functions;
private final MockConnectorFactory.ListRoleGrants roleGrants;
private final Optional<ConnectorNodePartitioningProvider> partitioningProvider;
Expand Down Expand Up @@ -229,7 +227,6 @@ public class MockConnector
BiFunction<ConnectorSession, Type, Optional<Type>> getSupportedType,
BiFunction<ConnectorSession, ConnectorTableHandle, ConnectorTableProperties> getTableProperties,
BiFunction<ConnectorSession, SchemaTablePrefix, List<GrantInfo>> listTablePrivileges,
Supplier<Iterable<EventListener>> eventListeners,
Collection<FunctionMetadata> functions,
ListRoleGrants roleGrants,
Optional<ConnectorNodePartitioningProvider> partitioningProvider,
Expand Down Expand Up @@ -282,7 +279,6 @@ public class MockConnector
this.getSupportedType = requireNonNull(getSupportedType, "getSupportedType is null");
this.getTableProperties = requireNonNull(getTableProperties, "getTableProperties is null");
this.listTablePrivileges = requireNonNull(listTablePrivileges, "listTablePrivileges is null");
this.eventListeners = requireNonNull(eventListeners, "eventListeners is null");
this.functions = ImmutableList.copyOf(functions);
this.roleGrants = requireNonNull(roleGrants, "roleGrants is null");
this.partitioningProvider = requireNonNull(partitioningProvider, "partitioningProvider is null");
Expand Down Expand Up @@ -367,12 +363,6 @@ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
return partitioningProvider.orElseThrow(UnsupportedOperationException::new);
}

@Override
public Iterable<EventListener> getEventListeners()
{
return eventListeners.get();
}

@Override
public ConnectorAccessControl getAccessControl()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.function.FunctionMetadata;
import io.trino.spi.function.FunctionProvider;
Expand Down Expand Up @@ -125,7 +124,6 @@ public class MockConnectorFactory
private final BiFunction<ConnectorSession, Type, Optional<Type>> getSupportedType;
private final BiFunction<ConnectorSession, ConnectorTableHandle, ConnectorTableProperties> getTableProperties;
private final BiFunction<ConnectorSession, SchemaTablePrefix, List<GrantInfo>> listTablePrivileges;
private final Supplier<Iterable<EventListener>> eventListeners;
private final Collection<FunctionMetadata> functions;
private final Function<SchemaTableName, List<List<?>>> data;
private final Function<SchemaTableName, Metrics> metrics;
Expand Down Expand Up @@ -183,7 +181,6 @@ private MockConnectorFactory(
BiFunction<ConnectorSession, Type, Optional<Type>> getSupportedType,
BiFunction<ConnectorSession, ConnectorTableHandle, ConnectorTableProperties> getTableProperties,
BiFunction<ConnectorSession, SchemaTablePrefix, List<GrantInfo>> listTablePrivileges,
Supplier<Iterable<EventListener>> eventListeners,
Collection<FunctionMetadata> functions,
Function<SchemaTableName, List<List<?>>> data,
Function<SchemaTableName, Metrics> metrics,
Expand Down Expand Up @@ -237,7 +234,6 @@ private MockConnectorFactory(
this.getSupportedType = requireNonNull(getSupportedType, "getSupportedType is null");
this.getTableProperties = requireNonNull(getTableProperties, "getTableProperties is null");
this.listTablePrivileges = requireNonNull(listTablePrivileges, "listTablePrivileges is null");
this.eventListeners = requireNonNull(eventListeners, "eventListeners is null");
this.functions = ImmutableList.copyOf(functions);
this.analyzeProperties = requireNonNull(analyzeProperties, "analyzeProperties is null");
this.schemaProperties = requireNonNull(schemaProperties, "schemaProperties is null");
Expand Down Expand Up @@ -301,7 +297,6 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
getSupportedType,
getTableProperties,
listTablePrivileges,
eventListeners,
functions,
roleGrants,
partitioningProvider,
Expand Down Expand Up @@ -446,7 +441,6 @@ public static final class Builder
private BiFunction<ConnectorSession, Type, Optional<Type>> getSupportedType = (session, type) -> Optional.empty();
private BiFunction<ConnectorSession, ConnectorTableHandle, ConnectorTableProperties> getTableProperties = defaultGetTableProperties();
private BiFunction<ConnectorSession, SchemaTablePrefix, List<GrantInfo>> listTablePrivileges = defaultListTablePrivileges();
private Supplier<Iterable<EventListener>> eventListeners = ImmutableList::of;
private Collection<FunctionMetadata> functions = ImmutableList.of();
private ApplyTopN applyTopN = (session, handle, topNCount, sortItems, assignments) -> Optional.empty();
private ApplyFilter applyFilter = (session, handle, constraint) -> Optional.empty();
Expand Down Expand Up @@ -683,22 +677,6 @@ public Builder withListTablePrivileges(BiFunction<ConnectorSession, SchemaTableP
return this;
}

public Builder withEventListener(EventListener listener)
{
requireNonNull(listener, "listener is null");

withEventListener(() -> listener);
return this;
}

public Builder withEventListener(Supplier<EventListener> listenerFactory)
{
requireNonNull(listenerFactory, "listenerFactory is null");

this.eventListeners = () -> ImmutableList.of(listenerFactory.get());
return this;
}

public Builder withFunctions(Collection<FunctionMetadata> functions)
{
requireNonNull(functions, "functions is null");
Expand Down Expand Up @@ -882,7 +860,6 @@ public MockConnectorFactory build()
getSupportedType,
getTableProperties,
listTablePrivileges,
eventListeners,
functions,
data,
metrics,
Expand Down

This file was deleted.

6 changes: 6 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@
</item>
<!-- Backwards incompatible changes since the previous release -->
<!-- Any exclusions below can be deleted after each release -->
<item>
<ignore>true</ignore>
<code>java.method.removed</code>
<old>method java.lang.Iterable&lt;io.trino.spi.eventlistener.EventListener&gt; io.trino.spi.connector.Connector::getEventListeners()</old>
<justification>Remove connector event listeners</justification>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.spi.connector;

import io.trino.spi.Experimental;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.table.ConnectorTableFunction;
import io.trino.spi.procedure.Procedure;
Expand Down Expand Up @@ -231,14 +230,6 @@ default ConnectorAccessControl getAccessControl()
throw new UnsupportedOperationException();
}

/**
* @return the event listeners provided by this connector
*/
default Iterable<EventListener> getEventListeners()
{
return emptySet();
}

/**
* Commit the transaction. Will be called at most once and will not be called if
* {@link #rollback} is called.
Expand Down
Loading

0 comments on commit ad43316

Please sign in to comment.