Skip to content

Commit

Permalink
Support managing views in the Faker connector
Browse files Browse the repository at this point in the history
  • Loading branch information
nineinchnick committed Dec 25, 2024
1 parent 1bccf74 commit 7446fad
Show file tree
Hide file tree
Showing 5 changed files with 460 additions and 16 deletions.
6 changes: 6 additions & 0 deletions plugin/trino-faker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpch</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.faker;

import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.Type;

import java.util.Optional;

Expand All @@ -35,6 +36,11 @@ public String name()
return metadata.getName();
}

public Type type()
{
return metadata.getType();
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.trino.plugin.faker;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.slice.Slice;
import io.trino.spi.TrinoException;
Expand All @@ -28,15 +29,16 @@
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.connector.ViewNotFoundException;
import io.trino.spi.function.BoundSignature;
import io.trino.spi.function.FunctionDependencyDeclaration;
import io.trino.spi.function.FunctionId;
Expand All @@ -59,14 +61,20 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.filterKeys;
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.INVALID_COLUMN_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_COLUMN_REFERENCE;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SCHEMA_ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_FOUND;
Expand All @@ -85,11 +93,12 @@ public class FakerMetadata
private final List<SchemaInfo> schemas = new ArrayList<>();
private final double nullProbability;
private final long defaultLimit;
private final FakerFunctionProvider functionsProvider;

@GuardedBy("this")
private final Map<SchemaTableName, TableInfo> tables = new HashMap<>();

private final FakerFunctionProvider functionsProvider;
@GuardedBy("this")
private final Map<SchemaTableName, ConnectorViewDefinition> views = new HashMap<>();

@Inject
public FakerMetadata(FakerConfig config, FakerFunctionProvider functionProvider)
Expand Down Expand Up @@ -168,7 +177,7 @@ public synchronized ConnectorTableMetadata getTableMetadata(
@Override
public synchronized List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
return tables.keySet().stream()
return Stream.concat(tables.keySet().stream(), views.keySet().stream())
.filter(table -> schemaName.map(table.getSchemaName()::contentEquals).orElse(true))
.collect(toImmutableList());
}
Expand Down Expand Up @@ -197,13 +206,33 @@ public synchronized ColumnMetadata getColumnMetadata(
}

@Override
public synchronized Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
public synchronized Iterator<RelationColumnsMetadata> streamRelationColumns(
ConnectorSession session,
Optional<String> schemaName,
UnaryOperator<Set<SchemaTableName>> relationFilter)
{
return tables.entrySet().stream()
Map<SchemaTableName, RelationColumnsMetadata> relationColumns = new HashMap<>();

SchemaTablePrefix prefix = schemaName.map(SchemaTablePrefix::new)
.orElseGet(SchemaTablePrefix::new);
tables.entrySet().stream()
.filter(entry -> prefix.matches(entry.getKey()))
.map(entry -> TableColumnsMetadata.forTable(
entry.getKey(),
entry.getValue().columns().stream().map(ColumnInfo::metadata).collect(toImmutableList())))
.forEach(entry -> {
SchemaTableName name = entry.getKey();
RelationColumnsMetadata columns = RelationColumnsMetadata.forTable(
name,
entry.getValue().columns().stream()
.map(ColumnInfo::metadata)
.collect(toImmutableList()));
relationColumns.put(name, columns);
});

for (Map.Entry<SchemaTableName, ConnectorViewDefinition> entry : getViews(session, schemaName).entrySet()) {
relationColumns.put(entry.getKey(), RelationColumnsMetadata.forView(entry.getKey(), entry.getValue().getColumns()));
}

return relationFilter.apply(relationColumns.keySet()).stream()
.map(relationColumns::get)
.iterator();
}

Expand All @@ -219,12 +248,12 @@ public synchronized void renameTable(ConnectorSession session, ConnectorTableHan
{
checkSchemaExists(newTableName.getSchemaName());
checkTableNotExists(newTableName);
checkViewNotExists(newTableName);

FakerTableHandle handle = (FakerTableHandle) tableHandle;
SchemaTableName oldTableName = handle.schemaTableName();

tables.remove(oldTableName);
tables.put(newTableName, tables.get(oldTableName));
tables.put(newTableName, tables.remove(oldTableName));
}

@Override
Expand Down Expand Up @@ -293,6 +322,7 @@ public synchronized FakerOutputTableHandle beginCreateTable(ConnectorSession ses
SchemaTableName tableName = tableMetadata.getTable();
SchemaInfo schema = getSchema(tableName.getSchemaName());
checkTableNotExists(tableMetadata.getTable());
checkViewNotExists(tableMetadata.getTable());

double schemaNullProbability = (double) schema.properties().getOrDefault(SchemaInfo.NULL_PROBABILITY_PROPERTY, nullProbability);
double tableNullProbability = (double) tableMetadata.getProperties().getOrDefault(TableInfo.NULL_PROBABILITY_PROPERTY, schemaNullProbability);
Expand Down Expand Up @@ -349,7 +379,7 @@ private boolean isCharacterColumn(ColumnMetadata column)
private synchronized void checkSchemaExists(String schemaName)
{
if (schemas.stream().noneMatch(schema -> schema.name().equals(schemaName))) {
throw new SchemaNotFoundException(schemaName);
throw new TrinoException(SCHEMA_NOT_FOUND, format("Schema '%s' does not exist", schemaName));
}
}

Expand All @@ -360,8 +390,19 @@ private synchronized void checkTableNotExists(SchemaTableName tableName)
}
}

private synchronized void checkViewNotExists(SchemaTableName viewName)
{
if (views.containsKey(viewName)) {
throw new TrinoException(ALREADY_EXISTS, format("View '%s' already exists", viewName));
}
}

@Override
public synchronized Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public synchronized Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
ConnectorOutputTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics)
{
requireNonNull(tableHandle, "tableHandle is null");
FakerOutputTableHandle fakerOutputHandle = (FakerOutputTableHandle) tableHandle;
Expand All @@ -371,12 +412,108 @@ public synchronized Optional<ConnectorOutputMetadata> finishCreateTable(Connecto
TableInfo info = tables.get(tableName);
requireNonNull(info, "info is null");

// TODO ensure fragments is empty?

tables.put(tableName, new TableInfo(info.columns(), info.properties(), info.comment()));

return Optional.empty();
}

@Override
public synchronized List<SchemaTableName> listViews(ConnectorSession session, Optional<String> schemaName)
{
return views.keySet().stream()
.filter(table -> schemaName.map(table.getSchemaName()::equals).orElse(true))
.collect(toImmutableList());
}

@Override
public synchronized Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, Optional<String> schemaName)
{
SchemaTablePrefix prefix = schemaName.map(SchemaTablePrefix::new).orElseGet(SchemaTablePrefix::new);
return ImmutableMap.copyOf(filterKeys(views, prefix::matches));
}

@Override
public synchronized Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
return Optional.ofNullable(views.get(viewName));
}

@Override
public synchronized void createView(
ConnectorSession session,
SchemaTableName viewName,
ConnectorViewDefinition definition,
Map<String, Object> viewProperties,
boolean replace)
{
checkArgument(viewProperties.isEmpty(), "This connector does not support creating views with properties");
checkSchemaExists(viewName.getSchemaName());
checkTableNotExists(viewName);

if (replace) {
views.put(viewName, definition);
}
else if (views.putIfAbsent(viewName, definition) != null) {
throw new TrinoException(ALREADY_EXISTS, "View '%s' already exists".formatted(viewName));
}
}

@Override
public synchronized void setViewComment(ConnectorSession session, SchemaTableName viewName, Optional<String> comment)
{
ConnectorViewDefinition view = getView(session, viewName).orElseThrow(() -> new ViewNotFoundException(viewName));
views.put(viewName, new ConnectorViewDefinition(
view.getOriginalSql(),
view.getCatalog(),
view.getSchema(),
view.getColumns(),
comment,
view.getOwner(),
view.isRunAsInvoker(),
view.getPath()));
}

@Override
public synchronized void setViewColumnComment(ConnectorSession session, SchemaTableName viewName, String columnName, Optional<String> comment)
{
ConnectorViewDefinition view = getView(session, viewName).orElseThrow(() -> new ViewNotFoundException(viewName));
views.put(viewName, new ConnectorViewDefinition(
view.getOriginalSql(),
view.getCatalog(),
view.getSchema(),
view.getColumns().stream()
.map(currentViewColumn -> columnName.equals(currentViewColumn.getName()) ?
new ConnectorViewDefinition.ViewColumn(currentViewColumn.getName(), currentViewColumn.getType(), comment)
: currentViewColumn)
.collect(toImmutableList()),
view.getComment(),
view.getOwner(),
view.isRunAsInvoker(),
view.getPath()));
}

@Override
public synchronized void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
{
checkSchemaExists(target.getSchemaName());
if (!views.containsKey(source)) {
throw new TrinoException(NOT_FOUND, "View not found: " + source);
}
checkTableNotExists(target);

if (views.putIfAbsent(target, views.remove(source)) != null) {
throw new TrinoException(ALREADY_EXISTS, "View '%s' already exists".formatted(target));
}
}

@Override
public synchronized void dropView(ConnectorSession session, SchemaTableName viewName)
{
if (views.remove(viewName) == null) {
throw new ViewNotFoundException(viewName);
}
}

@Override
public synchronized Map<String, Object> getSchemaProperties(ConnectorSession session, String schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.log.Level;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;

Expand Down Expand Up @@ -67,6 +68,9 @@ public DistributedQueryRunner build()
queryRunner.installPlugin(new FakerPlugin());
queryRunner.createCatalog(CATALOG, "faker", properties);

queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch", ImmutableMap.of());

return queryRunner;
}
catch (Exception e) {
Expand Down
Loading

0 comments on commit 7446fad

Please sign in to comment.