From ac2d4c180980a54e319daf4cb269ee885f81e683 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Thu, 11 Jan 2024 18:16:12 +0100 Subject: [PATCH] Core: Add view support for JDBC catalog --- .../org/apache/iceberg/jdbc/JdbcCatalog.java | 130 ++++- .../iceberg/jdbc/JdbcTableOperations.java | 68 +-- .../org/apache/iceberg/jdbc/JdbcUtil.java | 453 +++++++++++++++--- .../iceberg/jdbc/JdbcViewOperations.java | 190 ++++++++ .../org/apache/iceberg/jdbc/TestJdbcUtil.java | 82 ++++ .../iceberg/jdbc/TestJdbcViewCatalog.java | 69 +++ 6 files changed, 854 insertions(+), 138 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java create mode 100644 core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 0bab6ade4c85..7daf9078bde2 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -37,7 +37,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableMetadata; @@ -49,6 +48,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.CloseableGroup; @@ -60,10 +60,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.ViewOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JdbcCatalog extends BaseMetastoreCatalog +public class JdbcCatalog extends BaseMetastoreViewCatalog implements Configurable, SupportsNamespaces { public static final String PROPERTY_PREFIX = "jdbc."; @@ -164,8 +166,9 @@ private void initializeCatalogTables() throws InterruptedException, SQLException return true; } - LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME); - return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute(); + LOG.debug( + "Creating table {} to store iceberg catalog tables", JdbcUtil.CATALOG_TABLE_NAME); + return conn.prepareStatement(JdbcUtil.createCatalogTableSql()).execute(); }); connections.run( @@ -187,6 +190,23 @@ private void initializeCatalogTables() throws InterruptedException, SQLException JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME); return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute(); }); + + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet viewTableExists = + dbMeta.getTables( + null /* catalog name */, + null /* schemaPattern */, + JdbcUtil.CATALOG_VIEW_NAME /* viewNamePattern */, + null /* types */); + if (viewTableExists.next()) { + return true; + } + + LOG.debug("Creating table {} to store iceberg catalog views", JdbcUtil.CATALOG_VIEW_NAME); + return conn.prepareStatement(JdbcUtil.createCatalogViewSql()).execute(); + }); } @Override @@ -195,6 +215,11 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { connections, io, catalogName, tableIdentifier, catalogProperties); } + @Override + protected ViewOperations newViewOps(TableIdentifier viewIdentifier) { + return new JdbcViewOperations(connections, io, catalogName, viewIdentifier, catalogProperties); + } + @Override protected String defaultWarehouseLocation(TableIdentifier table) { return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name()); @@ -217,7 +242,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { int deletedRecords = execute( - JdbcUtil.DROP_TABLE_SQL, + JdbcUtil.dropTableSql(), catalogName, JdbcUtil.namespaceToString(identifier.namespace()), identifier.name()); @@ -240,18 +265,20 @@ public List listTables(Namespace namespace) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); } - return fetch( row -> JdbcUtil.stringToTableIdentifier( row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)), - JdbcUtil.LIST_TABLES_SQL, + JdbcUtil.listTablesSql(), catalogName, JdbcUtil.namespaceToString(namespace)); } @Override public void renameTable(TableIdentifier from, TableIdentifier to) { + if (viewExists(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to); + } int updatedRecords = execute( err -> { @@ -261,7 +288,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { throw new AlreadyExistsException("Table already exists: %s", to); } }, - JdbcUtil.RENAME_TABLE_SQL, + JdbcUtil.renameTableSql(), JdbcUtil.namespaceToString(to.namespace()), to.name(), catalogName, @@ -315,13 +342,18 @@ public List listNamespaces() { namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)), - JdbcUtil.LIST_ALL_TABLE_NAMESPACES_SQL, + JdbcUtil.listAllTableNamespacesSql(), catalogName)); namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.NAMESPACE_NAME)), JdbcUtil.LIST_ALL_PROPERTY_NAMESPACES_SQL, catalogName)); + namespaces.addAll( + fetch( + row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.VIEW_NAMESPACE)), + JdbcUtil.listAllViewNamespacesSql(), + catalogName)); namespaces = namespaces.stream() @@ -350,7 +382,7 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)), - JdbcUtil.LIST_NAMESPACES_SQL, + JdbcUtil.listTableNamespacesSql(), catalogName, JdbcUtil.namespaceToString(namespace) + "%")); namespaces.addAll( @@ -359,6 +391,12 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac JdbcUtil.LIST_PROPERTY_NAMESPACES_SQL, catalogName, JdbcUtil.namespaceToString(namespace) + "%")); + namespaces.addAll( + fetch( + row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.VIEW_NAMESPACE)), + JdbcUtil.listViewNamespacesSql(), + catalogName, + JdbcUtil.namespaceToString(namespace) + "%s")); int subNamespaceLevelLength = namespace.levels().length + 1; namespaces = @@ -503,6 +541,78 @@ public boolean namespaceExists(Namespace namespace) { return JdbcUtil.namespaceExists(catalogName, connections, namespace); } + @Override + public boolean dropView(TableIdentifier identifier) { + int deletedRecords = + execute( + JdbcUtil.dropViewSql(), + catalogName, + JdbcUtil.namespaceToString(identifier.namespace()), + identifier.name()); + + if (deletedRecords == 0) { + LOG.info("Skipping drop, view does not exist: {}", identifier); + return false; + } + + LOG.info("Dropped view: {}", identifier); + return true; + } + + @Override + public List listViews(Namespace namespace) { + if (!namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + return fetch( + row -> + JdbcUtil.stringToTableIdentifier( + row.getString(JdbcUtil.VIEW_NAMESPACE), row.getString(JdbcUtil.VIEW_NAME)), + JdbcUtil.listViewsSql(), + catalogName, + JdbcUtil.namespaceToString(namespace)); + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + if (!viewExists(from)) { + throw new NoSuchViewException("View does not exist"); + } + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", to.namespace()); + } + if (tableExists(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to); + } + + int updatedRecords = + execute( + err -> { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (err instanceof SQLIntegrityConstraintViolationException + || (err.getMessage() != null && err.getMessage().contains("constraint failed"))) { + throw new AlreadyExistsException( + "Cannot rename %s to %s. View already exists", from, to); + } + }, + JdbcUtil.renameViewSql(), + JdbcUtil.namespaceToString(to.namespace()), + to.name(), + catalogName, + JdbcUtil.namespaceToString(from.namespace()), + from.name()); + + if (updatedRecords == 1) { + LOG.info("Renamed view from {}, to {}", from, to); + } else if (updatedRecords == 0) { + throw new NoSuchViewException("View does not exist: %s", from); + } else { + LOG.warn( + "Rename operation affected {} rows: the catalog view's primary key assumption has been violated", + updatedRecords); + } + } + private int execute(String sql, String... args) { return execute(err -> {}, sql, args); } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java index 6a7d594dd9f6..5460a9d71066 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java @@ -19,8 +19,6 @@ package org.apache.iceberg.jdbc; import java.sql.DataTruncation; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLNonTransientConnectionException; @@ -39,7 +37,6 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +68,7 @@ public void doRefresh() { Map table; try { - table = getTable(); + table = JdbcUtil.getTable(connections, catalogName, tableIdentifier); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e, "Interrupted during refresh"); @@ -105,7 +102,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); try { - Map table = getTable(); + Map table = JdbcUtil.getTable(connections, catalogName, tableIdentifier); if (base != null) { validateMetadataLocation(table, base); @@ -150,20 +147,8 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { private void updateTable(String newMetadataLocation, String oldMetadataLocation) throws SQLException, InterruptedException { int updatedRecords = - connections.run( - conn -> { - try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.DO_COMMIT_SQL)) { - // UPDATE - sql.setString(1, newMetadataLocation); - sql.setString(2, oldMetadataLocation); - // WHERE - sql.setString(3, catalogName); - sql.setString(4, JdbcUtil.namespaceToString(tableIdentifier.namespace())); - sql.setString(5, tableIdentifier.name()); - sql.setString(6, oldMetadataLocation); - return sql.executeUpdate(); - } - }); + JdbcUtil.updateTable( + connections, catalogName, tableIdentifier, newMetadataLocation, oldMetadataLocation); if (updatedRecords == 1) { LOG.debug("Successfully committed to existing table: {}", tableIdentifier); @@ -182,18 +167,13 @@ private void createTable(String newMetadataLocation) throws SQLException, Interr tableIdentifier, catalogName, namespace); } + if (JdbcUtil.viewExists(catalogName, connections, tableIdentifier)) { + throw new AlreadyExistsException("View with same name already exists: %s", tableIdentifier); + } + int insertRecord = - connections.run( - conn -> { - try (PreparedStatement sql = - conn.prepareStatement(JdbcUtil.DO_COMMIT_CREATE_TABLE_SQL)) { - sql.setString(1, catalogName); - sql.setString(2, JdbcUtil.namespaceToString(namespace)); - sql.setString(3, tableIdentifier.name()); - sql.setString(4, newMetadataLocation); - return sql.executeUpdate(); - } - }); + JdbcUtil.doCommitCreateTable( + connections, catalogName, namespace, tableIdentifier, newMetadataLocation); if (insertRecord == 1) { LOG.debug("Successfully committed to new table: {}", tableIdentifier); @@ -223,32 +203,4 @@ public FileIO io() { protected String tableName() { return tableIdentifier.toString(); } - - private Map getTable() - throws UncheckedSQLException, SQLException, InterruptedException { - return connections.run( - conn -> { - Map table = Maps.newHashMap(); - - try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.GET_TABLE_SQL)) { - sql.setString(1, catalogName); - sql.setString(2, JdbcUtil.namespaceToString(tableIdentifier.namespace())); - sql.setString(3, tableIdentifier.name()); - ResultSet rs = sql.executeQuery(); - - if (rs.next()) { - table.put(JdbcUtil.CATALOG_NAME, rs.getString(JdbcUtil.CATALOG_NAME)); - table.put(JdbcUtil.TABLE_NAMESPACE, rs.getString(JdbcUtil.TABLE_NAMESPACE)); - table.put(JdbcUtil.TABLE_NAME, rs.getString(JdbcUtil.TABLE_NAME)); - table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP)); - table.put( - PREVIOUS_METADATA_LOCATION_PROP, rs.getString(PREVIOUS_METADATA_LOCATION_PROP)); - } - - rs.close(); - } - - return table; - }); - } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java index 4a515d1329ed..1b6bfd39954d 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -25,49 +25,52 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; final class JdbcUtil { // property to control strict-mode (aka check if namespace exists when creating a table) static final String STRICT_MODE_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "strict-mode"; - // Catalog Table + // Catalog Table & View static final String CATALOG_TABLE_NAME = "iceberg_tables"; + static final String CATALOG_VIEW_NAME = "iceberg_views"; static final String CATALOG_NAME = "catalog_name"; static final String TABLE_NAMESPACE = "table_namespace"; + static final String VIEW_NAMESPACE = "view_namespace"; static final String TABLE_NAME = "table_name"; + static final String VIEW_NAME = "view_name"; - static final String DO_COMMIT_SQL = + private static final String DO_COMMIT_TABLE_OR_VIEW_SQL = "UPDATE " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " SET " + JdbcTableOperations.METADATA_LOCATION_PROP + " = ? , " + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP - + " = ? " + + " = ?" + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? AND " + + " = ? AND" + + " %s = ? AND " // TABLE_NAMESPACE or VIEW_NAMESPACE + + " %s = ? AND " // TABLE_NAME or VIEW_NAME + JdbcTableOperations.METADATA_LOCATION_PROP + " = ?"; - static final String CREATE_CATALOG_TABLE = + private static final String CREATE_CATALOG_TABLE_OR_VIEW = "CREATE TABLE " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + "(" + CATALOG_NAME + " VARCHAR(255) NOT NULL," - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + " VARCHAR(255) NOT NULL," - + TABLE_NAME + + "%s" // TABLE_NAME or VIEW_NAME + " VARCHAR(255) NOT NULL," + JdbcTableOperations.METADATA_LOCATION_PROP + " VARCHAR(1000)," @@ -76,96 +79,77 @@ final class JdbcUtil { + "PRIMARY KEY (" + CATALOG_NAME + ", " - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + ", " - + TABLE_NAME + + "%s" // TABLE_NAME or VIEW_NAME + ")" + ")"; - static final String GET_TABLE_SQL = + private static final String GET_TABLE_OR_VIEW_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? "; - static final String LIST_TABLES_SQL = + + " = ? AND" + + " %s = ? AND" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " %s = ?"; // TABLE_NAME or VIEW_NAME + private static final String LIST_TABLES_OR_VIEWS_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + " = ?"; - static final String RENAME_TABLE_SQL = + private static final String RENAME_TABLE_OR_VIEW_SQL = "UPDATE " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " SET " - + TABLE_NAMESPACE - + " = ? , " - + TABLE_NAME - + " = ? " + + "%s = ?, " // TABLE_NAMESPACE or VIEW_NAMESPACE + + "%s = ?" // TABLE_NAME or VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? "; - static final String DROP_TABLE_SQL = + + " = ? AND" + + " %s = ? AND " // TABLE_NAMESPACE or VIEW_NAMESPACE + + "%s = ?"; // TABLE_NAME or VIEW_NAME + private static final String DROP_TABLE_OR_VIEW_SQL = "DELETE FROM " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? "; - static final String GET_NAMESPACE_SQL = + + "%s = ? AND " // TABLE_NAMESPACE or VIEW_NAMESPACE + + "%s = ?"; // TABLE_NAME or VIEW_NAME + private static final String GET_TABLE_OR_VIEW_NAMESPACE_SQL = "SELECT " - + TABLE_NAMESPACE - + " FROM " - + CATALOG_TABLE_NAME + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " FROM %s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + " ( " - + TABLE_NAMESPACE - + " = ? OR " - + TABLE_NAMESPACE - + " LIKE ? ESCAPE '\\' " - + " ) " + + " = ? AND" + + " (%s = ? OR %s LIKE ? ESCAPE '\\')" // TABLE_NAMESPACE or VIEW_NAMESPACE + " LIMIT 1"; - static final String LIST_NAMESPACES_SQL = + private static final String LIST_TABLE_OR_VIEW_NAMESPACES_SQL = "SELECT DISTINCT " - + TABLE_NAMESPACE - + " FROM " - + CATALOG_TABLE_NAME + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " FROM %s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME - + " = ? AND " - + TABLE_NAMESPACE - + " LIKE ?"; - static final String LIST_ALL_TABLE_NAMESPACES_SQL = - "SELECT DISTINCT " - + TABLE_NAMESPACE - + " FROM " - + CATALOG_TABLE_NAME + + " = ? AND %s LIKE ?"; // TABLE_NAMESPACE or VIEW_NAMESPACE + private static final String LIST_ALL_TABLE_OR_VIEW_NAMESPACES_SQL = + "SELECT DISTINCT %s" // TABLE_NAMESPACE or VIEW_NAMESPACE + + " FROM %s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ?"; - static final String DO_COMMIT_CREATE_TABLE_SQL = + private static final String DO_COMMIT_CREATE_TABLE_OR_VIEW_SQL = "INSERT INTO " - + CATALOG_TABLE_NAME + + "%s" // CATALOG_TABLE_NAME or CATALOG_VIEW_NAME + " (" + CATALOG_NAME + ", " - + TABLE_NAMESPACE + + "%s" // TABLE_NAMESPACE or VIEW_NAMESPACE + ", " - + TABLE_NAME + + "%s" // TABLE_NAME or VIEW_NAME + ", " + JdbcTableOperations.METADATA_LOCATION_PROP + ", " @@ -303,6 +287,325 @@ public static Properties filterAndRemovePrefix(Map properties, S return result; } + static String getTableOrViewSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; + return String.format( + GET_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, tableOrViewName); + } + + private static String getTableOrViewNamespaceSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format( + GET_TABLE_OR_VIEW_NAMESPACE_SQL, + tableOrViewNamespace, + tableOrViewTableName, + tableOrViewNamespace, + tableOrViewNamespace); + } + + public static String getTableNamespaceSql() { + return getTableOrViewNamespaceSql(true); + } + + public static String getViewNamespaceSql() { + return getTableOrViewNamespaceSql(false); + } + + private static String listTableOrViewNamespacesSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format( + LIST_TABLE_OR_VIEW_NAMESPACES_SQL, + tableOrViewNamespace, + tableOrViewTableName, + tableOrViewNamespace); + } + + public static String listTableNamespacesSql() { + return listTableOrViewNamespacesSql(true); + } + + public static String listViewNamespacesSql() { + return listTableOrViewNamespacesSql(false); + } + + private static String listAllTableOrViewNamespacesSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format( + LIST_ALL_TABLE_OR_VIEW_NAMESPACES_SQL, tableOrViewNamespace, tableOrViewTableName); + } + + public static String listAllTableNamespacesSql() { + return listAllTableOrViewNamespacesSql(true); + } + + public static String listAllViewNamespacesSql() { + return listAllTableOrViewNamespacesSql(false); + } + + static String doCommitSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; + return String.format( + DO_COMMIT_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, tableOrViewName); + } + + private static String createCatalogTableOrViewSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; + return String.format( + CREATE_CATALOG_TABLE_OR_VIEW, + tableOrViewTableName, + tableOrViewNamespace, + tableOrViewName, + tableOrViewNamespace, + tableOrViewName); + } + + public static String createCatalogTableSql() { + return createCatalogTableOrViewSql(true); + } + + public static String createCatalogViewSql() { + return createCatalogTableOrViewSql(false); + } + + private static String listTablesOrViewsSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + return String.format(LIST_TABLES_OR_VIEWS_SQL, tableOrViewTableName, tableOrViewNamespace); + } + + public static String listTablesSql() { + return listTablesOrViewsSql(true); + } + + public static String listViewsSql() { + return listTablesOrViewsSql(false); + } + + private static String renameTableOrViewSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; + return String.format( + RENAME_TABLE_OR_VIEW_SQL, + tableOrViewTableName, + tableOrViewNamespace, + tableOrViewName, + tableOrViewNamespace, + tableOrViewName); + } + + public static String renameTableSql() { + return renameTableOrViewSql(true); + } + + public static String renameViewSql() { + return renameTableOrViewSql(false); + } + + private static String dropTableOrViewSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; + return String.format( + DROP_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, tableOrViewName); + } + + public static String dropTableSql() { + return dropTableOrViewSql(true); + } + + public static String dropViewSql() { + return dropTableOrViewSql(false); + } + + private static String doCommitCreateTableOrViewSql(boolean table) { + String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; + String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; + String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; + return String.format( + DO_COMMIT_CREATE_TABLE_OR_VIEW_SQL, + tableOrViewTableName, + tableOrViewNamespace, + tableOrViewName); + } + + private static int updateTableOrView( + boolean table, + JdbcClientPool connections, + String catalogName, + TableIdentifier tableOrViewIdentifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(doCommitSql(table))) { + // UPDATE + sql.setString(1, newMetadataLocation); + sql.setString(2, oldMetadataLocation); + // WHERE + sql.setString(3, catalogName); + sql.setString(4, namespaceToString(tableOrViewIdentifier.namespace())); + sql.setString(5, tableOrViewIdentifier.name()); + sql.setString(6, oldMetadataLocation); + return sql.executeUpdate(); + } + }); + } + + public static int updateTable( + JdbcClientPool connections, + String catalogName, + TableIdentifier tableIdentifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return updateTableOrView( + true, connections, catalogName, tableIdentifier, newMetadataLocation, oldMetadataLocation); + } + + public static int updateView( + JdbcClientPool connections, + String catalogName, + TableIdentifier viewIdentifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return updateTableOrView( + false, connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + } + + private static Map getTableOrView( + boolean table, + JdbcClientPool connections, + String catalogName, + TableIdentifier tableOrViewIdentifier) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + Map tableOrView = Maps.newHashMap(); + + String getTableOrViewSqlStatement = JdbcUtil.getTableOrViewSql(table); + try (PreparedStatement sql = conn.prepareStatement(getTableOrViewSqlStatement)) { + sql.setString(1, catalogName); + sql.setString(2, namespaceToString(tableOrViewIdentifier.namespace())); + sql.setString(3, tableOrViewIdentifier.name()); + ResultSet rs = sql.executeQuery(); + + if (rs.next()) { + tableOrView.put(JdbcUtil.CATALOG_NAME, rs.getString(JdbcUtil.CATALOG_NAME)); + tableOrView.put( + (table ? JdbcUtil.TABLE_NAMESPACE : JdbcUtil.VIEW_NAMESPACE), + rs.getString((table ? JdbcUtil.TABLE_NAMESPACE : JdbcUtil.VIEW_NAMESPACE))); + tableOrView.put( + (table ? JdbcUtil.TABLE_NAME : JdbcUtil.VIEW_NAME), + rs.getString((table ? JdbcUtil.TABLE_NAME : JdbcUtil.VIEW_NAME))); + tableOrView.put( + BaseMetastoreTableOperations.METADATA_LOCATION_PROP, + rs.getString(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)); + tableOrView.put( + BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, + rs.getString(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP)); + } + + rs.close(); + } + + return tableOrView; + }); + } + + public static Map getTable( + JdbcClientPool connections, String catalogName, TableIdentifier tableIdentifier) + throws SQLException, InterruptedException { + return getTableOrView(true, connections, catalogName, tableIdentifier); + } + + public static Map getView( + JdbcClientPool connections, String catalogName, TableIdentifier viewIdentifier) + throws SQLException, InterruptedException { + return getTableOrView(false, connections, catalogName, viewIdentifier); + } + + private static int doCommitCreateTableOrView( + boolean table, + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier tableOrViewIdentifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(doCommitCreateTableOrViewSql(table))) { + sql.setString(1, catalogName); + sql.setString(2, namespaceToString(namespace)); + sql.setString(3, tableOrViewIdentifier.name()); + sql.setString(4, newMetadataLocation); + return sql.executeUpdate(); + } + }); + } + + public static int doCommitCreateTable( + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier tableIdentifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return doCommitCreateTableOrView( + true, connections, catalogName, namespace, tableIdentifier, newMetadataLocation); + } + + public static int doCommitCreateView( + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier viewIdentifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return doCommitCreateTableOrView( + false, connections, catalogName, namespace, viewIdentifier, newMetadataLocation); + } + + static boolean viewExists( + String catalogName, JdbcClientPool connections, TableIdentifier viewIdentifier) { + if (exists( + connections, + JdbcUtil.getTableOrViewSql(false), + catalogName, + namespaceToString(viewIdentifier.namespace()), + viewIdentifier.name())) { + return true; + } + + return false; + } + + static boolean tableExists( + String catalogName, JdbcClientPool connections, TableIdentifier tableIdentifier) { + if (exists( + connections, + JdbcUtil.getTableOrViewSql(true), + catalogName, + namespaceToString(tableIdentifier.namespace()), + tableIdentifier.name())) { + return true; + } + + return false; + } + public static String updatePropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder( @@ -314,6 +617,7 @@ public static String updatePropertiesStatement(int size) { for (int i = 0; i < size; i += 1) { sqlStatement.append(" WHEN " + NAMESPACE_PROPERTY_KEY + " = ? THEN ?"); } + sqlStatement.append( " END WHERE " + CATALOG_NAME @@ -360,7 +664,7 @@ static boolean namespaceExists( namespaceEquals.replace("\\", "\\\\").replace("_", "\\_").replace("%", "\\%") + ".%"; if (exists( connections, - JdbcUtil.GET_NAMESPACE_SQL, + JdbcUtil.getTableNamespaceSql(), catalogName, namespaceEquals, namespaceStartsWith)) { @@ -376,6 +680,15 @@ static boolean namespaceExists( return true; } + if (exists( + connections, + JdbcUtil.getViewNamespaceSql(), + catalogName, + namespaceEquals, + namespaceStartsWith)) { + return true; + } + return false; } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java new file mode 100644 index 000000000000..45e4ac9587be --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** JDBC implementation of Iceberg ViewOperations. */ +public class JdbcViewOperations extends BaseViewOperations { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcViewOperations.class); + private final String catalogName; + private final TableIdentifier viewIdentifier; + private final FileIO fileIO; + private final JdbcClientPool connections; + private final Map catalogProperties; + + protected JdbcViewOperations( + JdbcClientPool dbConnPool, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier, + Map catalogProperties) { + this.catalogName = catalogName; + this.viewIdentifier = viewIdentifier; + this.fileIO = fileIO; + this.connections = dbConnPool; + this.catalogProperties = catalogProperties; + } + + @Override + protected void doRefresh() { + Map view; + + try { + view = JdbcUtil.getView(connections, catalogName, viewIdentifier); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during refresh"); + } catch (SQLException e) { + // SQL exception happened when getting view from catalog + throw new UncheckedSQLException( + e, "Failed to get view %s from catalog %s", viewIdentifier, catalogName); + } + + if (view.isEmpty()) { + if (currentMetadataLocation() != null) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } else { + this.disableRefresh(); + return; + } + } + + String newMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + Preconditions.checkState( + newMetadataLocation != null, "Invalid view %s: metadata location is null", viewIdentifier); + refreshFromMetadataLocation(newMetadataLocation); + } + + @Override + protected void doCommit(ViewMetadata base, ViewMetadata metadata) { + boolean newView = base == null; + String newMetadataLocation = writeNewMetadataIfRequired(metadata); + try { + Map view = JdbcUtil.getView(connections, catalogName, viewIdentifier); + + if (base != null) { + validateMetadataLocation(view, base); + String oldMetadataLocation = base.metadataFileLocation(); + // Start atomic update + LOG.debug("Committing existing view: {}", viewName()); + updateView(newMetadataLocation, oldMetadataLocation); + } else { + // view not exists create it + LOG.debug("Committing new view: {}", viewName()); + createView(newMetadataLocation); + } + + } catch (SQLIntegrityConstraintViolationException e) { + if (currentMetadataLocation() == null) { + throw new AlreadyExistsException(e, "View already exists: %s", viewIdentifier); + } else { + throw new UncheckedSQLException(e, "View already exists: %s", viewIdentifier); + } + + } catch (SQLException e) { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (e.getMessage().contains("constraint failed")) { + throw new AlreadyExistsException("View already exists: %s", viewIdentifier); + } + + throw new UncheckedSQLException(e, "Unknown failure"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during commit"); + } + } + + @Override + protected String viewName() { + return viewIdentifier.toString(); + } + + @Override + protected FileIO io() { + return fileIO; + } + + private void validateMetadataLocation(Map view, ViewMetadata base) { + String catalogMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + + if (!Objects.equals(baseMetadataLocation, catalogMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s: metadata location %s has changed from %s", + viewIdentifier, baseMetadataLocation, catalogMetadataLocation); + } + } + + private void updateView(String newMetadataLocation, String oldMetadataLocation) + throws SQLException, InterruptedException { + int updatedRecords = + JdbcUtil.updateView( + connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + + if (updatedRecords == 1) { + LOG.debug("Successfully committed to existing view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to update view %s from catalog %s", viewIdentifier, catalogName); + } + } + + private void createView(String newMetadataLocation) throws SQLException, InterruptedException { + Namespace namespace = viewIdentifier.namespace(); + if (PropertyUtil.propertyAsBoolean(catalogProperties, JdbcUtil.STRICT_MODE_PROPERTY, false) + && !JdbcUtil.namespaceExists(catalogName, connections, namespace)) { + throw new NoSuchNamespaceException( + "Cannot create view %s in catalog %s. Namespace %s does not exist", + viewIdentifier, catalogName, namespace); + } + + if (JdbcUtil.tableExists(catalogName, connections, viewIdentifier)) { + throw new AlreadyExistsException("Table with same name already exists: %s", viewIdentifier); + } + + int insertRecord = + JdbcUtil.doCommitCreateView( + connections, catalogName, namespace, viewIdentifier, newMetadataLocation); + + if (insertRecord == 1) { + LOG.debug("Successfully committed to new view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to create view %s in catalog %s", viewIdentifier, catalogName); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java index 0dec6eb83ebb..a26873573f31 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java @@ -26,6 +26,88 @@ public class TestJdbcUtil { + @Test + public void testGetTableOrViewSql() { + String tableSql = JdbcUtil.getTableOrViewSql(true); + Assertions.assertThat(tableSql) + .isEqualTo( + "SELECT * FROM iceberg_tables WHERE catalog_name = ? AND table_namespace = ? AND table_name = ?"); + String viewSql = JdbcUtil.getTableOrViewSql(false); + Assertions.assertThat(viewSql) + .isEqualTo( + "SELECT * FROM iceberg_views WHERE catalog_name = ? AND view_namespace = ? AND view_name = ?"); + } + + @Test + public void testGetTableOrViewNamespaceSql() { + String tableSql = JdbcUtil.getTableNamespaceSql(); + Assertions.assertThat(tableSql) + .isEqualTo( + "SELECT table_namespace FROM iceberg_tables WHERE catalog_name = ? AND (table_namespace = ? OR table_namespace LIKE ? ESCAPE '\\') LIMIT 1"); + String viewSql = JdbcUtil.getViewNamespaceSql(); + Assertions.assertThat(viewSql) + .isEqualTo( + "SELECT view_namespace FROM iceberg_views WHERE catalog_name = ? AND (view_namespace = ? OR view_namespace LIKE ? ESCAPE '\\') LIMIT 1"); + } + + @Test + public void testListTableOrViewNamespacesSql() { + String tableSql = JdbcUtil.listTableNamespacesSql(); + Assertions.assertThat(tableSql) + .isEqualTo( + "SELECT DISTINCT table_namespace FROM iceberg_tables WHERE catalog_name = ? AND table_namespace LIKE ?"); + String viewSql = JdbcUtil.listViewNamespacesSql(); + Assertions.assertThat(viewSql) + .isEqualTo( + "SELECT DISTINCT view_namespace FROM iceberg_views WHERE catalog_name = ? AND view_namespace LIKE ?"); + } + + @Test + public void testCreateTableOrViewSql() { + String tableSql = JdbcUtil.createCatalogTableSql(); + Assertions.assertThat(tableSql) + .isEqualTo( + "CREATE TABLE iceberg_tables(catalog_name VARCHAR(255) NOT NULL,table_namespace VARCHAR(255) NOT NULL,table_name VARCHAR(255) NOT NULL,metadata_location VARCHAR(1000),previous_metadata_location VARCHAR(1000),PRIMARY KEY (catalog_name, table_namespace, table_name))"); + String viewSql = JdbcUtil.createCatalogViewSql(); + Assertions.assertThat(viewSql) + .isEqualTo( + "CREATE TABLE iceberg_views(catalog_name VARCHAR(255) NOT NULL,view_namespace VARCHAR(255) NOT NULL,view_name VARCHAR(255) NOT NULL,metadata_location VARCHAR(1000),previous_metadata_location VARCHAR(1000),PRIMARY KEY (catalog_name, view_namespace, view_name))"); + } + + @Test + public void testListTablesOrViewsSql() { + String tableSql = JdbcUtil.listTablesSql(); + Assertions.assertThat(tableSql) + .isEqualTo("SELECT * FROM iceberg_tables WHERE catalog_name = ? AND table_namespace = ?"); + String viewSql = JdbcUtil.listViewsSql(); + Assertions.assertThat(viewSql) + .isEqualTo("SELECT * FROM iceberg_views WHERE catalog_name = ? AND view_namespace = ?"); + } + + @Test + public void testRenameTableOrViewSql() { + String tableSql = JdbcUtil.renameTableSql(); + Assertions.assertThat(tableSql) + .isEqualTo( + "UPDATE iceberg_tables SET table_namespace = ?, table_name = ? WHERE catalog_name = ? AND table_namespace = ? AND table_name = ?"); + String viewSql = JdbcUtil.renameViewSql(); + Assertions.assertThat(viewSql) + .isEqualTo( + "UPDATE iceberg_views SET view_namespace = ?, view_name = ? WHERE catalog_name = ? AND view_namespace = ? AND view_name = ?"); + } + + @Test + public void testDropTableOrViewSql() { + String tableSql = JdbcUtil.dropTableSql(); + Assertions.assertThat(tableSql) + .isEqualTo( + "DELETE FROM iceberg_tables WHERE catalog_name = ? AND table_namespace = ? AND table_name = ?"); + String viewSql = JdbcUtil.dropViewSql(); + Assertions.assertThat(viewSql) + .isEqualTo( + "DELETE FROM iceberg_views WHERE catalog_name = ? AND view_namespace = ? AND view_name = ?"); + } + @Test public void testFilterAndRemovePrefix() { Map input = Maps.newHashMap(); diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java new file mode 100644 index 000000000000..73e3a8791345 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.view.ViewCatalogTests; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +public class TestJdbcViewCatalog extends ViewCatalogTests { + + static Configuration conf = new Configuration(); + private static JdbcCatalog catalog; + private static String warehouseLocation; + + @TempDir java.nio.file.Path tableDir; + + @BeforeEach + public void before() { + Map properties = Maps.newHashMap(); + properties.put( + CatalogProperties.URI, + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + warehouseLocation = this.tableDir.toAbsolutePath().toString(); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); + + catalog = new JdbcCatalog(); + catalog.setConf(conf); + catalog.initialize("testCatalog", properties); + } + + @Override + protected JdbcCatalog catalog() { + return catalog; + } + + @Override + protected Catalog tableCatalog() { + return catalog; + } + + @Override + protected boolean requiresNamespaceCreate() { + return true; + } +}