Add TrinoCatalog.listIcebergTables
lukasz-stec authored and raunaqmorarka committed Dec 23, 2024
Parent: 8cd060d · Commit: bc385e7
Showing 8 changed files with 102 additions and 0 deletions.
Changed file 1 of 8
@@ -83,6 +83,8 @@ default Optional<String> getNamespaceSeparator()

List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace);

List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace);

default List<SchemaTableName> listViews(ConnectorSession session, Optional<String> namespace)
{
return listTables(session, namespace).stream()
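
For orientation (not part of this commit), a minimal sketch of how a caller might use the new interface method; the example class and its printIcebergTables helper are hypothetical:

import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;
import java.util.Optional;

// Hypothetical caller, for illustration only: lists just the Iceberg tables in one
// schema via the new TrinoCatalog.listIcebergTables method and prints them.
final class ListIcebergTablesExample
{
    static void printIcebergTables(TrinoCatalog catalog, ConnectorSession session, String schema)
    {
        List<SchemaTableName> icebergTables = catalog.listIcebergTables(session, Optional.of(schema));
        for (SchemaTableName table : icebergTables) {
            System.out.println(table.getSchemaName() + "." + table.getTableName());
        }
    }
}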
Changed file 2 of 8
@@ -373,9 +373,26 @@ public void renameNamespace(ConnectorSession session, String source, String target)

@Override
public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
{
return listTables(session, namespace, _ -> true);
}

@Override
public List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace)
{
return listTables(session, namespace, table -> isIcebergTable(getTableParameters(table))).stream()
.map(TableInfo::tableName)
.collect(toImmutableList());
}

private List<TableInfo> listTables(
ConnectorSession session,
Optional<String> namespace,
Predicate<com.amazonaws.services.glue.model.Table> tablePredicate)
{
List<Callable<List<TableInfo>>> tasks = listNamespaces(session, namespace).stream()
.map(glueNamespace -> (Callable<List<TableInfo>>) () -> getGlueTablesWithExceptionHandling(glueNamespace)
.filter(tablePredicate)
.map(table -> mapToTableInfo(glueNamespace, table))
.collect(toImmutableList()))
.collect(toImmutableList());
Changed file 3 of 8
@@ -16,6 +16,7 @@
import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.log.Logger;
import io.trino.cache.EvictableCacheBuilder;
@@ -375,6 +376,28 @@ public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
}
}

@Override
public List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace)
{
List<Callable<List<SchemaTableName>>> tasks = listNamespaces(session, namespace).stream()
.map(schema -> (Callable<List<SchemaTableName>>) () -> metastore.getTableNamesWithParameters(schema, TABLE_TYPE_PROP, ImmutableSet.of(
// Get tables with the table_type parameter set to "ICEBERG" or "iceberg". This is required because
// Trino uses the lowercase value whereas Spark and Flink use uppercase.
ICEBERG_TABLE_TYPE_VALUE.toLowerCase(ENGLISH),
ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH))).stream()
.map(tableName -> new SchemaTableName(schema, tableName))
.collect(toImmutableList()))
.collect(toImmutableList());
try {
return processWithAdditionalThreads(tasks, metadataFetchingExecutor).stream()
.flatMap(Collection::stream)
.collect(toImmutableList());
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}

@Override
public Optional<Iterator<RelationColumnsMetadata>> streamRelationColumns(
ConnectorSession session,
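
A side note on the lowercase/uppercase pair requested above (illustrative only, not part of this commit): the metastore filter matches parameter values exactly, so both spellings have to be asked for. A self-contained sketch of the resulting behavior, with the constant value assumed to be "ICEBERG":

import java.util.Set;

import static java.util.Locale.ENGLISH;

// Illustrative sketch only: the "table_type" parameter values treated as Iceberg.
// Trino writes the lowercase value while Spark and Flink write uppercase, so both
// spellings are requested; either case of the constant yields the same set here.
final class IcebergTableTypeValuesExample
{
    private static final String ICEBERG_TABLE_TYPE_VALUE = "ICEBERG"; // assumed value

    public static void main(String[] args)
    {
        Set<String> acceptedValues = Set.of(
                ICEBERG_TABLE_TYPE_VALUE.toLowerCase(ENGLISH),
                ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH));
        System.out.println(acceptedValues.contains("iceberg")); // true  (as written by Trino)
        System.out.println(acceptedValues.contains("ICEBERG")); // true  (as written by Spark and Flink)
        System.out.println(acceptedValues.contains("Iceberg")); // false (mixed case is not matched)
    }
}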
Changed file 4 of 8
@@ -60,6 +60,7 @@
import org.apache.iceberg.view.ViewVersion;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -200,6 +201,26 @@ public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
return ImmutableList.copyOf(tablesListBuilder.values());
}

@Override
public List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace)
{
List<String> namespaces = listNamespaces(session, namespace);

// Build as a set and convert to a list to remove duplicate entries caused by case differences
Set<SchemaTableName> tablesListBuilder = new HashSet<>();
for (String schemaName : namespaces) {
try {
listTableIdentifiers(schemaName, () -> jdbcCatalog.listTables(Namespace.of(schemaName))).stream()
.map(tableId -> SchemaTableName.schemaTableName(schemaName, tableId.name()))
.forEach(tablesListBuilder::add);
}
catch (NoSuchNamespaceException e) {
// Namespace may have been deleted
}
}
return ImmutableList.copyOf(tablesListBuilder);
}

@Override
public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> namespace)
{
Changed file 5 of 8
@@ -170,6 +170,14 @@ public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
.collect(toImmutableList());
}

@Override
public List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace)
{
return listTables(session, namespace).stream()
.map(TableInfo::tableName)
.collect(toImmutableList());
}

@Override
public Optional<Iterator<RelationColumnsMetadata>> streamRelationColumns(
ConnectorSession session,
Changed file 6 of 8
@@ -261,6 +261,21 @@ public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
return tables.build();
}

@Override
public List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace)
{
SessionContext sessionContext = convert(session);
List<Namespace> namespaces = listNamespaces(session, namespace);

ImmutableList.Builder<SchemaTableName> tables = ImmutableList.builder();
for (Namespace restNamespace : namespaces) {
listTableIdentifiers(restNamespace, () -> restSessionCatalog.listTables(sessionContext, toRemoteNamespace(session, restNamespace))).stream()
.map(id -> SchemaTableName.schemaTableName(toSchemaName(id.namespace()), id.name()))
.forEach(tables::add);
}
return tables.build();
}

@Override
public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> namespace)
{
Changed file 7 of 8
@@ -58,6 +58,7 @@
import java.util.stream.Stream;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.cache.CacheUtils.uncheckedCacheGet;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
@@ -171,6 +172,14 @@ public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
.toList();
}

@Override
public List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace)
{
return listTables(session, namespace).stream()
.map(TableInfo::tableName)
.collect(toImmutableList());
}

@Override
public Optional<Iterator<RelationColumnsMetadata>> streamRelationColumns(
ConnectorSession session,
Changed file 8 of 8
@@ -450,6 +450,9 @@ public void testListTables()
.add(new TableInfo(table1, TABLE))
.add(new TableInfo(table2, TABLE));

ImmutableList.Builder<SchemaTableName> icebergTables = ImmutableList.<SchemaTableName>builder()
.add(table1)
.add(table2);
SchemaTableName view = new SchemaTableName(ns2, "view");
try {
catalog.createView(
@@ -493,17 +496,21 @@

createExternalIcebergTable(catalog, ns2, closer).ifPresent(table -> {
allTables.add(new TableInfo(table, TABLE));
icebergTables.add(table);
});
createExternalNonIcebergTable(catalog, ns2, closer).ifPresent(table -> {
allTables.add(new TableInfo(table, TABLE));
});

// No namespace provided, all tables across all namespaces should be returned
assertThat(catalog.listTables(SESSION, Optional.empty())).containsAll(allTables.build());
assertThat(catalog.listIcebergTables(SESSION, Optional.empty())).containsAll(icebergTables.build());
// Namespace is provided and exists
assertThat(catalog.listTables(SESSION, Optional.of(ns1))).containsExactly(new TableInfo(table1, TABLE));
assertThat(catalog.listIcebergTables(SESSION, Optional.of(ns1))).containsExactly(table1);
// Namespace is provided and does not exist
assertThat(catalog.listTables(SESSION, Optional.of("non_existing"))).isEmpty();
assertThat(catalog.listIcebergTables(SESSION, Optional.of("non_existing"))).isEmpty();
}
}
