From 9865ec3f7d98ca7c3a4f4a172471708e7c33d097 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Thu, 11 Jan 2024 18:16:12 +0100 Subject: [PATCH] Core: Add view support for JDBC catalog --- .../org/apache/iceberg/jdbc/JdbcCatalog.java | 160 +++++++++-- .../iceberg/jdbc/JdbcTableOperations.java | 70 +---- .../org/apache/iceberg/jdbc/JdbcUtil.java | 267 ++++++++++++++---- .../iceberg/jdbc/JdbcViewOperations.java | 202 +++++++++++++ .../org/apache/iceberg/jdbc/TestJdbcUtil.java | 18 +- .../iceberg/jdbc/TestJdbcViewCatalog.java | 66 +++++ 6 files changed, 651 insertions(+), 132 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java create mode 100644 core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 0bab6ade4c85..abc8ed185adf 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -37,7 +38,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableMetadata; @@ -49,6 +49,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.CloseableGroup; @@ -60,10 +61,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.ViewOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JdbcCatalog extends BaseMetastoreCatalog +public class JdbcCatalog extends BaseMetastoreViewCatalog implements Configurable, SupportsNamespaces { public static final String PROPERTY_PREFIX = "jdbc."; @@ -80,10 +83,11 @@ public class JdbcCatalog extends BaseMetastoreCatalog private final Function, FileIO> ioBuilder; private final Function, JdbcClientPool> clientPoolBuilder; private final boolean initializeCatalogTables; + private final boolean updateCatalogTables; private CloseableGroup closeableGroup; public JdbcCatalog() { - this(null, null, true); + this(null, null, true, false); } public JdbcCatalog( @@ -93,6 +97,18 @@ public JdbcCatalog( this.ioBuilder = ioBuilder; this.clientPoolBuilder = clientPoolBuilder; this.initializeCatalogTables = initializeCatalogTables; + this.updateCatalogTables = false; + } + + public JdbcCatalog( + Function, FileIO> ioBuilder, + Function, JdbcClientPool> clientPoolBuilder, + boolean initializeCatalogTables, + boolean updateCatalogTables) { + this.ioBuilder = ioBuilder; + this.clientPoolBuilder = clientPoolBuilder; + this.initializeCatalogTables = initializeCatalogTables; + this.updateCatalogTables = updateCatalogTables; } @Override @@ -134,14 +150,15 @@ public void initialize(String name, Map properties) { initializeCatalogTables(); } } catch (SQLTimeoutException e) { - throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Query timed out"); + throw new UncheckedSQLException(e, "Cannot initialize/update JDBC catalog: Query timed out"); } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) { - throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Connection failed"); + throw new UncheckedSQLException( + e, "Cannot initialize/update JDBC catalog: Connection failed"); } catch (SQLException e) { - throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog"); + throw new UncheckedSQLException(e, "Cannot initialize/update JDBC catalog"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new UncheckedInterruptedException(e, "Interrupted in call to initialize"); + throw new UncheckedInterruptedException(e, "Interrupted in call to initialize/update"); } this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(metricsReporter()); @@ -158,14 +175,18 @@ private void initializeCatalogTables() throws InterruptedException, SQLException dbMeta.getTables( null /* catalog name */, null /* schemaPattern */, - JdbcUtil.CATALOG_TABLE_NAME /* tableNamePattern */, + JdbcUtil.CATALOG_TABLE_VIEW_NAME /* tableNamePattern */, null /* types */); if (tableExists.next()) { - return true; + if (updateCatalogTables) { + updateCatalogTables(conn); + } } - LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME); - return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute(); + LOG.debug( + "Creating table {} to store iceberg catalog tables", + JdbcUtil.CATALOG_TABLE_VIEW_NAME); + return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_SQL).execute(); }); connections.run( @@ -185,16 +206,32 @@ private void initializeCatalogTables() throws InterruptedException, SQLException LOG.debug( "Creating table {} to store iceberg catalog namespace properties", JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME); - return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute(); + return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL).execute(); }); } + private void updateCatalogTables(Connection connection) throws SQLException { + LOG.trace("Updating database tables (if needed)"); + DatabaseMetaData dbMeta = connection.getMetaData(); + ResultSet tableColumns = dbMeta.getColumns(null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, null); + if (tableColumns.getString(JdbcUtil.TYPE) != null) { + LOG.debug("{} is already up to date", JdbcUtil.CATALOG_TABLE_VIEW_NAME); + return; + } + connection.prepareStatement(JdbcUtil.UPDATE_CATALOG_SQL).execute(); + } + @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { return new JdbcTableOperations( connections, io, catalogName, tableIdentifier, catalogProperties); } + @Override + protected ViewOperations newViewOps(TableIdentifier viewIdentifier) { + return new JdbcViewOperations(connections, io, catalogName, viewIdentifier, catalogProperties); + } + @Override protected String defaultWarehouseLocation(TableIdentifier table) { return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name()); @@ -217,10 +254,11 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { int deletedRecords = execute( - JdbcUtil.DROP_TABLE_SQL, + JdbcUtil.DROP_SQL, catalogName, JdbcUtil.namespaceToString(identifier.namespace()), - identifier.name()); + identifier.name(), + "TABLE"); if (deletedRecords == 0) { LOG.info("Skipping drop, table does not exist: {}", identifier); @@ -245,13 +283,18 @@ public List listTables(Namespace namespace) { row -> JdbcUtil.stringToTableIdentifier( row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)), - JdbcUtil.LIST_TABLES_SQL, + JdbcUtil.LIST_SQL, catalogName, - JdbcUtil.namespaceToString(namespace)); + JdbcUtil.namespaceToString(namespace), + "TABLE"); } @Override public void renameTable(TableIdentifier from, TableIdentifier to) { + if (viewExists(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to); + } + int updatedRecords = execute( err -> { @@ -261,12 +304,13 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { throw new AlreadyExistsException("Table already exists: %s", to); } }, - JdbcUtil.RENAME_TABLE_SQL, + JdbcUtil.RENAME_SQL, JdbcUtil.namespaceToString(to.namespace()), to.name(), catalogName, JdbcUtil.namespaceToString(from.namespace()), - from.name()); + from.name(), + "TABLE"); if (updatedRecords == 1) { LOG.info("Renamed table from {}, to {}", from, to); @@ -315,7 +359,7 @@ public List listNamespaces() { namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)), - JdbcUtil.LIST_ALL_TABLE_NAMESPACES_SQL, + JdbcUtil.LIST_ALL_NAMESPACES_SQL, catalogName)); namespaces.addAll( fetch( @@ -503,6 +547,84 @@ public boolean namespaceExists(Namespace namespace) { return JdbcUtil.namespaceExists(catalogName, connections, namespace); } + @Override + public boolean dropView(TableIdentifier identifier) { + int deletedRecords = + execute( + JdbcUtil.DROP_SQL, + catalogName, + JdbcUtil.namespaceToString(identifier.namespace()), + identifier.name(), + "VIEW"); + + if (deletedRecords == 0) { + LOG.info("Skipping drop, view does not exist: {}", identifier); + return false; + } + + LOG.info("Dropped view: {}", identifier); + return true; + } + + @Override + public List listViews(Namespace namespace) { + if (!namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + return fetch( + row -> + JdbcUtil.stringToTableIdentifier( + row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)), + JdbcUtil.LIST_SQL, + catalogName, + JdbcUtil.namespaceToString(namespace), + "VIEW"); + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + if (!viewExists(from)) { + throw new NoSuchViewException("View does not exist"); + } + + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", to.namespace()); + } + + if (tableExists(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to); + } + + int updatedRecords = + execute( + err -> { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (err instanceof SQLIntegrityConstraintViolationException + || (err.getMessage() != null && err.getMessage().contains("constraint failed"))) { + throw new AlreadyExistsException( + "Cannot rename %s to %s. View already exists", from, to); + } + }, + JdbcUtil.RENAME_SQL, + JdbcUtil.namespaceToString(to.namespace()), + to.name(), + catalogName, + JdbcUtil.namespaceToString(from.namespace()), + from.name(), + "VIEW"); + + if (updatedRecords == 1) { + LOG.info("Renamed view from {}, to {}", from, to); + } else if (updatedRecords == 0) { + throw new NoSuchViewException("View does not exist: %s", from); + } else { + LOG.warn( + "Rename operation affected {} rows: the catalog view's primary key assumption has been violated", + updatedRecords); + } + } + private int execute(String sql, String... args) { return execute(err -> {}, sql, args); } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java index 6a7d594dd9f6..22ea8fe84343 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java @@ -19,8 +19,6 @@ package org.apache.iceberg.jdbc; import java.sql.DataTruncation; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLNonTransientConnectionException; @@ -39,7 +37,6 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +68,7 @@ public void doRefresh() { Map table; try { - table = getTable(); + table = JdbcUtil.tableOrView(true, connections, catalogName, tableIdentifier); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e, "Interrupted during refresh"); @@ -105,7 +102,8 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); try { - Map table = getTable(); + Map table = + JdbcUtil.tableOrView(true, connections, catalogName, tableIdentifier); if (base != null) { validateMetadataLocation(table, base); @@ -140,6 +138,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { if (e.getMessage().contains("constraint failed")) { throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); } + throw new UncheckedSQLException(e, "Unknown failure"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -150,20 +149,8 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { private void updateTable(String newMetadataLocation, String oldMetadataLocation) throws SQLException, InterruptedException { int updatedRecords = - connections.run( - conn -> { - try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.DO_COMMIT_SQL)) { - // UPDATE - sql.setString(1, newMetadataLocation); - sql.setString(2, oldMetadataLocation); - // WHERE - sql.setString(3, catalogName); - sql.setString(4, JdbcUtil.namespaceToString(tableIdentifier.namespace())); - sql.setString(5, tableIdentifier.name()); - sql.setString(6, oldMetadataLocation); - return sql.executeUpdate(); - } - }); + JdbcUtil.updateTable( + connections, catalogName, tableIdentifier, newMetadataLocation, oldMetadataLocation); if (updatedRecords == 1) { LOG.debug("Successfully committed to existing table: {}", tableIdentifier); @@ -182,18 +169,13 @@ private void createTable(String newMetadataLocation) throws SQLException, Interr tableIdentifier, catalogName, namespace); } + if (JdbcUtil.viewExists(catalogName, connections, tableIdentifier)) { + throw new AlreadyExistsException("View with same name already exists: %s", tableIdentifier); + } + int insertRecord = - connections.run( - conn -> { - try (PreparedStatement sql = - conn.prepareStatement(JdbcUtil.DO_COMMIT_CREATE_TABLE_SQL)) { - sql.setString(1, catalogName); - sql.setString(2, JdbcUtil.namespaceToString(namespace)); - sql.setString(3, tableIdentifier.name()); - sql.setString(4, newMetadataLocation); - return sql.executeUpdate(); - } - }); + JdbcUtil.doCommitCreateTable( + connections, catalogName, namespace, tableIdentifier, newMetadataLocation); if (insertRecord == 1) { LOG.debug("Successfully committed to new table: {}", tableIdentifier); @@ -223,32 +205,4 @@ public FileIO io() { protected String tableName() { return tableIdentifier.toString(); } - - private Map getTable() - throws UncheckedSQLException, SQLException, InterruptedException { - return connections.run( - conn -> { - Map table = Maps.newHashMap(); - - try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.GET_TABLE_SQL)) { - sql.setString(1, catalogName); - sql.setString(2, JdbcUtil.namespaceToString(tableIdentifier.namespace())); - sql.setString(3, tableIdentifier.name()); - ResultSet rs = sql.executeQuery(); - - if (rs.next()) { - table.put(JdbcUtil.CATALOG_NAME, rs.getString(JdbcUtil.CATALOG_NAME)); - table.put(JdbcUtil.TABLE_NAMESPACE, rs.getString(JdbcUtil.TABLE_NAMESPACE)); - table.put(JdbcUtil.TABLE_NAME, rs.getString(JdbcUtil.TABLE_NAME)); - table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP)); - table.put( - PREVIOUS_METADATA_LOCATION_PROP, rs.getString(PREVIOUS_METADATA_LOCATION_PROP)); - } - - rs.close(); - } - - return table; - }); - } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java index 4a515d1329ed..5c77e7c91e38 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -25,31 +25,34 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; final class JdbcUtil { // property to control strict-mode (aka check if namespace exists when creating a table) static final String STRICT_MODE_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "strict-mode"; - // Catalog Table - static final String CATALOG_TABLE_NAME = "iceberg_tables"; + // Catalog Table & View + static final String CATALOG_TABLE_VIEW_NAME = "iceberg_tables"; static final String CATALOG_NAME = "catalog_name"; - static final String TABLE_NAMESPACE = "table_namespace"; static final String TABLE_NAME = "table_name"; + static final String TABLE_NAMESPACE = "table_namespace"; + static final String TYPE = "type"; - static final String DO_COMMIT_SQL = + private static final String DO_COMMIT_SQL = "UPDATE " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " SET " + JdbcTableOperations.METADATA_LOCATION_PROP + " = ? , " + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP - + " = ? " + + " = ?" + " WHERE " + CATALOG_NAME + " = ? AND " @@ -58,10 +61,12 @@ final class JdbcUtil { + TABLE_NAME + " = ? AND " + JdbcTableOperations.METADATA_LOCATION_PROP + + " = ? AND " + + TYPE + " = ?"; - static final String CREATE_CATALOG_TABLE = + static final String CREATE_CATALOG_SQL = "CREATE TABLE " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + "(" + CATALOG_NAME + " VARCHAR(255) NOT NULL," @@ -73,6 +78,8 @@ final class JdbcUtil { + " VARCHAR(1000)," + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + " VARCHAR(1000)," + + TYPE + + " VARCHAR(5)," + "PRIMARY KEY (" + CATALOG_NAME + ", " @@ -81,85 +88,100 @@ final class JdbcUtil { + TABLE_NAME + ")" + ")"; - static final String GET_TABLE_SQL = + static final String UPDATE_CATALOG_SQL = + "ALTER TABLE " + CATALOG_TABLE_VIEW_NAME + " ADD COLUMN " + TYPE + " VARCHAR(5)"; + + private static final String GET_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME - + " = ? "; - static final String LIST_TABLES_SQL = + + " = ? AND " + + TYPE + + " = ?"; + private static final String GET_TABLE_SQL = + GET_SQL + + " OR " + + TYPE + + " IS NULL"; // type is null when the SQL database has been updated from previous version + static final String LIST_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + + " = ? AND " + + TYPE + " = ?"; - static final String RENAME_TABLE_SQL = + static final String RENAME_SQL = "UPDATE " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " SET " + TABLE_NAMESPACE - + " = ? , " + + " = ?, " + TABLE_NAME - + " = ? " + + " = ?" + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME - + " = ? "; - static final String DROP_TABLE_SQL = + + " = ? AND " + + TYPE + + " = ?"; + static final String DROP_SQL = "DELETE FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE - + " = ? AND " + + " = ? AND " + TABLE_NAME - + " = ? "; - static final String GET_NAMESPACE_SQL = + + " = ? AND " + + TYPE + + " = ?"; + private static final String GET_NAMESPACE_SQL = "SELECT " + TABLE_NAMESPACE + " FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " - + " ( " + + " (" + TABLE_NAMESPACE + " = ? OR " + TABLE_NAMESPACE - + " LIKE ? ESCAPE '\\' " - + " ) " + + " LIKE ? ESCAPE '\\')" + " LIMIT 1"; static final String LIST_NAMESPACES_SQL = "SELECT DISTINCT " + TABLE_NAMESPACE + " FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " LIKE ?"; - static final String LIST_ALL_TABLE_NAMESPACES_SQL = + static final String LIST_ALL_NAMESPACES_SQL = "SELECT DISTINCT " + TABLE_NAMESPACE + " FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ?"; - static final String DO_COMMIT_CREATE_TABLE_SQL = + private static final String DO_COMMIT_CREATE_SQL = "INSERT INTO " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " (" + CATALOG_NAME + ", " @@ -170,8 +192,10 @@ final class JdbcUtil { + JdbcTableOperations.METADATA_LOCATION_PROP + ", " + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + ", " + + TYPE + ") " - + " VALUES (?,?,?,?,null)"; + + " VALUES (?,?,?,?,null,?)"; // Catalog Namespace Properties static final String NAMESPACE_PROPERTIES_TABLE_NAME = "iceberg_namespace_properties"; @@ -179,7 +203,7 @@ final class JdbcUtil { static final String NAMESPACE_PROPERTY_KEY = "property_key"; static final String NAMESPACE_PROPERTY_VALUE = "property_value"; - static final String CREATE_NAMESPACE_PROPERTIES_TABLE = + static final String CREATE_NAMESPACE_PROPERTIES_TABLE_SQL = "CREATE TABLE " + NAMESPACE_PROPERTIES_TABLE_NAME + "(" @@ -278,20 +302,20 @@ final class JdbcUtil { private JdbcUtil() {} - public static Namespace stringToNamespace(String namespace) { + static Namespace stringToNamespace(String namespace) { Preconditions.checkArgument(namespace != null, "Invalid namespace %s", namespace); return Namespace.of(Iterables.toArray(SPLITTER_DOT.split(namespace), String.class)); } - public static String namespaceToString(Namespace namespace) { + static String namespaceToString(Namespace namespace) { return JOINER_DOT.join(namespace.levels()); } - public static TableIdentifier stringToTableIdentifier(String tableNamespace, String tableName) { + static TableIdentifier stringToTableIdentifier(String tableNamespace, String tableName) { return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace), tableName); } - public static Properties filterAndRemovePrefix(Map properties, String prefix) { + static Properties filterAndRemovePrefix(Map properties, String prefix) { Properties result = new Properties(); properties.forEach( (key, value) -> { @@ -303,7 +327,152 @@ public static Properties filterAndRemovePrefix(Map properties, S return result; } - public static String updatePropertiesStatement(int size) { + private static int update( + boolean isTable, + JdbcClientPool connections, + String catalogName, + TableIdentifier identifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(DO_COMMIT_SQL)) { + // UPDATE + sql.setString(1, newMetadataLocation); + sql.setString(2, oldMetadataLocation); + // WHERE + sql.setString(3, catalogName); + sql.setString(4, namespaceToString(identifier.namespace())); + sql.setString(5, identifier.name()); + sql.setString(6, oldMetadataLocation); + sql.setString(7, isTable ? "TABLE" : "VIEW"); + return sql.executeUpdate(); + } + }); + } + + static int updateTable( + JdbcClientPool connections, + String catalogName, + TableIdentifier tableIdentifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return update( + true, connections, catalogName, tableIdentifier, newMetadataLocation, oldMetadataLocation); + } + + static int updateView( + JdbcClientPool connections, + String catalogName, + TableIdentifier viewIdentifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + return update( + false, connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + } + + static Map tableOrView( + boolean isTable, JdbcClientPool connections, String catalogName, TableIdentifier identifier) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + Map tableOrView = Maps.newHashMap(); + + try (PreparedStatement sql = conn.prepareStatement(isTable ? GET_TABLE_SQL : GET_SQL)) { + sql.setString(1, catalogName); + sql.setString(2, namespaceToString(identifier.namespace())); + sql.setString(3, identifier.name()); + sql.setString(4, isTable ? "TABLE" : "VIEW"); + ResultSet rs = sql.executeQuery(); + + if (rs.next()) { + tableOrView.put(CATALOG_NAME, rs.getString(CATALOG_NAME)); + tableOrView.put(TABLE_NAMESPACE, rs.getString(TABLE_NAMESPACE)); + tableOrView.put(TABLE_NAME, rs.getString(TABLE_NAME)); + tableOrView.put( + BaseMetastoreTableOperations.METADATA_LOCATION_PROP, + rs.getString(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)); + tableOrView.put( + BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, + rs.getString(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP)); + } + + rs.close(); + } + + return tableOrView; + }); + } + + private static int doCommitCreate( + boolean isTable, + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier identifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(DO_COMMIT_CREATE_SQL)) { + sql.setString(1, catalogName); + sql.setString(2, namespaceToString(namespace)); + sql.setString(3, identifier.name()); + sql.setString(4, newMetadataLocation); + sql.setString(5, isTable ? "TABLE" : "VIEW"); + return sql.executeUpdate(); + } + }); + } + + static int doCommitCreateTable( + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier tableIdentifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return doCommitCreate( + true, connections, catalogName, namespace, tableIdentifier, newMetadataLocation); + } + + static int doCommitCreateView( + JdbcClientPool connections, + String catalogName, + Namespace namespace, + TableIdentifier viewIdentifier, + String newMetadataLocation) + throws SQLException, InterruptedException { + return doCommitCreate( + false, connections, catalogName, namespace, viewIdentifier, newMetadataLocation); + } + + static boolean viewExists( + String catalogName, JdbcClientPool connections, TableIdentifier viewIdentifier) { + return exists( + connections, + GET_SQL, + catalogName, + namespaceToString(viewIdentifier.namespace()), + viewIdentifier.name(), + "VIEW"); + } + + static boolean tableExists( + String catalogName, JdbcClientPool connections, TableIdentifier tableIdentifier) { + return exists( + connections, + GET_TABLE_SQL, + catalogName, + namespaceToString(tableIdentifier.namespace()), + tableIdentifier.name(), + "TABLE"); + } + + static String updatePropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder( "UPDATE " @@ -314,6 +483,7 @@ public static String updatePropertiesStatement(int size) { for (int i = 0; i < size; i += 1) { sqlStatement.append(" WHEN " + NAMESPACE_PROPERTY_KEY + " = ? THEN ?"); } + sqlStatement.append( " END WHERE " + CATALOG_NAME @@ -329,7 +499,7 @@ public static String updatePropertiesStatement(int size) { return sqlStatement.toString(); } - public static String insertPropertiesStatement(int size) { + static String insertPropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL); for (int i = 0; i < size; i++) { @@ -342,7 +512,7 @@ public static String insertPropertiesStatement(int size) { return sqlStatement.toString(); } - public static String deletePropertiesStatement(Set properties) { + static String deletePropertiesStatement(Set properties) { StringBuilder sqlStatement = new StringBuilder(JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL); String values = String.join(",", Collections.nCopies(properties.size(), String.valueOf('?'))); sqlStatement.append("(").append(values).append(")"); @@ -358,25 +528,16 @@ static boolean namespaceExists( // catalog.db can exists as: catalog.db.ns1 or catalog.db.ns1.ns2 String namespaceStartsWith = namespaceEquals.replace("\\", "\\\\").replace("_", "\\_").replace("%", "\\%") + ".%"; - if (exists( - connections, - JdbcUtil.GET_NAMESPACE_SQL, - catalogName, - namespaceEquals, - namespaceStartsWith)) { + if (exists(connections, GET_NAMESPACE_SQL, catalogName, namespaceEquals, namespaceStartsWith)) { return true; } - if (exists( + return exists( connections, JdbcUtil.GET_NAMESPACE_PROPERTIES_SQL, catalogName, namespaceEquals, - namespaceStartsWith)) { - return true; - } - - return false; + namespaceStartsWith); } @SuppressWarnings("checkstyle:NestedTryDepth") diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java new file mode 100644 index 000000000000..931f9fdb086a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.sql.DataTruncation; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.SQLTimeoutException; +import java.sql.SQLTransientConnectionException; +import java.sql.SQLWarning; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** JDBC implementation of Iceberg ViewOperations. */ +public class JdbcViewOperations extends BaseViewOperations { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcViewOperations.class); + private final String catalogName; + private final TableIdentifier viewIdentifier; + private final FileIO fileIO; + private final JdbcClientPool connections; + private final Map catalogProperties; + + protected JdbcViewOperations( + JdbcClientPool dbConnPool, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier, + Map catalogProperties) { + this.catalogName = catalogName; + this.viewIdentifier = viewIdentifier; + this.fileIO = fileIO; + this.connections = dbConnPool; + this.catalogProperties = catalogProperties; + } + + @Override + protected void doRefresh() { + Map view; + + try { + view = JdbcUtil.tableOrView(false, connections, catalogName, viewIdentifier); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during refresh"); + } catch (SQLException e) { + // SQL exception happened when getting view from catalog + throw new UncheckedSQLException( + e, "Failed to get view %s from catalog %s", viewIdentifier, catalogName); + } + + if (view.isEmpty()) { + if (currentMetadataLocation() != null) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } else { + this.disableRefresh(); + return; + } + } + + String newMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + Preconditions.checkState( + newMetadataLocation != null, "Invalid view %s: metadata location is null", viewIdentifier); + refreshFromMetadataLocation(newMetadataLocation); + } + + @Override + protected void doCommit(ViewMetadata base, ViewMetadata metadata) { + String newMetadataLocation = writeNewMetadataIfRequired(metadata); + try { + Map view = + JdbcUtil.tableOrView(false, connections, catalogName, viewIdentifier); + if (base != null) { + validateMetadataLocation(view, base); + String oldMetadataLocation = base.metadataFileLocation(); + // Start atomic update + LOG.debug("Committing existing view: {}", viewName()); + updateView(newMetadataLocation, oldMetadataLocation); + } else { + // view not exists create it + LOG.debug("Committing new view: {}", viewName()); + createView(newMetadataLocation); + } + + } catch (SQLIntegrityConstraintViolationException e) { + if (currentMetadataLocation() == null) { + throw new AlreadyExistsException(e, "View already exists: %s", viewIdentifier); + } else { + throw new UncheckedSQLException(e, "View already exists: %s", viewIdentifier); + } + + } catch (SQLTimeoutException e) { + throw new UncheckedSQLException(e, "Database Connection timeout"); + } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) { + throw new UncheckedSQLException(e, "Database Connection failed"); + } catch (DataTruncation e) { + throw new UncheckedSQLException(e, "Database data truncation error"); + } catch (SQLWarning e) { + throw new UncheckedSQLException(e, "Database warning"); + } catch (SQLException e) { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (e.getMessage().contains("constraint failed")) { + throw new AlreadyExistsException("View already exists: %s", viewIdentifier); + } + + throw new UncheckedSQLException(e, "Unknown failure"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during commit"); + } + } + + @Override + protected String viewName() { + return viewIdentifier.toString(); + } + + @Override + protected FileIO io() { + return fileIO; + } + + private void validateMetadataLocation(Map view, ViewMetadata base) { + String catalogMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + + if (!Objects.equals(baseMetadataLocation, catalogMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s: metadata location %s has changed from %s", + viewIdentifier, baseMetadataLocation, catalogMetadataLocation); + } + } + + private void updateView(String newMetadataLocation, String oldMetadataLocation) + throws SQLException, InterruptedException { + int updatedRecords = + JdbcUtil.updateView( + connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + + if (updatedRecords == 1) { + LOG.debug("Successfully committed to existing view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to update view %s from catalog %s", viewIdentifier, catalogName); + } + } + + private void createView(String newMetadataLocation) throws SQLException, InterruptedException { + Namespace namespace = viewIdentifier.namespace(); + if (PropertyUtil.propertyAsBoolean(catalogProperties, JdbcUtil.STRICT_MODE_PROPERTY, false) + && !JdbcUtil.namespaceExists(catalogName, connections, namespace)) { + throw new NoSuchNamespaceException( + "Cannot create view %s in catalog %s. Namespace %s does not exist", + viewIdentifier, catalogName, namespace); + } + + if (JdbcUtil.tableExists(catalogName, connections, viewIdentifier)) { + throw new AlreadyExistsException("Table with same name already exists: %s", viewIdentifier); + } + + int insertRecord = + JdbcUtil.doCommitCreateView( + connections, catalogName, namespace, viewIdentifier, newMetadataLocation); + + if (insertRecord == 1) { + LOG.debug("Successfully committed to new view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to create view %s in catalog %s", viewIdentifier, catalogName); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java index 0dec6eb83ebb..b8c4deeb1455 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java @@ -18,14 +18,28 @@ */ package org.apache.iceberg.jdbc; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; import java.util.Map; import java.util.Properties; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.sqlite.SQLiteDataSource; public class TestJdbcUtil { + @Test + public void testUpdate() throws Exception { + SQLiteDataSource dataSource = new SQLiteDataSource(); + dataSource.setUrl("jdbc:sqlite:file::memory:?icebergDB"); + + try (Connection connection = dataSource.getConnection()) { + // create "old style" SQL schema + + } + } + @Test public void testFilterAndRemovePrefix() { Map input = Maps.newHashMap(); @@ -42,6 +56,6 @@ public void testFilterAndRemovePrefix() { Properties actual = JdbcUtil.filterAndRemovePrefix(input, "jdbc."); - Assertions.assertThat(expected).isEqualTo(actual); + assertThat(expected).isEqualTo(actual); } } diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java new file mode 100644 index 000000000000..c7bdb158cdf2 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.view.ViewCatalogTests; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +public class TestJdbcViewCatalog extends ViewCatalogTests { + + private JdbcCatalog catalog; + + @TempDir private java.nio.file.Path tableDir; + + @BeforeEach + public void before() { + Map properties = Maps.newHashMap(); + properties.put( + CatalogProperties.URI, + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, tableDir.toAbsolutePath().toString()); + + catalog = new JdbcCatalog(); + catalog.setConf(new Configuration()); + catalog.initialize("testCatalog", properties); + } + + @Override + protected JdbcCatalog catalog() { + return catalog; + } + + @Override + protected Catalog tableCatalog() { + return catalog; + } + + @Override + protected boolean requiresNamespaceCreate() { + return true; + } +}