From bc385e70622d0a54837bcd8253c7770c13f8519d Mon Sep 17 00:00:00 2001 From: lukasz-stec Date: Fri, 13 Dec 2024 11:22:11 +0100 Subject: [PATCH] Add TrinoCatalog.listIcebergTables --- .../plugin/iceberg/catalog/TrinoCatalog.java | 2 ++ .../catalog/glue/TrinoGlueCatalog.java | 17 ++++++++++++++ .../iceberg/catalog/hms/TrinoHiveCatalog.java | 23 +++++++++++++++++++ .../catalog/jdbc/TrinoJdbcCatalog.java | 21 +++++++++++++++++ .../catalog/nessie/TrinoNessieCatalog.java | 8 +++++++ .../catalog/rest/TrinoRestCatalog.java | 15 ++++++++++++ .../snowflake/TrinoSnowflakeCatalog.java | 9 ++++++++ .../iceberg/catalog/BaseTrinoCatalogTest.java | 7 ++++++ 8 files changed, 102 insertions(+) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index 005760040b1b..c7370d65d810 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -83,6 +83,8 @@ default Optional getNamespaceSeparator() List listTables(ConnectorSession session, Optional namespace); + List listIcebergTables(ConnectorSession session, Optional namespace); + default List listViews(ConnectorSession session, Optional namespace) { return listTables(session, namespace).stream() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 7b5e4cb67d1b..6c8a3dfa44bb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -373,9 +373,26 @@ public void renameNamespace(ConnectorSession session, String source, String targ @Override public List listTables(ConnectorSession session, Optional namespace) + { + return listTables(session, namespace, _ -> true); + } + + @Override + public List listIcebergTables(ConnectorSession session, Optional namespace) + { + return listTables(session, namespace, table -> isIcebergTable(getTableParameters(table))).stream() + .map(TableInfo::tableName) + .collect(toImmutableList()); + } + + private List listTables( + ConnectorSession session, + Optional namespace, + Predicate tablePredicate) { List>> tasks = listNamespaces(session, namespace).stream() .map(glueNamespace -> (Callable>) () -> getGlueTablesWithExceptionHandling(glueNamespace) + .filter(tablePredicate) .map(table -> mapToTableInfo(glueNamespace, table)) .collect(toImmutableList())) .collect(toImmutableList()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index b450063dba6f..908d7cc6dde9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -16,6 +16,7 @@ import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; import io.trino.cache.EvictableCacheBuilder; @@ -375,6 +376,28 @@ public List listTables(ConnectorSession session, Optional nam } } + @Override + public List listIcebergTables(ConnectorSession session, Optional namespace) + { + List>> tasks = listNamespaces(session, namespace).stream() + .map(schema -> (Callable>) () -> metastore.getTableNamesWithParameters(schema, TABLE_TYPE_PROP, ImmutableSet.of( + // Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because + // Trino uses lowercase value whereas Spark and Flink use uppercase. + ICEBERG_TABLE_TYPE_VALUE.toLowerCase(ENGLISH), + ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH))).stream() + .map(tableName -> new SchemaTableName(schema, tableName)) + .collect(toImmutableList())) + .collect(toImmutableList()); + try { + return processWithAdditionalThreads(tasks, metadataFetchingExecutor).stream() + .flatMap(Collection::stream) + .collect(toImmutableList()); + } + catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + } + @Override public Optional> streamRelationColumns( ConnectorSession session, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 1d8daf1effd1..0e5c401c8ba5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -60,6 +60,7 @@ import org.apache.iceberg.view.ViewVersion; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -200,6 +201,26 @@ public List listTables(ConnectorSession session, Optional nam return ImmutableList.copyOf(tablesListBuilder.values()); } + @Override + public List listIcebergTables(ConnectorSession session, Optional namespace) + { + List namespaces = listNamespaces(session, namespace); + + // Build as a set and convert to list for removing duplicate entries due to case difference + Set tablesListBuilder = new HashSet<>(); + for (String schemaName : namespaces) { + try { + listTableIdentifiers(schemaName, () -> jdbcCatalog.listTables(Namespace.of(schemaName))).stream() + .map(tableId -> SchemaTableName.schemaTableName(schemaName, tableId.name())) + .forEach(tablesListBuilder::add); + } + catch (NoSuchNamespaceException e) { + // Namespace may have been deleted + } + } + return ImmutableList.copyOf(tablesListBuilder); + } + @Override public List listViews(ConnectorSession session, Optional namespace) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 240e41f4cdd1..ed7be861fd9f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -170,6 +170,14 @@ public List listTables(ConnectorSession session, Optional nam .collect(toImmutableList()); } + @Override + public List listIcebergTables(ConnectorSession session, Optional namespace) + { + return listTables(session, namespace).stream() + .map(TableInfo::tableName) + .collect(toImmutableList()); + } + @Override public Optional> streamRelationColumns( ConnectorSession session, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 40037baed704..d72f75d3f334 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -261,6 +261,21 @@ public List listTables(ConnectorSession session, Optional nam return tables.build(); } + @Override + public List listIcebergTables(ConnectorSession session, Optional namespace) + { + SessionContext sessionContext = convert(session); + List namespaces = listNamespaces(session, namespace); + + ImmutableList.Builder tables = ImmutableList.builder(); + for (Namespace restNamespace : namespaces) { + listTableIdentifiers(restNamespace, () -> restSessionCatalog.listTables(sessionContext, toRemoteNamespace(session, restNamespace))).stream() + .map(id -> SchemaTableName.schemaTableName(toSchemaName(id.namespace()), id.name())) + .forEach(tables::add); + } + return tables.build(); + } + @Override public List listViews(ConnectorSession session, Optional namespace) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/snowflake/TrinoSnowflakeCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/snowflake/TrinoSnowflakeCatalog.java index fb901d4890b0..2f58396e6c69 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/snowflake/TrinoSnowflakeCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/snowflake/TrinoSnowflakeCatalog.java @@ -58,6 +58,7 @@ import java.util.stream.Stream; import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; @@ -171,6 +172,14 @@ public List listTables(ConnectorSession session, Optional nam .toList(); } + @Override + public List listIcebergTables(ConnectorSession session, Optional namespace) + { + return listTables(session, namespace).stream() + .map(TableInfo::tableName) + .collect(toImmutableList()); + } + @Override public Optional> streamRelationColumns( ConnectorSession session, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index 1cf15d256401..e2ae7af4c66b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ -450,6 +450,9 @@ public void testListTables() .add(new TableInfo(table1, TABLE)) .add(new TableInfo(table2, TABLE)); + ImmutableList.Builder icebergTables = ImmutableList.builder() + .add(table1) + .add(table2); SchemaTableName view = new SchemaTableName(ns2, "view"); try { catalog.createView( @@ -493,6 +496,7 @@ public void testListTables() createExternalIcebergTable(catalog, ns2, closer).ifPresent(table -> { allTables.add(new TableInfo(table, TABLE)); + icebergTables.add(table); }); createExternalNonIcebergTable(catalog, ns2, closer).ifPresent(table -> { allTables.add(new TableInfo(table, TABLE)); @@ -500,10 +504,13 @@ public void testListTables() // No namespace provided, all tables across all namespaces should be returned assertThat(catalog.listTables(SESSION, Optional.empty())).containsAll(allTables.build()); + assertThat(catalog.listIcebergTables(SESSION, Optional.empty())).containsAll(icebergTables.build()); // Namespace is provided and exists assertThat(catalog.listTables(SESSION, Optional.of(ns1))).containsExactly(new TableInfo(table1, TABLE)); + assertThat(catalog.listIcebergTables(SESSION, Optional.of(ns1))).containsExactly(table1); // Namespace is provided and does not exist assertThat(catalog.listTables(SESSION, Optional.of("non_existing"))).isEmpty(); + assertThat(catalog.listIcebergTables(SESSION, Optional.of("non_existing"))).isEmpty(); } }