From 415e5e2f59c7c1bd01fbd39806973ab76bfee8ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Thu, 11 Jan 2024 18:16:12 +0100 Subject: [PATCH] Core: Add view support for JDBC catalog --- .../org/apache/iceberg/jdbc/JdbcCatalog.java | 133 +++++- .../iceberg/jdbc/JdbcTableOperations.java | 69 +-- .../org/apache/iceberg/jdbc/JdbcUtil.java | 433 ++++++++++++++---- .../iceberg/jdbc/JdbcViewOperations.java | 201 ++++++++ .../org/apache/iceberg/jdbc/TestJdbcUtil.java | 73 ++- .../iceberg/jdbc/TestJdbcViewCatalog.java | 66 +++ 6 files changed, 828 insertions(+), 147 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java create mode 100644 core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 0bab6ade4c85..9d664646b1ce 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -37,7 +37,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableMetadata; @@ -49,6 +48,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.CloseableGroup; @@ -60,10 +60,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.ViewOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JdbcCatalog extends BaseMetastoreCatalog +public class JdbcCatalog extends BaseMetastoreViewCatalog implements Configurable, SupportsNamespaces { public static final String PROPERTY_PREFIX = "jdbc."; @@ -164,8 +166,9 @@ private void initializeCatalogTables() throws InterruptedException, SQLException return true; } - LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME); - return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute(); + LOG.debug( + "Creating table {} to store iceberg catalog tables", JdbcUtil.CATALOG_TABLE_NAME); + return conn.prepareStatement(JdbcUtil.createCatalogTableSql()).execute(); }); connections.run( @@ -187,6 +190,23 @@ private void initializeCatalogTables() throws InterruptedException, SQLException JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME); return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute(); }); + + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet viewTableExists = + dbMeta.getTables( + null /* catalog name */, + null /* schemaPattern */, + JdbcUtil.CATALOG_VIEW_NAME /* viewNamePattern */, + null /* types */); + if (viewTableExists.next()) { + return true; + } + + LOG.debug("Creating table {} to store iceberg catalog views", JdbcUtil.CATALOG_VIEW_NAME); + return conn.prepareStatement(JdbcUtil.createCatalogViewSql()).execute(); + }); } @Override @@ -195,6 +215,11 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { connections, io, catalogName, tableIdentifier, catalogProperties); } + @Override + protected ViewOperations newViewOps(TableIdentifier viewIdentifier) { + return new JdbcViewOperations(connections, io, catalogName, viewIdentifier, catalogProperties); + } + @Override protected String defaultWarehouseLocation(TableIdentifier table) { return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name()); @@ -217,7 +242,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { int deletedRecords = execute( - JdbcUtil.DROP_TABLE_SQL, + JdbcUtil.dropTableSql(), catalogName, JdbcUtil.namespaceToString(identifier.namespace()), identifier.name()); @@ -245,13 +270,17 @@ public List listTables(Namespace namespace) { row -> JdbcUtil.stringToTableIdentifier( row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)), - JdbcUtil.LIST_TABLES_SQL, + JdbcUtil.listTablesSql(), catalogName, JdbcUtil.namespaceToString(namespace)); } @Override public void renameTable(TableIdentifier from, TableIdentifier to) { + if (viewExists(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to); + } + int updatedRecords = execute( err -> { @@ -261,7 +290,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { throw new AlreadyExistsException("Table already exists: %s", to); } }, - JdbcUtil.RENAME_TABLE_SQL, + JdbcUtil.renameTableSql(), JdbcUtil.namespaceToString(to.namespace()), to.name(), catalogName, @@ -315,13 +344,18 @@ public List listNamespaces() { namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)), - JdbcUtil.LIST_ALL_TABLE_NAMESPACES_SQL, + JdbcUtil.listAllTableNamespacesSql(), catalogName)); namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.NAMESPACE_NAME)), JdbcUtil.LIST_ALL_PROPERTY_NAMESPACES_SQL, catalogName)); + namespaces.addAll( + fetch( + row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.VIEW_NAMESPACE)), + JdbcUtil.listAllViewNamespacesSql(), + catalogName)); namespaces = namespaces.stream() @@ -350,7 +384,7 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)), - JdbcUtil.LIST_NAMESPACES_SQL, + JdbcUtil.listTableNamespacesSql(), catalogName, JdbcUtil.namespaceToString(namespace) + "%")); namespaces.addAll( @@ -359,6 +393,12 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac JdbcUtil.LIST_PROPERTY_NAMESPACES_SQL, catalogName, JdbcUtil.namespaceToString(namespace) + "%")); + namespaces.addAll( + fetch( + row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.VIEW_NAMESPACE)), + JdbcUtil.listViewNamespacesSql(), + catalogName, + JdbcUtil.namespaceToString(namespace) + "%s")); int subNamespaceLevelLength = namespace.levels().length + 1; namespaces = @@ -503,6 +543,81 @@ public boolean namespaceExists(Namespace namespace) { return JdbcUtil.namespaceExists(catalogName, connections, namespace); } + @Override + public boolean dropView(TableIdentifier identifier) { + int deletedRecords = + execute( + JdbcUtil.dropViewSql(), + catalogName, + JdbcUtil.namespaceToString(identifier.namespace()), + identifier.name()); + + if (deletedRecords == 0) { + LOG.info("Skipping drop, view does not exist: {}", identifier); + return false; + } + + LOG.info("Dropped view: {}", identifier); + return true; + } + + @Override + public List listViews(Namespace namespace) { + if (!namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + return fetch( + row -> + JdbcUtil.stringToTableIdentifier( + row.getString(JdbcUtil.VIEW_NAMESPACE), row.getString(JdbcUtil.VIEW_NAME)), + JdbcUtil.listViewsSql(), + catalogName, + JdbcUtil.namespaceToString(namespace)); + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + if (!viewExists(from)) { + throw new NoSuchViewException("View does not exist"); + } + + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", to.namespace()); + } + + if (tableExists(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to); + } + + int updatedRecords = + execute( + err -> { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (err instanceof SQLIntegrityConstraintViolationException + || (err.getMessage() != null && err.getMessage().contains("constraint failed"))) { + throw new AlreadyExistsException( + "Cannot rename %s to %s. View already exists", from, to); + } + }, + JdbcUtil.renameViewSql(), + JdbcUtil.namespaceToString(to.namespace()), + to.name(), + catalogName, + JdbcUtil.namespaceToString(from.namespace()), + from.name()); + + if (updatedRecords == 1) { + LOG.info("Renamed view from {}, to {}", from, to); + } else if (updatedRecords == 0) { + throw new NoSuchViewException("View does not exist: %s", from); + } else { + LOG.warn( + "Rename operation affected {} rows: the catalog view's primary key assumption has been violated", + updatedRecords); + } + } + private int execute(String sql, String... args) { return execute(err -> {}, sql, args); } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java index 6a7d594dd9f6..9d6fb2c00505 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java @@ -19,8 +19,6 @@ package org.apache.iceberg.jdbc; import java.sql.DataTruncation; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLNonTransientConnectionException; @@ -39,7 +37,6 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +68,7 @@ public void doRefresh() { Map table; try { - table = getTable(); + table = JdbcUtil.table(connections, catalogName, tableIdentifier); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e, "Interrupted during refresh"); @@ -105,7 +102,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); try { - Map table = getTable(); + Map table = JdbcUtil.table(connections, catalogName, tableIdentifier); if (base != null) { validateMetadataLocation(table, base); @@ -140,6 +137,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { if (e.getMessage().contains("constraint failed")) { throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); } + throw new UncheckedSQLException(e, "Unknown failure"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -150,20 +148,8 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { private void updateTable(String newMetadataLocation, String oldMetadataLocation) throws SQLException, InterruptedException { int updatedRecords = - connections.run( - conn -> { - try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.DO_COMMIT_SQL)) { - // UPDATE - sql.setString(1, newMetadataLocation); - sql.setString(2, oldMetadataLocation); - // WHERE - sql.setString(3, catalogName); - sql.setString(4, JdbcUtil.namespaceToString(tableIdentifier.namespace())); - sql.setString(5, tableIdentifier.name()); - sql.setString(6, oldMetadataLocation); - return sql.executeUpdate(); - } - }); + JdbcUtil.updateTable( + connections, catalogName, tableIdentifier, newMetadataLocation, oldMetadataLocation); if (updatedRecords == 1) { LOG.debug("Successfully committed to existing table: {}", tableIdentifier); @@ -182,18 +168,13 @@ private void createTable(String newMetadataLocation) throws SQLException, Interr tableIdentifier, catalogName, namespace); } + if (JdbcUtil.viewExists(catalogName, connections, tableIdentifier)) { + throw new AlreadyExistsException("View with same name already exists: %s", tableIdentifier); + } + int insertRecord = - connections.run( - conn -> { - try (PreparedStatement sql = - conn.prepareStatement(JdbcUtil.DO_COMMIT_CREATE_TABLE_SQL)) { - sql.setString(1, catalogName); - sql.setString(2, JdbcUtil.namespaceToString(namespace)); - sql.setString(3, tableIdentifier.name()); - sql.setString(4, newMetadataLocation); - return sql.executeUpdate(); - } - }); + JdbcUtil.doCommitCreateTable( + connections, catalogName, namespace, tableIdentifier, newMetadataLocation); if (insertRecord == 1) { LOG.debug("Successfully committed to new table: {}", tableIdentifier); @@ -223,32 +204,4 @@ public FileIO io() { protected String tableName() { return tableIdentifier.toString(); } - - private Map getTable() - throws UncheckedSQLException, SQLException, InterruptedException { - return connections.run( - conn -> { - Map table = Maps.newHashMap(); - - try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.GET_TABLE_SQL)) { - sql.setString(1, catalogName); - sql.setString(2, JdbcUtil.namespaceToString(tableIdentifier.namespace())); - sql.setString(3, tableIdentifier.name()); - ResultSet rs = sql.executeQuery(); - - if (rs.next()) { - table.put(JdbcUtil.CATALOG_NAME, rs.getString(JdbcUtil.CATALOG_NAME)); - table.put(JdbcUtil.TABLE_NAMESPACE, rs.getString(JdbcUtil.TABLE_NAMESPACE)); - table.put(JdbcUtil.TABLE_NAME, rs.getString(JdbcUtil.TABLE_NAME)); - table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP)); - table.put( - PREVIOUS_METADATA_LOCATION_PROP, rs.getString(PREVIOUS_METADATA_LOCATION_PROP)); - } - - rs.close(); - } - - return table; - }); - } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java index 4a515d1329ed..f8513b7a50b8 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -25,49 +25,52 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; final class JdbcUtil { // property to control strict-mode (aka check if namespace exists when creating a table) static final String STRICT_MODE_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "strict-mode"; - // Catalog Table + // Catalog Table & View static final String CATALOG_TABLE_NAME = "iceberg_tables"; + static final String CATALOG_VIEW_NAME = "iceberg_views"; static final String CATALOG_NAME = "catalog_name"; static final String TABLE_NAMESPACE = "table_namespace"; + static final String VIEW_NAMESPACE = "view_namespace"; static final String TABLE_NAME = "table_name"; + static final String VIEW_NAME = "view_name"; - static final String DO_COMMIT_SQL = + private static final String DO_COMMIT = "UPDATE " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " SET " + JdbcTableOperations.METADATA_LOCATION_PROP + " = ? , " + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP - + " = ? " + + " = ?" + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? AND " + + " = ? AND" + + " %s = ? AND " // TABLE_NAMESPACE or VIEW_NAMESPACE + + " %s = ? AND " // TABLE_NAME or VIEW_NAME + JdbcTableOperations.METADATA_LOCATION_PROP + " = ?"; - static final String CREATE_CATALOG_TABLE = + private static final String CREATE_CATALOG = "CREATE TABLE " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + "(" + CATALOG_NAME + " VARCHAR(255) NOT NULL," - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + " VARCHAR(255) NOT NULL," - + TABLE_NAME + + "%s" // TABLE_NAME or VIEW_NAME + " VARCHAR(255) NOT NULL," + JdbcTableOperations.METADATA_LOCATION_PROP + " VARCHAR(1000)," @@ -76,96 +79,77 @@ final class JdbcUtil { + "PRIMARY KEY (" + CATALOG_NAME + ", " - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + ", " - + TABLE_NAME + + "%s" // TABLE_NAME or VIEW_NAME + ")" + ")"; - static final String GET_TABLE_SQL = + private static final String GET = "SELECT * FROM " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? "; - static final String LIST_TABLES_SQL = + + " = ? AND" + + " %s = ? AND" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " %s = ?"; // TABLE_NAME or VIEW_NAME + private static final String LIST = "SELECT * FROM " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + " = ?"; - static final String RENAME_TABLE_SQL = + private static final String RENAME = "UPDATE " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " SET " - + TABLE_NAMESPACE - + " = ? , " - + TABLE_NAME - + " = ? " + + "%s = ?, " // TABLE_NAMESPACE or VIEW_NAMESPACE + + "%s = ?" // TABLE_NAME or VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? "; - static final String DROP_TABLE_SQL = + + " = ? AND" + + " %s = ? AND " // TABLE_NAMESPACE or VIEW_NAMESPACE + + "%s = ?"; // TABLE_NAME or VIEW_NAME + private static final String DROP = "DELETE FROM " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? "; - static final String GET_NAMESPACE_SQL = + + "%s = ? AND " // TABLE_NAMESPACE or VIEW_NAMESPACE + + "%s = ?"; // TABLE_NAME or VIEW_NAME + private static final String GET_NAMESPACE = "SELECT " - + TABLE_NAMESPACE - + " FROM " - + CATALOG_TABLE_NAME + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " FROM %s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + " ( " - + TABLE_NAMESPACE - + " = ? OR " - + TABLE_NAMESPACE - + " LIKE ? ESCAPE '\\' " - + " ) " + + " = ? AND" + + " (%s = ? OR %s LIKE ? ESCAPE '\\')" // TABLE_NAMESPACE or VIEW_NAMESPACE + " LIMIT 1"; - static final String LIST_NAMESPACES_SQL = + private static final String LIST_NAMESPACES = "SELECT DISTINCT " - + TABLE_NAMESPACE - + " FROM " - + CATALOG_TABLE_NAME + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " FROM %s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " LIKE ?"; - static final String LIST_ALL_TABLE_NAMESPACES_SQL = - "SELECT DISTINCT " - + TABLE_NAMESPACE - + " FROM " - + CATALOG_TABLE_NAME + + " = ? AND %s LIKE ?"; // TABLE_NAMESPACE or VIEW_NAMESPACE + private static final String LIST_ALL_NAMESPACES = + "SELECT DISTINCT %s" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " FROM %s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ?"; - static final String DO_COMMIT_CREATE_TABLE_SQL = + private static final String DO_COMMIT_CREATE = "INSERT INTO " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " (" + CATALOG_NAME + ", " - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + ", " - + TABLE_NAME + + "%s" // TABLE_NAME or VIEW_NAME + ", " + JdbcTableOperations.METADATA_LOCATION_PROP + ", " @@ -278,20 +262,20 @@ final class JdbcUtil { private JdbcUtil() {} - public static Namespace stringToNamespace(String namespace) { + static Namespace stringToNamespace(String namespace) { Preconditions.checkArgument(namespace != null, "Invalid namespace %s", namespace); return Namespace.of(Iterables.toArray(SPLITTER_DOT.split(namespace), String.class)); } - public static String namespaceToString(Namespace namespace) { + static String namespaceToString(Namespace namespace) { return JOINER_DOT.join(namespace.levels()); } - public static TableIdentifier stringToTableIdentifier(String tableNamespace, String tableName) { + static TableIdentifier stringToTableIdentifier(String tableNamespace, String tableName) { return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace), tableName); } - public static Properties filterAndRemovePrefix(Map properties, String prefix) { + static Properties filterAndRemovePrefix(Map properties, String prefix) { Properties result = new Properties(); properties.forEach( (key, value) -> { @@ -303,7 +287,294 @@ public static Properties filterAndRemovePrefix(Map properties, S return result; } - public static String updatePropertiesStatement(int size) { + private static String getSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String name = isTable ? TABLE_NAME : VIEW_NAME; + return String.format(GET, tableName, namespace, name); + } + + static String tableSql() { + return getSql(true); + } + + static String viewSql() { + return getSql(false); + } + + private static String namespaceSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format(GET_NAMESPACE, namespace, tableName, namespace, namespace); + } + + static String tableNamespaceSql() { + return namespaceSql(true); + } + + static String viewNamespaceSql() { + return namespaceSql(false); + } + + private static String listNamespacesSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format(LIST_NAMESPACES, namespace, tableName, namespace); + } + + static String listTableNamespacesSql() { + return listNamespacesSql(true); + } + + static String listViewNamespacesSql() { + return listNamespacesSql(false); + } + + private static String listAllNamespacesSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format(LIST_ALL_NAMESPACES, namespace, tableName); + } + + static String listAllTableNamespacesSql() { + return listAllNamespacesSql(true); + } + + static String listAllViewNamespacesSql() { + return listAllNamespacesSql(false); + } + + private static String doCommitSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String name = isTable ? TABLE_NAME : VIEW_NAME; + return String.format(DO_COMMIT, tableName, namespace, name); + } + + private static String createCatalogSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String name = isTable ? TABLE_NAME : VIEW_NAME; + return String.format(CREATE_CATALOG, tableName, namespace, name, namespace, name); + } + + static String createCatalogTableSql() { + return createCatalogSql(true); + } + + static String createCatalogViewSql() { + return createCatalogSql(false); + } + + private static String listSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format(LIST, tableName, namespace); + } + + static String listTablesSql() { + return listSql(true); + } + + static String listViewsSql() { + return listSql(false); + } + + private static String renameSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String name = isTable ? TABLE_NAME : VIEW_NAME; + return String.format(RENAME, tableName, namespace, name, namespace, name); + } + + static String renameTableSql() { + return renameSql(true); + } + + static String renameViewSql() { + return renameSql(false); + } + + private static String dropSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String name = isTable ? TABLE_NAME : VIEW_NAME; + return String.format(DROP, tableName, namespace, name); + } + + static String dropTableSql() { + return dropSql(true); + } + + static String dropViewSql() { + return dropSql(false); + } + + private static String doCommitCreateSql(boolean isTable) { + String tableName = isTable ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String namespace = isTable ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String name = isTable ? TABLE_NAME : VIEW_NAME; + return String.format(DO_COMMIT_CREATE, tableName, namespace, name); + } + + private static int update( + boolean isTable, + JdbcClientPool connections, + String catalogName, + TableIdentifier identifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(doCommitSql(isTable))) { + // UPDATE + sql.setString(1, newMetadataLocation); + sql.setString(2, oldMetadataLocation); + // WHERE + sql.setString(3, catalogName); + sql.setString(4, namespaceToString(identifier.namespace())); + sql.setString(5, identifier.name()); + sql.setString(6, oldMetadataLocation); + return sql.executeUpdate(); + } + }); + } + + static int updateTable( + JdbcClientPool connections, + String catalogName, + TableIdentifier tableIdentifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return update( + true, connections, catalogName, tableIdentifier, newMetadataLocation, oldMetadataLocation); + } + + static int updateView( + JdbcClientPool connections, + String catalogName, + TableIdentifier viewIdentifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return update( + false, connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + } + + private static Map tableOrView( + boolean isTable, JdbcClientPool connections, String catalogName, TableIdentifier identifier) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + Map tableOrView = Maps.newHashMap(); + + String getTableOrViewSqlStatement = JdbcUtil.getSql(isTable); + try (PreparedStatement sql = conn.prepareStatement(getTableOrViewSqlStatement)) { + sql.setString(1, catalogName); + sql.setString(2, namespaceToString(identifier.namespace())); + sql.setString(3, identifier.name()); + ResultSet rs = sql.executeQuery(); + + if (rs.next()) { + tableOrView.put(JdbcUtil.CATALOG_NAME, rs.getString(JdbcUtil.CATALOG_NAME)); + tableOrView.put( + (isTable ? JdbcUtil.TABLE_NAMESPACE : JdbcUtil.VIEW_NAMESPACE), + rs.getString((isTable ? JdbcUtil.TABLE_NAMESPACE : JdbcUtil.VIEW_NAMESPACE))); + tableOrView.put( + (isTable ? JdbcUtil.TABLE_NAME : JdbcUtil.VIEW_NAME), + rs.getString((isTable ? JdbcUtil.TABLE_NAME : JdbcUtil.VIEW_NAME))); + tableOrView.put( + BaseMetastoreTableOperations.METADATA_LOCATION_PROP, + rs.getString(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)); + tableOrView.put( + BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, + rs.getString(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP)); + } + + rs.close(); + } + + return tableOrView; + }); + } + + static Map table( + JdbcClientPool connections, String catalogName, TableIdentifier tableIdentifier) + throws SQLException, InterruptedException { + return tableOrView(true, connections, catalogName, tableIdentifier); + } + + static Map view( + JdbcClientPool connections, String catalogName, TableIdentifier viewIdentifier) + throws SQLException, InterruptedException { + return tableOrView(false, connections, catalogName, viewIdentifier); + } + + private static int doCommitCreate( + boolean isTable, + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier identifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(doCommitCreateSql(isTable))) { + sql.setString(1, catalogName); + sql.setString(2, namespaceToString(namespace)); + sql.setString(3, identifier.name()); + sql.setString(4, newMetadataLocation); + return sql.executeUpdate(); + } + }); + } + + static int doCommitCreateTable( + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier tableIdentifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return doCommitCreate( + true, connections, catalogName, namespace, tableIdentifier, newMetadataLocation); + } + + static int doCommitCreateView( + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier viewIdentifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return doCommitCreate( + false, connections, catalogName, namespace, viewIdentifier, newMetadataLocation); + } + + static boolean viewExists( + String catalogName, JdbcClientPool connections, TableIdentifier viewIdentifier) { + return exists( + connections, + JdbcUtil.getSql(false), + catalogName, + namespaceToString(viewIdentifier.namespace()), + viewIdentifier.name()); + } + + static boolean tableExists( + String catalogName, JdbcClientPool connections, TableIdentifier tableIdentifier) { + return exists( + connections, + JdbcUtil.getSql(true), + catalogName, + namespaceToString(tableIdentifier.namespace()), + tableIdentifier.name()); + } + + static String updatePropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder( "UPDATE " @@ -314,6 +585,7 @@ public static String updatePropertiesStatement(int size) { for (int i = 0; i < size; i += 1) { sqlStatement.append(" WHEN " + NAMESPACE_PROPERTY_KEY + " = ? THEN ?"); } + sqlStatement.append( " END WHERE " + CATALOG_NAME @@ -329,7 +601,7 @@ public static String updatePropertiesStatement(int size) { return sqlStatement.toString(); } - public static String insertPropertiesStatement(int size) { + static String insertPropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL); for (int i = 0; i < size; i++) { @@ -342,7 +614,7 @@ public static String insertPropertiesStatement(int size) { return sqlStatement.toString(); } - public static String deletePropertiesStatement(Set properties) { + static String deletePropertiesStatement(Set properties) { StringBuilder sqlStatement = new StringBuilder(JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL); String values = String.join(",", Collections.nCopies(properties.size(), String.valueOf('?'))); sqlStatement.append("(").append(values).append(")"); @@ -360,7 +632,7 @@ static boolean namespaceExists( namespaceEquals.replace("\\", "\\\\").replace("_", "\\_").replace("%", "\\%") + ".%"; if (exists( connections, - JdbcUtil.GET_NAMESPACE_SQL, + JdbcUtil.tableNamespaceSql(), catalogName, namespaceEquals, namespaceStartsWith)) { @@ -376,7 +648,12 @@ static boolean namespaceExists( return true; } - return false; + return exists( + connections, + JdbcUtil.viewNamespaceSql(), + catalogName, + namespaceEquals, + namespaceStartsWith); } @SuppressWarnings("checkstyle:NestedTryDepth") diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java new file mode 100644 index 000000000000..bd799ae3a835 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.sql.DataTruncation; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.SQLTimeoutException; +import java.sql.SQLTransientConnectionException; +import java.sql.SQLWarning; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** JDBC implementation of Iceberg ViewOperations. */ +public class JdbcViewOperations extends BaseViewOperations { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcViewOperations.class); + private final String catalogName; + private final TableIdentifier viewIdentifier; + private final FileIO fileIO; + private final JdbcClientPool connections; + private final Map catalogProperties; + + protected JdbcViewOperations( + JdbcClientPool dbConnPool, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier, + Map catalogProperties) { + this.catalogName = catalogName; + this.viewIdentifier = viewIdentifier; + this.fileIO = fileIO; + this.connections = dbConnPool; + this.catalogProperties = catalogProperties; + } + + @Override + protected void doRefresh() { + Map view; + + try { + view = JdbcUtil.view(connections, catalogName, viewIdentifier); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during refresh"); + } catch (SQLException e) { + // SQL exception happened when getting view from catalog + throw new UncheckedSQLException( + e, "Failed to get view %s from catalog %s", viewIdentifier, catalogName); + } + + if (view.isEmpty()) { + if (currentMetadataLocation() != null) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } else { + this.disableRefresh(); + return; + } + } + + String newMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + Preconditions.checkState( + newMetadataLocation != null, "Invalid view %s: metadata location is null", viewIdentifier); + refreshFromMetadataLocation(newMetadataLocation); + } + + @Override + protected void doCommit(ViewMetadata base, ViewMetadata metadata) { + String newMetadataLocation = writeNewMetadataIfRequired(metadata); + try { + Map view = JdbcUtil.view(connections, catalogName, viewIdentifier); + if (base != null) { + validateMetadataLocation(view, base); + String oldMetadataLocation = base.metadataFileLocation(); + // Start atomic update + LOG.debug("Committing existing view: {}", viewName()); + updateView(newMetadataLocation, oldMetadataLocation); + } else { + // view not exists create it + LOG.debug("Committing new view: {}", viewName()); + createView(newMetadataLocation); + } + + } catch (SQLIntegrityConstraintViolationException e) { + if (currentMetadataLocation() == null) { + throw new AlreadyExistsException(e, "View already exists: %s", viewIdentifier); + } else { + throw new UncheckedSQLException(e, "View already exists: %s", viewIdentifier); + } + + } catch (SQLTimeoutException e) { + throw new UncheckedSQLException(e, "Database Connection timeout"); + } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) { + throw new UncheckedSQLException(e, "Database Connection failed"); + } catch (DataTruncation e) { + throw new UncheckedSQLException(e, "Database data truncation error"); + } catch (SQLWarning e) { + throw new UncheckedSQLException(e, "Database warning"); + } catch (SQLException e) { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (e.getMessage().contains("constraint failed")) { + throw new AlreadyExistsException("View already exists: %s", viewIdentifier); + } + + throw new UncheckedSQLException(e, "Unknown failure"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during commit"); + } + } + + @Override + protected String viewName() { + return viewIdentifier.toString(); + } + + @Override + protected FileIO io() { + return fileIO; + } + + private void validateMetadataLocation(Map view, ViewMetadata base) { + String catalogMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + + if (!Objects.equals(baseMetadataLocation, catalogMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s: metadata location %s has changed from %s", + viewIdentifier, baseMetadataLocation, catalogMetadataLocation); + } + } + + private void updateView(String newMetadataLocation, String oldMetadataLocation) + throws SQLException, InterruptedException { + int updatedRecords = + JdbcUtil.updateView( + connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + + if (updatedRecords == 1) { + LOG.debug("Successfully committed to existing view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to update view %s from catalog %s", viewIdentifier, catalogName); + } + } + + private void createView(String newMetadataLocation) throws SQLException, InterruptedException { + Namespace namespace = viewIdentifier.namespace(); + if (PropertyUtil.propertyAsBoolean(catalogProperties, JdbcUtil.STRICT_MODE_PROPERTY, false) + && !JdbcUtil.namespaceExists(catalogName, connections, namespace)) { + throw new NoSuchNamespaceException( + "Cannot create view %s in catalog %s. Namespace %s does not exist", + viewIdentifier, catalogName, namespace); + } + + if (JdbcUtil.tableExists(catalogName, connections, viewIdentifier)) { + throw new AlreadyExistsException("Table with same name already exists: %s", viewIdentifier); + } + + int insertRecord = + JdbcUtil.doCommitCreateView( + connections, catalogName, namespace, viewIdentifier, newMetadataLocation); + + if (insertRecord == 1) { + LOG.debug("Successfully committed to new view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to create view %s in catalog %s", viewIdentifier, catalogName); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java index 0dec6eb83ebb..efb842e8917b 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java @@ -18,14 +18,83 @@ */ package org.apache.iceberg.jdbc; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.Map; import java.util.Properties; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; public class TestJdbcUtil { + @Test + public void testGetSql() { + assertThat(JdbcUtil.tableSql()) + .isEqualTo( + "SELECT * FROM iceberg_tables WHERE catalog_name = ? AND table_namespace = ? AND table_name = ?"); + assertThat(JdbcUtil.viewSql()) + .isEqualTo( + "SELECT * FROM iceberg_views WHERE catalog_name = ? AND view_namespace = ? AND view_name = ?"); + } + + @Test + public void testNamespaceSql() { + assertThat(JdbcUtil.tableNamespaceSql()) + .isEqualTo( + "SELECT table_namespace FROM iceberg_tables WHERE catalog_name = ? AND (table_namespace = ? OR table_namespace LIKE ? ESCAPE '\\') LIMIT 1"); + assertThat(JdbcUtil.viewNamespaceSql()) + .isEqualTo( + "SELECT view_namespace FROM iceberg_views WHERE catalog_name = ? AND (view_namespace = ? OR view_namespace LIKE ? ESCAPE '\\') LIMIT 1"); + } + + @Test + public void testListNamespacesSql() { + assertThat(JdbcUtil.listTableNamespacesSql()) + .isEqualTo( + "SELECT DISTINCT table_namespace FROM iceberg_tables WHERE catalog_name = ? AND table_namespace LIKE ?"); + assertThat(JdbcUtil.listViewNamespacesSql()) + .isEqualTo( + "SELECT DISTINCT view_namespace FROM iceberg_views WHERE catalog_name = ? AND view_namespace LIKE ?"); + } + + @Test + public void testCreateSql() { + assertThat(JdbcUtil.createCatalogTableSql()) + .isEqualTo( + "CREATE TABLE iceberg_tables(catalog_name VARCHAR(255) NOT NULL,table_namespace VARCHAR(255) NOT NULL,table_name VARCHAR(255) NOT NULL,metadata_location VARCHAR(1000),previous_metadata_location VARCHAR(1000),PRIMARY KEY (catalog_name, table_namespace, table_name))"); + assertThat(JdbcUtil.createCatalogViewSql()) + .isEqualTo( + "CREATE TABLE iceberg_views(catalog_name VARCHAR(255) NOT NULL,view_namespace VARCHAR(255) NOT NULL,view_name VARCHAR(255) NOT NULL,metadata_location VARCHAR(1000),previous_metadata_location VARCHAR(1000),PRIMARY KEY (catalog_name, view_namespace, view_name))"); + } + + @Test + public void testListSql() { + assertThat(JdbcUtil.listTablesSql()) + .isEqualTo("SELECT * FROM iceberg_tables WHERE catalog_name = ? AND table_namespace = ?"); + assertThat(JdbcUtil.listViewsSql()) + .isEqualTo("SELECT * FROM iceberg_views WHERE catalog_name = ? AND view_namespace = ?"); + } + + @Test + public void testRenameSql() { + assertThat(JdbcUtil.renameTableSql()) + .isEqualTo( + "UPDATE iceberg_tables SET table_namespace = ?, table_name = ? WHERE catalog_name = ? AND table_namespace = ? AND table_name = ?"); + assertThat(JdbcUtil.renameViewSql()) + .isEqualTo( + "UPDATE iceberg_views SET view_namespace = ?, view_name = ? WHERE catalog_name = ? AND view_namespace = ? AND view_name = ?"); + } + + @Test + public void testDropSql() { + assertThat(JdbcUtil.dropTableSql()) + .isEqualTo( + "DELETE FROM iceberg_tables WHERE catalog_name = ? AND table_namespace = ? AND table_name = ?"); + assertThat(JdbcUtil.dropViewSql()) + .isEqualTo( + "DELETE FROM iceberg_views WHERE catalog_name = ? AND view_namespace = ? AND view_name = ?"); + } + @Test public void testFilterAndRemovePrefix() { Map input = Maps.newHashMap(); @@ -42,6 +111,6 @@ public void testFilterAndRemovePrefix() { Properties actual = JdbcUtil.filterAndRemovePrefix(input, "jdbc."); - Assertions.assertThat(expected).isEqualTo(actual); + assertThat(expected).isEqualTo(actual); } } diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java new file mode 100644 index 000000000000..c7bdb158cdf2 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.view.ViewCatalogTests; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +public class TestJdbcViewCatalog extends ViewCatalogTests { + + private JdbcCatalog catalog; + + @TempDir private java.nio.file.Path tableDir; + + @BeforeEach + public void before() { + Map properties = Maps.newHashMap(); + properties.put( + CatalogProperties.URI, + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, tableDir.toAbsolutePath().toString()); + + catalog = new JdbcCatalog(); + catalog.setConf(new Configuration()); + catalog.initialize("testCatalog", properties); + } + + @Override + protected JdbcCatalog catalog() { + return catalog; + } + + @Override + protected Catalog tableCatalog() { + return catalog; + } + + @Override + protected boolean requiresNamespaceCreate() { + return true; + } +}