Skip to content

Commit

Permalink
Core: Add view support for JDBC catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed Jan 25, 2024
1 parent fd1cf49 commit 6914e0a
Show file tree
Hide file tree
Showing 6 changed files with 622 additions and 152 deletions.
128 changes: 112 additions & 16 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
Expand All @@ -49,6 +48,7 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
Expand All @@ -60,10 +60,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.ViewOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcCatalog extends BaseMetastoreCatalog
public class JdbcCatalog extends BaseMetastoreViewCatalog
implements Configurable<Object>, SupportsNamespaces {

public static final String PROPERTY_PREFIX = "jdbc.";
Expand Down Expand Up @@ -158,14 +160,16 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
dbMeta.getTables(
null /* catalog name */,
null /* schemaPattern */,
JdbcUtil.CATALOG_TABLE_NAME /* tableNamePattern */,
JdbcUtil.CATALOG_TABLE_VIEW_NAME /* tableNamePattern */,
null /* types */);
if (tableExists.next()) {
return true;
}

LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
LOG.debug(
"Creating table {} to store iceberg catalog tables",
JdbcUtil.CATALOG_TABLE_VIEW_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_SQL).execute();
});

connections.run(
Expand All @@ -185,7 +189,7 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
LOG.debug(
"Creating table {} to store iceberg catalog namespace properties",
JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute();
return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL).execute();
});
}

Expand All @@ -195,6 +199,11 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
connections, io, catalogName, tableIdentifier, catalogProperties);
}

@Override
protected ViewOperations newViewOps(TableIdentifier viewIdentifier) {
return new JdbcViewOperations(connections, io, catalogName, viewIdentifier, catalogProperties);
}

@Override
protected String defaultWarehouseLocation(TableIdentifier table) {
return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name());
Expand All @@ -217,10 +226,11 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {

int deletedRecords =
execute(
JdbcUtil.DROP_TABLE_SQL,
JdbcUtil.DROP_SQL,
catalogName,
JdbcUtil.namespaceToString(identifier.namespace()),
identifier.name());
identifier.name(),
"TABLE");

if (deletedRecords == 0) {
LOG.info("Skipping drop, table does not exist: {}", identifier);
Expand All @@ -244,14 +254,20 @@ public List<TableIdentifier> listTables(Namespace namespace) {
return fetch(
row ->
JdbcUtil.stringToTableIdentifier(
row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)),
JdbcUtil.LIST_TABLES_SQL,
row.getString(JdbcUtil.TABLE_VIEW_NAMESPACE),
row.getString(JdbcUtil.TABLE_VIEW_NAME)),
JdbcUtil.LIST_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace));
JdbcUtil.namespaceToString(namespace),
"TABLE");
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
if (viewExists(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
}

int updatedRecords =
execute(
err -> {
Expand All @@ -261,12 +277,13 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
throw new AlreadyExistsException("Table already exists: %s", to);
}
},
JdbcUtil.RENAME_TABLE_SQL,
JdbcUtil.RENAME_SQL,
JdbcUtil.namespaceToString(to.namespace()),
to.name(),
catalogName,
JdbcUtil.namespaceToString(from.namespace()),
from.name());
from.name(),
"TABLE");

if (updatedRecords == 1) {
LOG.info("Renamed table from {}, to {}", from, to);
Expand Down Expand Up @@ -314,8 +331,8 @@ public List<Namespace> listNamespaces() {
List<Namespace> namespaces = Lists.newArrayList();
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)),
JdbcUtil.LIST_ALL_TABLE_NAMESPACES_SQL,
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_VIEW_NAMESPACE)),
JdbcUtil.LIST_ALL_NAMESPACES_SQL,
catalogName));
namespaces.addAll(
fetch(
Expand Down Expand Up @@ -349,7 +366,7 @@ public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespac
List<Namespace> namespaces = Lists.newArrayList();
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)),
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_VIEW_NAMESPACE)),
JdbcUtil.LIST_NAMESPACES_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace) + "%"));
Expand Down Expand Up @@ -503,6 +520,85 @@ public boolean namespaceExists(Namespace namespace) {
return JdbcUtil.namespaceExists(catalogName, connections, namespace);
}

@Override
public boolean dropView(TableIdentifier identifier) {
int deletedRecords =
execute(
JdbcUtil.DROP_SQL,
catalogName,
JdbcUtil.namespaceToString(identifier.namespace()),
identifier.name(),
"VIEW");

if (deletedRecords == 0) {
LOG.info("Skipping drop, view does not exist: {}", identifier);
return false;
}

LOG.info("Dropped view: {}", identifier);
return true;
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}

return fetch(
row ->
JdbcUtil.stringToTableIdentifier(
row.getString(JdbcUtil.TABLE_VIEW_NAMESPACE),
row.getString(JdbcUtil.TABLE_VIEW_NAME)),
JdbcUtil.LIST_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace),
"VIEW");
}

@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
if (!viewExists(from)) {
throw new NoSuchViewException("View does not exist");
}

if (!namespaceExists(to.namespace())) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", to.namespace());
}

if (tableExists(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
}

int updatedRecords =
execute(
err -> {
// SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException
if (err instanceof SQLIntegrityConstraintViolationException
|| (err.getMessage() != null && err.getMessage().contains("constraint failed"))) {
throw new AlreadyExistsException(
"Cannot rename %s to %s. View already exists", from, to);
}
},
JdbcUtil.RENAME_SQL,
JdbcUtil.namespaceToString(to.namespace()),
to.name(),
catalogName,
JdbcUtil.namespaceToString(from.namespace()),
from.name(),
"VIEW");

if (updatedRecords == 1) {
LOG.info("Renamed view from {}, to {}", from, to);
} else if (updatedRecords == 0) {
throw new NoSuchViewException("View does not exist: %s", from);
} else {
LOG.warn(
"Rename operation affected {} rows: the catalog view's primary key assumption has been violated",
updatedRecords);
}
}

private int execute(String sql, String... args) {
return execute(err -> {}, sql, args);
}
Expand Down
69 changes: 11 additions & 58 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.iceberg.jdbc;

import java.sql.DataTruncation;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLNonTransientConnectionException;
Expand All @@ -39,7 +37,6 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,7 +68,7 @@ public void doRefresh() {
Map<String, String> table;

try {
table = getTable();
table = JdbcUtil.get(true, connections, catalogName, tableIdentifier);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted during refresh");
Expand Down Expand Up @@ -105,7 +102,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
boolean newTable = base == null;
String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
try {
Map<String, String> table = getTable();
Map<String, String> table = JdbcUtil.get(true, connections, catalogName, tableIdentifier);

if (base != null) {
validateMetadataLocation(table, base);
Expand Down Expand Up @@ -140,6 +137,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
if (e.getMessage().contains("constraint failed")) {
throw new AlreadyExistsException("Table already exists: %s", tableIdentifier);
}

throw new UncheckedSQLException(e, "Unknown failure");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -150,20 +148,8 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
private void updateTable(String newMetadataLocation, String oldMetadataLocation)
throws SQLException, InterruptedException {
int updatedRecords =
connections.run(
conn -> {
try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.DO_COMMIT_SQL)) {
// UPDATE
sql.setString(1, newMetadataLocation);
sql.setString(2, oldMetadataLocation);
// WHERE
sql.setString(3, catalogName);
sql.setString(4, JdbcUtil.namespaceToString(tableIdentifier.namespace()));
sql.setString(5, tableIdentifier.name());
sql.setString(6, oldMetadataLocation);
return sql.executeUpdate();
}
});
JdbcUtil.updateTable(
connections, catalogName, tableIdentifier, newMetadataLocation, oldMetadataLocation);

if (updatedRecords == 1) {
LOG.debug("Successfully committed to existing table: {}", tableIdentifier);
Expand All @@ -182,18 +168,13 @@ private void createTable(String newMetadataLocation) throws SQLException, Interr
tableIdentifier, catalogName, namespace);
}

if (JdbcUtil.viewExists(catalogName, connections, tableIdentifier)) {
throw new AlreadyExistsException("View with same name already exists: %s", tableIdentifier);
}

int insertRecord =
connections.run(
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(JdbcUtil.DO_COMMIT_CREATE_TABLE_SQL)) {
sql.setString(1, catalogName);
sql.setString(2, JdbcUtil.namespaceToString(namespace));
sql.setString(3, tableIdentifier.name());
sql.setString(4, newMetadataLocation);
return sql.executeUpdate();
}
});
JdbcUtil.doCommitCreateTable(
connections, catalogName, namespace, tableIdentifier, newMetadataLocation);

if (insertRecord == 1) {
LOG.debug("Successfully committed to new table: {}", tableIdentifier);
Expand Down Expand Up @@ -223,32 +204,4 @@ public FileIO io() {
protected String tableName() {
return tableIdentifier.toString();
}

private Map<String, String> getTable()
throws UncheckedSQLException, SQLException, InterruptedException {
return connections.run(
conn -> {
Map<String, String> table = Maps.newHashMap();

try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.GET_TABLE_SQL)) {
sql.setString(1, catalogName);
sql.setString(2, JdbcUtil.namespaceToString(tableIdentifier.namespace()));
sql.setString(3, tableIdentifier.name());
ResultSet rs = sql.executeQuery();

if (rs.next()) {
table.put(JdbcUtil.CATALOG_NAME, rs.getString(JdbcUtil.CATALOG_NAME));
table.put(JdbcUtil.TABLE_NAMESPACE, rs.getString(JdbcUtil.TABLE_NAMESPACE));
table.put(JdbcUtil.TABLE_NAME, rs.getString(JdbcUtil.TABLE_NAME));
table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP));
table.put(
PREVIOUS_METADATA_LOCATION_PROP, rs.getString(PREVIOUS_METADATA_LOCATION_PROP));
}

rs.close();
}

return table;
});
}
}
Loading

0 comments on commit 6914e0a

Please sign in to comment.