Skip to content

Commit

Permalink
Core: Add view support for JDBC catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed Jan 31, 2024
1 parent 0536ff3 commit b42ce7b
Show file tree
Hide file tree
Showing 7 changed files with 633 additions and 134 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/ClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

public interface ClientPool<C, E extends Exception> {
interface Action<R, C, E extends Exception> {
R run(C client) throws E;
R run(C client) throws E, InterruptedException;
}

<R> R run(Action<R, C, E> action) throws E, InterruptedException;
Expand Down
155 changes: 135 additions & 20 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
Expand All @@ -49,6 +48,7 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
Expand All @@ -60,10 +60,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.ViewOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcCatalog extends BaseMetastoreCatalog
public class JdbcCatalog extends BaseMetastoreViewCatalog
implements Configurable<Object>, SupportsNamespaces {

public static final String PROPERTY_PREFIX = "jdbc.";
Expand All @@ -80,19 +82,22 @@ public class JdbcCatalog extends BaseMetastoreCatalog
private final Function<Map<String, String>, FileIO> ioBuilder;
private final Function<Map<String, String>, JdbcClientPool> clientPoolBuilder;
private final boolean initializeCatalogTables;
private final boolean updateCatalogTables;
private CloseableGroup closeableGroup;

public JdbcCatalog() {
this(null, null, true);
this(null, null, true, false);
}

public JdbcCatalog(
Function<Map<String, String>, FileIO> ioBuilder,
Function<Map<String, String>, JdbcClientPool> clientPoolBuilder,
boolean initializeCatalogTables) {
boolean initializeCatalogTables,
boolean updateCatalogTables) {
this.ioBuilder = ioBuilder;
this.clientPoolBuilder = clientPoolBuilder;
this.initializeCatalogTables = initializeCatalogTables;
this.updateCatalogTables = updateCatalogTables;
}

@Override
Expand Down Expand Up @@ -134,14 +139,15 @@ public void initialize(String name, Map<String, String> properties) {
initializeCatalogTables();
}
} catch (SQLTimeoutException e) {
throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Query timed out");
throw new UncheckedSQLException(e, "Cannot initialize/update JDBC catalog: Query timed out");
} catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) {
throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Connection failed");
throw new UncheckedSQLException(
e, "Cannot initialize/update JDBC catalog: Connection failed");
} catch (SQLException e) {
throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog");
throw new UncheckedSQLException(e, "Cannot initialize/update JDBC catalog");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted in call to initialize");
throw new UncheckedInterruptedException(e, "Interrupted in call to initialize/update");
}
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(metricsReporter());
Expand All @@ -158,14 +164,18 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
dbMeta.getTables(
null /* catalog name */,
null /* schemaPattern */,
JdbcUtil.CATALOG_TABLE_NAME /* tableNamePattern */,
JdbcUtil.CATALOG_TABLE_VIEW_NAME /* tableNamePattern */,
null /* types */);
if (tableExists.next()) {
return true;
if (updateCatalogTables) {
updateCatalogTables();
}
}

LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
LOG.debug(
"Creating table {} to store iceberg catalog tables",
JdbcUtil.CATALOG_TABLE_VIEW_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_SQL).execute();
});

connections.run(
Expand All @@ -185,7 +195,22 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
LOG.debug(
"Creating table {} to store iceberg catalog namespace properties",
JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute();
return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL).execute();
});
}

private void updateCatalogTables() throws SQLException, InterruptedException {
LOG.trace("Updating database tables (if needed)");
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableColumns =
dbMeta.getColumns(null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, null);
if (tableColumns.getString(JdbcUtil.TYPE) != null) {
LOG.debug("{} is already up to date", JdbcUtil.CATALOG_TABLE_VIEW_NAME);
return true;
}
return conn.prepareStatement(JdbcUtil.UPDATE_CATALOG_SQL).execute();
});
}

Expand All @@ -195,6 +220,11 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
connections, io, catalogName, tableIdentifier, catalogProperties);
}

@Override
protected ViewOperations newViewOps(TableIdentifier viewIdentifier) {
return new JdbcViewOperations(connections, io, catalogName, viewIdentifier, catalogProperties);
}

@Override
protected String defaultWarehouseLocation(TableIdentifier table) {
return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name());
Expand All @@ -217,10 +247,11 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {

int deletedRecords =
execute(
JdbcUtil.DROP_TABLE_SQL,
JdbcUtil.DROP_SQL,
catalogName,
JdbcUtil.namespaceToString(identifier.namespace()),
identifier.name());
identifier.name(),
"TABLE");

if (deletedRecords == 0) {
LOG.info("Skipping drop, table does not exist: {}", identifier);
Expand All @@ -245,13 +276,18 @@ public List<TableIdentifier> listTables(Namespace namespace) {
row ->
JdbcUtil.stringToTableIdentifier(
row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)),
JdbcUtil.LIST_TABLES_SQL,
JdbcUtil.LIST_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace));
JdbcUtil.namespaceToString(namespace),
"TABLE");
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
if (viewExists(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
}

int updatedRecords =
execute(
err -> {
Expand All @@ -261,12 +297,13 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
throw new AlreadyExistsException("Table already exists: %s", to);
}
},
JdbcUtil.RENAME_TABLE_SQL,
JdbcUtil.RENAME_SQL,
JdbcUtil.namespaceToString(to.namespace()),
to.name(),
catalogName,
JdbcUtil.namespaceToString(from.namespace()),
from.name());
from.name(),
"TABLE");

if (updatedRecords == 1) {
LOG.info("Renamed table from {}, to {}", from, to);
Expand Down Expand Up @@ -315,7 +352,7 @@ public List<Namespace> listNamespaces() {
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)),
JdbcUtil.LIST_ALL_TABLE_NAMESPACES_SQL,
JdbcUtil.LIST_ALL_NAMESPACES_SQL,
catalogName));
namespaces.addAll(
fetch(
Expand Down Expand Up @@ -503,6 +540,84 @@ public boolean namespaceExists(Namespace namespace) {
return JdbcUtil.namespaceExists(catalogName, connections, namespace);
}

@Override
public boolean dropView(TableIdentifier identifier) {
int deletedRecords =
execute(
JdbcUtil.DROP_SQL,
catalogName,
JdbcUtil.namespaceToString(identifier.namespace()),
identifier.name(),
"VIEW");

if (deletedRecords == 0) {
LOG.info("Skipping drop, view does not exist: {}", identifier);
return false;
}

LOG.info("Dropped view: {}", identifier);
return true;
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}

return fetch(
row ->
JdbcUtil.stringToTableIdentifier(
row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)),
JdbcUtil.LIST_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace),
"VIEW");
}

@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
if (!viewExists(from)) {
throw new NoSuchViewException("View does not exist");
}

if (!namespaceExists(to.namespace())) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", to.namespace());
}

if (tableExists(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
}

int updatedRecords =
execute(
err -> {
// SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException
if (err instanceof SQLIntegrityConstraintViolationException
|| (err.getMessage() != null && err.getMessage().contains("constraint failed"))) {
throw new AlreadyExistsException(
"Cannot rename %s to %s. View already exists", from, to);
}
},
JdbcUtil.RENAME_SQL,
JdbcUtil.namespaceToString(to.namespace()),
to.name(),
catalogName,
JdbcUtil.namespaceToString(from.namespace()),
from.name(),
"VIEW");

if (updatedRecords == 1) {
LOG.info("Renamed view from {}, to {}", from, to);
} else if (updatedRecords == 0) {
throw new NoSuchViewException("View does not exist: %s", from);
} else {
LOG.warn(
"Rename operation affected {} rows: the catalog view's primary key assumption has been violated",
updatedRecords);
}
}

private int execute(String sql, String... args) {
return execute(err -> {}, sql, args);
}
Expand Down
Loading

0 comments on commit b42ce7b

Please sign in to comment.