diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java
new file mode 100644
index 000000000000..bc5da2aee280
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+
+/** NoSuchIcebergViewException thrown when a view is found, but it is not an Iceberg view. */
+public class NoSuchIcebergViewException extends NoSuchViewException {
+  @FormatMethod
+  public NoSuchIcebergViewException(String message, Object... args) {
+    super(message, args);
+  }
+
+  @FormatMethod
+  public static void check(boolean test, String message, Object... args) {
+    if (!test) {
+      throw new NoSuchIcebergViewException(message, args);
+    }
+  }
+}
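Reviewer note: the new check() helper mirrors NoSuchIcebergTableException. A minimal caller sketch (the helper class is hypothetical; "table_type" and "iceberg-view" correspond to TABLE_TYPE_PROP and the ICEBERG_VIEW_TYPE_VALUE constant introduced later in this patch):

    import java.util.Map;
    import org.apache.iceberg.exceptions.NoSuchIcebergViewException;

    class ViewTypeChecks {
      // Hypothetical helper: fail unless the HMS parameters mark the object as an Iceberg view.
      static void requireIcebergView(Map<String, String> hmsParameters, String fullName) {
        String tableType = hmsParameters.get("table_type");
        NoSuchIcebergViewException.check(
            "iceberg-view".equalsIgnoreCase(tableType),
            "Not an iceberg view: %s (type=%s)",
            fullName,
            tableType);
      }
    }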
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index a683533473be..972ef4ae356d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -282,7 +282,7 @@ private Map<String, String> tableOverrideProperties() {
     }
   }
 
-  protected static String fullTableName(String catalogName, TableIdentifier identifier) {
+  public static String fullTableName(String catalogName, TableIdentifier identifier) {
    StringBuilder sb = new StringBuilder();
 
     if (catalogName.contains("/") || catalogName.contains(":")) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 2fccef5a0ab3..8beb8acf9755 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -18,15 +18,6 @@
  */
 package org.apache.iceberg;
 
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;
-
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -44,7 +35,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.LocationUtil;
-import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.MetastoreOperationsUtil;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +45,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
 
   public static final String TABLE_TYPE_PROP = "table_type";
   public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
+  public static final String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view";
   public static final String METADATA_LOCATION_PROP = "metadata_location";
   public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";
 
@@ -291,7 +283,7 @@ public long newSnapshotId() {
     };
   }
 
-  protected enum CommitStatus {
+  public enum CommitStatus {
     FAILURE,
     SUCCESS,
     UNKNOWN
@@ -309,65 +301,19 @@ protected enum CommitStatus {
    * @return Commit Status of Success, Failure or Unknown
    */
   protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
-    int maxAttempts =
-        PropertyUtil.propertyAsInt(
-            config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-    long minWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
-    long maxWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
-    long totalRetryMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);
-
-    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
-
-    Tasks.foreach(newMetadataLocation)
-        .retry(maxAttempts)
-        .suppressFailureWhenFinished()
-        .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
-        .onFailure(
-            (location, checkException) ->
-                LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
-        .run(
-            location -> {
-              TableMetadata metadata = refresh();
-              String currentMetadataFileLocation = metadata.metadataFileLocation();
-              boolean commitSuccess =
-                  currentMetadataFileLocation.equals(newMetadataLocation)
-                      || metadata.previousFiles().stream()
-                          .anyMatch(log -> log.file().equals(newMetadataLocation));
-              if (commitSuccess) {
-                LOG.info(
-                    "Commit status check: Commit to {} of {} succeeded",
-                    tableName(),
-                    newMetadataLocation);
-                status.set(CommitStatus.SUCCESS);
-              } else {
-                LOG.warn(
-                    "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
-                        + "or in history",
-                    tableName(),
-                    newMetadataLocation);
-              }
-            });
-
-    if (status.get() == CommitStatus.UNKNOWN) {
-      LOG.error(
-          "Cannot determine commit state to {}. Failed during checking {} times. "
-              + "Treating commit state as unknown.",
-          tableName(),
-          maxAttempts);
-    }
-    return status.get();
+    return MetastoreOperationsUtil.checkCommitStatus(
+        tableName(),
+        newMetadataLocation,
+        config.properties(),
+        this::calculateCommitStatusWithUpdatedLocation);
+  }
+
+  protected boolean calculateCommitStatusWithUpdatedLocation(String newMetadataLocation) {
+    TableMetadata metadata = refresh();
+    String currentMetadataFileLocation = metadata.metadataFileLocation();
+    return currentMetadataFileLocation.equals(newMetadataLocation)
+        || metadata.previousFiles().stream()
+            .anyMatch(log -> log.file().equals(newMetadataLocation));
   }
 
   private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
diff --git a/core/src/main/java/org/apache/iceberg/util/MetastoreOperationsUtil.java b/core/src/main/java/org/apache/iceberg/util/MetastoreOperationsUtil.java
new file mode 100644
index 000000000000..62232237ed9c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/MetastoreOperationsUtil.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreOperationsUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreOperationsUtil.class);
+
+  private MetastoreOperationsUtil() {}
+
+  /**
+   * Attempt to load the table and see if any current or past metadata location matches the one we
+   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
+   * may indicate the commit has failed but are not proof that this is the case. Past locations
+   * must also be searched on the chance that a second committer was able to successfully commit on
+   * top of our commit.
+   *
+   * @param tableName full name of the table
+   * @param newMetadataLocation the path of the new commit file
+   * @param properties table or view properties that configure the status-check retries
+   * @param commitStatusSupplier function that checks whether the new metadata location has become
+   *     the current one
+   * @return Commit Status of Success, Failure or Unknown
+   */
+  public static BaseMetastoreTableOperations.CommitStatus checkCommitStatus(
+      String tableName,
+      String newMetadataLocation,
+      Map<String, String> properties,
+      Function<String, Boolean> commitStatusSupplier) {
+    int maxAttempts =
+        PropertyUtil.propertyAsInt(
+            properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
+    long minWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
+    long maxWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
+    long totalRetryMs =
+        PropertyUtil.propertyAsLong(
+            properties,
+            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
+            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);
+
+    AtomicReference<BaseMetastoreTableOperations.CommitStatus> status =
+        new AtomicReference<>(BaseMetastoreTableOperations.CommitStatus.UNKNOWN);
+
+    Tasks.foreach(newMetadataLocation)
+        .retry(maxAttempts)
+        .suppressFailureWhenFinished()
+        .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
+        .onFailure(
+            (location, checkException) ->
+                LOG.error("Cannot check if commit to {} exists.", tableName, checkException))
+        .run(
+            location -> {
+              boolean commitSuccess = commitStatusSupplier.apply(newMetadataLocation);
+
+              if (commitSuccess) {
+                LOG.info(
+                    "Commit status check: Commit to {} of {} succeeded",
+                    tableName,
+                    newMetadataLocation);
+                status.set(BaseMetastoreTableOperations.CommitStatus.SUCCESS);
+              } else {
+                LOG.warn(
+                    "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+                        + "or in history",
+                    tableName,
+                    newMetadataLocation);
+              }
+            });
+
+    if (status.get() == BaseMetastoreTableOperations.CommitStatus.UNKNOWN) {
+      LOG.error(
+          "Cannot determine commit state to {}. Failed during checking {} times. "
+              + "Treating commit state as unknown.",
+          tableName,
+          maxAttempts);
+    }
+    return status.get();
+  }
+}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 516483e49a0c..f51860c6d32f 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -33,7 +34,6 @@
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.iceberg.BaseMetastoreCatalog;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -46,6 +46,7 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
@@ -54,13 +55,18 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.view.BaseMetastoreViewCatalog;
+import org.apache.iceberg.view.ViewOperations;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreViewCatalog
+    implements SupportsNamespaces, Configurable {
   public static final String LIST_ALL_TABLES = "list-all-tables";
   public static final String LIST_ALL_TABLES_DEFAULT = "false";
 
@@ -82,6 +88,11 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
 
   public HiveCatalog() {}
 
+  @Override
+  protected ViewOperations newViewOps(TableIdentifier tableIdentifier) {
+    return new HiveViewOperations(conf, clients, fileIO, name, tableIdentifier);
+  }
+
   @Override
   public void initialize(String inputName, Map<String, String> properties) {
     this.catalogProperties = ImmutableMap.copyOf(properties);
@@ -115,40 +126,22 @@ public void initialize(String inputName, Map<String, String> properties) {
 
   @Override
   public List<TableIdentifier> listTables(Namespace namespace) {
-    Preconditions.checkArgument(
-        isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);
-    String database = namespace.level(0);
-
     try {
-      List<String> tableNames = clients.run(client -> client.getAllTables(database));
       List<TableIdentifier> tableIdentifiers;
 
       if (listAllTables) {
-        tableIdentifiers =
-            tableNames.stream()
-                .map(t -> TableIdentifier.of(namespace, t))
-                .collect(Collectors.toList());
+        tableIdentifiers = listAllTables(namespace);
       } else {
-        List<Table> tableObjects =
-            clients.run(client -> client.getTableObjectsByName(database, tableNames));
         tableIdentifiers =
-            tableObjects.stream()
-                .filter(
-                    table ->
-                        table.getParameters() != null
-                            && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
-                                .equalsIgnoreCase(
-                                    table
-                                        .getParameters()
-                                        .get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
-                .map(table -> TableIdentifier.of(namespace, table.getTableName()))
-                .collect(Collectors.toList());
+            listTablesByType(
+                namespace,
+                TableType.EXTERNAL_TABLE,
+                BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
       }
 
       LOG.debug(
-          "Listing of namespace: {} resulted in the following tables: {}",
-          namespace,
-          tableIdentifiers);
+          "Listing of namespace: {} resulted in the following: {}", namespace, tableIdentifiers);
 
       return tableIdentifiers;
 
     } catch (UnknownDBException e) {
@@ -235,7 +228,9 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
 
     try {
       Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
-      HiveOperationsBase.validateTableIsIceberg(table, fullTableName(name, from));
+      HiveOperationsBase.validateTableOrViewIsIceberg(table, fullTableName(name, from));
+
+      validateToTableForRename(from, to);
 
       table.setDbName(toDatabase);
       table.setTableName(to.name());
@@ -251,9 +246,146 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
 
     } catch (NoSuchObjectException e) {
       throw new NoSuchTableException("Table does not exist: %s", from);
 
-    } catch (AlreadyExistsException e) {
-      throw new org.apache.iceberg.exceptions.AlreadyExistsException(
-          "Table already exists: %s", to);
+    } catch (TException e) {
+      throw new RuntimeException("Failed to rename " + from + " to " + to, e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted in call to rename", e);
+    }
+  }
+
+  @Override
+  public boolean dropView(TableIdentifier identifier) {
+    if (!isValidIdentifier(identifier)) {
+      return false;
+    }
+
+    try {
+      String database = identifier.namespace().level(0);
+      String viewName = identifier.name();
+      Table table = clients.run(client -> client.getTable(database, viewName));
+      HiveViewOperations.validateTableIsIcebergView(table, fullTableName(name, identifier));
+      clients.run(
+          client -> {
+            client.dropTable(database, viewName);
+            return null;
+          });
+      LOG.info("Dropped view: {}", identifier);
+      return true;
+
+    } catch (NoSuchViewException | NoSuchObjectException e) {
+      LOG.info("Skipping drop, view does not exist: {}", identifier, e);
+      return false;
+    } catch (TException e) {
+      throw new RuntimeException("Failed to drop " + identifier, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted in call to dropView", e);
+    }
+  }
+
+  @Override
+  public List<TableIdentifier> listViews(Namespace namespace) {
+    Preconditions.checkArgument(
+        isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);
+
+    try {
+      return listTablesByType(
+          namespace, TableType.VIRTUAL_VIEW, BaseMetastoreTableOperations.ICEBERG_VIEW_TYPE_VALUE);
+    } catch (UnknownDBException e) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+
+    } catch (TException e) {
+      throw new RuntimeException("Failed to list all views under namespace " + namespace, e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted in call to listViews", e);
+    }
+  }
+
+  private List<TableIdentifier> listAllTables(Namespace namespace)
+      throws TException, InterruptedException {
+    String database = namespace.level(0);
+    List<String> tableNames = clients.run(client -> client.getAllTables(database));
+    return tableNames.stream()
+        .map(t -> TableIdentifier.of(namespace, t))
+        .collect(Collectors.toList());
+  }
+
+  private List<TableIdentifier> listTablesByType(
+      Namespace namespace, TableType tableType, String tableTypeProp)
+      throws TException, InterruptedException {
+    String database = namespace.level(0);
+    List<String> tableNames = clients.run(client -> client.getTables(database, "*", tableType));
+
+    // Retrieving the Table objects from HMS in batches to avoid OOM
+    List<TableIdentifier> filteredTableIdentifiers = Lists.newArrayList();
+    Iterable<List<String>> tableNameSets = Iterables.partition(tableNames, 100);
+
+    for (List<String> tableNameSet : tableNameSets) {
+      filteredTableIdentifiers.addAll(filterIcebergTables(tableNameSet, namespace, tableTypeProp));
+    }
+
+    return filteredTableIdentifiers;
+  }
+
+  private List<TableIdentifier> filterIcebergTables(
+      List<String> tableNames, Namespace namespace, String tableTypeProp)
+      throws TException, InterruptedException {
+    List<Table> tableObjects =
+        clients.run(client -> client.getTableObjectsByName(namespace.level(0), tableNames));
+    return tableObjects.stream()
+        .filter(
+            table ->
+                table.getParameters() != null
+                    && tableTypeProp.equalsIgnoreCase(
+                        table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
+        .map(table -> TableIdentifier.of(namespace, table.getTableName()))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  @SuppressWarnings("FormatStringAnnotation")
+  public void renameView(TableIdentifier from, TableIdentifier originalTo) {
+    if (!isValidIdentifier(from)) {
+      throw new NoSuchViewException("Invalid identifier: %s", from);
+    }
+
+    if (!namespaceExists(originalTo.namespace())) {
+      throw new NoSuchNamespaceException(
+          "Cannot rename %s to %s. Namespace does not exist: %s",
+          from, originalTo, originalTo.namespace());
+    }
+
+    TableIdentifier to = removeCatalogName(originalTo);
+    Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
+
+    String toDatabase = to.namespace().level(0);
+    String fromDatabase = from.namespace().level(0);
+    String fromName = from.name();
+
+    try {
+      Table fromView = clients.run(client -> client.getTable(fromDatabase, fromName));
+      HiveViewOperations.validateTableIsIcebergView(fromView, fullTableName(name, from));
+
+      validateToTableForRename(from, to);
+
+      fromView.setDbName(toDatabase);
+      fromView.setTableName(to.name());
+
+      clients.run(
+          client -> {
+            MetastoreUtil.alterTable(client, fromDatabase, fromName, fromView);
+            return null;
+          });
+
+      LOG.info("Renamed view from {} to {}", from, to);
+
+    } catch (NoSuchObjectException | NoSuchViewException e) {
+      throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
 
     } catch (TException e) {
       throw new RuntimeException("Failed to rename " + from + " to " + to, e);
@@ -264,6 +396,26 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
     }
   }
 
+  private void validateToTableForRename(TableIdentifier from, TableIdentifier to)
+      throws TException, InterruptedException {
+    Table table = null;
+
+    String toDatabase = to.namespace().level(0);
+    try {
+      table = clients.run(client -> client.getTable(toDatabase, to.name()));
+    } catch (NoSuchObjectException nte) {
+      LOG.trace("Table not found {}.{}", toDatabase, to.name(), nte);
+    }
+
+    if (table != null) {
+      throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+          "Cannot rename %s to %s. %s already exists",
+          from,
+          to,
+          table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? "View" : "Table");
+    }
+  }
+
   @Override
   public void createNamespace(Namespace namespace, Map<String, String> meta) {
     Preconditions.checkArgument(
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
index ea24fe4e1133..b39b129fca71 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
@@ -20,6 +20,9 @@
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -27,8 +30,8 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
 import org.apache.iceberg.io.FileIO;
@@ -76,10 +79,10 @@ default boolean exposeInHmsProperties() {
     return maxHiveTablePropertySize() > 0;
   }
 
-  default void setSchema(TableMetadata metadata, Map<String, String> parameters) {
+  default void setSchema(Schema tableSchema, Map<String, String> parameters) {
     parameters.remove(TableProperties.CURRENT_SCHEMA);
-    if (exposeInHmsProperties() && metadata.schema() != null) {
-      String schema = SchemaParser.toJson(metadata.schema());
+    if (exposeInHmsProperties() && tableSchema != null) {
+      String schema = SchemaParser.toJson(tableSchema);
       setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
     }
   }
@@ -93,12 +96,14 @@ default void setField(Map<String, String> parameters, String key, String value)
     }
   }
 
-  static void validateTableIsIceberg(Table table, String fullName) {
+  static void validateTableOrViewIsIceberg(Table table, String fullName) {
     String tableType = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
     NoSuchIcebergTableException.check(
         tableType != null
-            && tableType.equalsIgnoreCase(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE),
-        "Not an iceberg table: %s (type=%s)",
+            && (tableType.equalsIgnoreCase(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE)
+                || tableType.equalsIgnoreCase(
+                    BaseMetastoreTableOperations.ICEBERG_VIEW_TYPE_VALUE)),
+        "Not an iceberg table/view: %s (type=%s)",
         fullName,
         tableType);
   }
@@ -123,13 +128,14 @@ default void persistTable(Table hmsTable, boolean updateHiveTable, String metada
     }
   }
 
-  static StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
-
+  static StorageDescriptor storageDescriptor(
+      Schema schema, String location, boolean hiveEngineEnabled) {
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
-    storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema()));
-    storageDescriptor.setLocation(metadata.location());
+    storageDescriptor.setCols(HiveSchemaUtil.convert(schema));
+    storageDescriptor.setLocation(location);
     SerDeInfo serDeInfo = new SerDeInfo();
     serDeInfo.setParameters(Maps.newHashMap());
+
     if (hiveEngineEnabled) {
       storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat");
       storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
@@ -139,6 +145,7 @@ static StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveE
       storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
       serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
     }
+
     storageDescriptor.setSerdeInfo(serDeInfo);
     return storageDescriptor;
   }
@@ -181,4 +188,32 @@ default Table newHmsTable(String hmsTableOwner) {
 
     return newTable;
   }
+
+  default void setHmsParameters(
+      Table tbl,
+      String tableTypeProp,
+      String newMetadataLocation,
+      Schema schema,
+      String uuid,
+      Set<String> obsoleteProps,
+      Supplier<String> previousLocationSupplier) {
+    Map<String, String> parameters =
+        Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);
+
+    if (!obsoleteProps.contains(TableProperties.UUID) && uuid != null) {
+      parameters.put(TableProperties.UUID, uuid);
+    }
+
+    parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, newMetadataLocation);
+    parameters.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, tableTypeProp);
+
+    if (previousLocationSupplier.get() != null && !previousLocationSupplier.get().isEmpty()) {
+      parameters.put(
+          BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP,
+          previousLocationSupplier.get());
+    }
+
+    setSchema(schema, parameters);
+    tbl.setParameters(parameters);
+  }
 }
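Reviewer note: setHmsParameters is now the single writer for the identifying HMS parameters shared by tables and views. My reading of the shape it produces for a view commit, with purely illustrative values:

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class HmsViewParametersSketch {
      // Illustrative only: table_type drives discovery and validation, while the
      // two location entries drive refresh and commit-conflict detection.
      static Map<String, String> expectedViewParameters() {
        return ImmutableMap.of(
            "table_type", "ICEBERG-VIEW",
            "metadata_location", "s3://bucket/db/v/metadata/00001-a1b2.metadata.json",
            "previous_metadata_location", "s3://bucket/db/v/metadata/00000-c3d4.metadata.json",
            "uuid", "3f6f8cb1-9c4e-4d2a-b6d5-0e1f2a3b4c5d");
      }
    }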
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index a3750b9f3101..67b8157d1057 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -57,6 +57,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.MetastoreOperationsUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -142,10 +143,11 @@ public FileIO io() {
 
   @Override
   protected void doRefresh() {
     String metadataLocation = null;
-    try {
-      Table table = metaClients.run(client -> client.getTable(database, tableName));
-      HiveOperationsBase.validateTableIsIceberg(table, fullName);
+    Table table = null;
+    try {
+      table = metaClients.run(client -> client.getTable(database, tableName));
+      HiveOperationsBase.validateTableOrViewIsIceberg(table, fullName);
       metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
 
     } catch (NoSuchObjectException e) {
@@ -163,10 +165,14 @@ protected void doRefresh() {
       throw new RuntimeException("Interrupted during refresh", e);
     }
 
-    refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
+    if (table != null && table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) {
+      disableRefresh();
+    } else {
+      refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
+    }
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  @SuppressWarnings({"checkstyle:CyclomaticComplexity", "checkstyle:methodlength"})
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
     boolean newTable = base == null;
@@ -184,6 +190,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       Table tbl = loadHmsTable();
 
       if (tbl != null) {
+        if (tbl.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) {
+          throw new AlreadyExistsException(
+              "View with same name already exists: %s.%s", tbl.getDbName(), tbl.getTableName());
+        }
+
         // If we try to create the table but the metadata location is already set, then we had a
         // concurrent commit
         if (newTable
@@ -203,7 +214,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
 
       tbl.setSd(
           HiveOperationsBase.storageDescriptor(
-              metadata, hiveEngineEnabled)); // set to pickup any schema changes
+              metadata.schema(),
+              metadata.location(),
+              hiveEngineEnabled)); // set to pickup any schema changes
 
       String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
       String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
@@ -250,14 +263,15 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
               + "iceberg.hive.lock-heartbeat-interval-ms.",
           le);
     } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
-
+      throw new AlreadyExistsException(
+          "%s already exists: %s.%s",
+          tbl.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? "View" : "Table",
+          tbl.getDbName(),
+          tbl.getTableName());
     } catch (InvalidObjectException e) {
       throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName);
-
     } catch (CommitFailedException | CommitStateUnknownException e) {
       throw e;
-
     } catch (Throwable e) {
       if (e.getMessage()
           .contains(
@@ -282,7 +296,12 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
           database,
           tableName,
           e);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      commitStatus =
+          MetastoreOperationsUtil.checkCommitStatus(
+              tableName(),
+              newMetadataLocation,
+              metadata.properties(),
+              this::calculateCommitStatusWithUpdatedLocation);
       switch (commitStatus) {
         case SUCCESS:
           break;
@@ -328,6 +347,7 @@ private void setHmsTableParameters(
       Set<String> obsoleteProps,
       boolean hiveEngineEnabled,
       Map<String, String> summary) {
+
     Map<String, String> parameters =
         Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);
 
@@ -341,19 +361,20 @@ private void setHmsTableParameters(
           String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key);
           parameters.put(hmsKey, entry.getValue());
         });
-    if (metadata.uuid() != null) {
-      parameters.put(TableProperties.UUID, metadata.uuid());
-    }
 
     // remove any props from HMS that are no longer present in Iceberg table props
     obsoleteProps.forEach(parameters::remove);
 
-    parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
-    parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
+    setHmsParameters(
+        tbl,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+        newMetadataLocation,
+        metadata.schema(),
+        metadata.uuid(),
+        obsoleteProps,
+        this::currentMetadataLocation);
 
-    if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
-      parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
-    }
+    parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
 
     // If needed set the 'storage_handler' property to enable query from Hive
     if (hiveEngineEnabled) {
@@ -376,7 +397,6 @@ private void setHmsTableParameters(
     }
 
     setSnapshotStats(metadata, parameters);
-    setSchema(metadata, parameters);
     setPartitionSpec(metadata, parameters);
     setSortOrder(metadata, parameters);
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
new file mode 100644
index 000000000000..c162fff0d10b
--- /dev/null
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.MetastoreOperationsUtil;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Hive implementation of Iceberg ViewOperations. */
+final class HiveViewOperations extends BaseViewOperations implements HiveOperationsBase {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveViewOperations.class);
+
+  private final String fullName;
+  private final String database;
+  private final String viewName;
+  private final FileIO fileIO;
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final long maxHiveTablePropertySize;
+
+  HiveViewOperations(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier viewIdentifier) {
+    String dbName = viewIdentifier.namespace().level(0);
+    this.metaClients = metaClients;
+    this.fileIO = fileIO;
+    this.fullName = BaseMetastoreCatalog.fullTableName(catalogName, viewIdentifier);
+    this.database = dbName;
+    this.viewName = viewIdentifier.name();
+    this.maxHiveTablePropertySize =
+        conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+  }
+
+  @Override
+  public void doRefresh() {
+    String metadataLocation = null;
+    Table table = null;
+
+    try {
+      table = metaClients.run(client -> client.getTable(database, viewName));
+      HiveOperationsBase.validateTableOrViewIsIceberg(table, fullName);
+      metadataLocation =
+          table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+    } catch (NoSuchObjectException e) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchViewException("View does not exist: %s.%s", database, viewName);
+      }
+    } catch (TException e) {
+      String errMsg =
+          String.format("Failed to get view info from metastore %s.%s", database, viewName);
+      throw new RuntimeException(errMsg, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during refresh", e);
+    }
+
+    if (table != null && !table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) {
+      disableRefresh();
+    } else {
+      refreshFromMetadataLocation(metadataLocation);
+    }
+  }
+
+  @Override
+  public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+    boolean newView = base == null;
+    String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+    boolean updateHiveView = false;
+    BaseMetastoreTableOperations.CommitStatus commitStatus =
+        BaseMetastoreTableOperations.CommitStatus.FAILURE;
+
+    try {
+      Table view = null;
+
+      try {
+        view = metaClients.run(client -> client.getTable(database, viewName));
+      } catch (NoSuchObjectException nte) {
+        LOG.trace("View not found {}.{}", database, viewName, nte);
+      }
+
+      if (view != null) {
+        if (!view.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) {
+          throw new AlreadyExistsException(
+              "Table with same name already exists: %s.%s", database, viewName);
+        }
+
+        // If we try to create the view but the metadata location is already set, then we had a
+        // concurrent commit
+        if (newView
+            && view.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
+                != null) {
+          throw new AlreadyExistsException("View already exists: %s.%s", database, viewName);
+        }
+
+        updateHiveView = true;
+        LOG.debug("Committing existing view: {}", fullName);
+      } else {
+        view =
+            newHmsTable(
+                metadata
+                    .properties()
+                    .getOrDefault(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()));
+        LOG.debug("Committing new view: {}", fullName);
+      }
+
+      view.setSd(
+          HiveOperationsBase.storageDescriptor(
+              metadata.schema(), metadata.location(), false)); // set to pickup any schema changes
+
+      String metadataLocation =
+          view.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+      String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+
+      if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
+        throw new CommitFailedException(
+            "Base metadata location '%s' is not same as the current view metadata location '%s' for %s.%s",
+            baseMetadataLocation, metadataLocation, database, viewName);
+      }
+
+      setHmsParameters(
+          view,
+          BaseMetastoreTableOperations.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+          newMetadataLocation,
+          metadata.schema(),
+          metadata.uuid(),
+          Collections.emptySet(),
+          this::currentMetadataLocation);
+
+      commitStatus =
+          persistView(
+              view,
+              updateHiveView,
+              baseMetadataLocation,
+              newMetadataLocation,
+              metadata.properties());
+
+    } catch (TException e) {
+      throw new RuntimeException(
+          String.format("Metastore operation failed for %s.%s", database, viewName), e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during commit", e);
+
+    } finally {
+      HiveOperationsBase.cleanupMetadata(fileIO, commitStatus.name(), newMetadataLocation);
+    }
+
+    LOG.info(
+        "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation);
+  }
+
+  private BaseMetastoreTableOperations.CommitStatus persistView(
+      Table view,
+      boolean updateHiveView,
+      String baseMetadataLocation,
+      String newMetadataLocation,
+      Map<String, String> properties)
+      throws TException, InterruptedException {
+    BaseMetastoreTableOperations.CommitStatus commitStatus;
+
+    try {
+      persistTable(view, updateHiveView, baseMetadataLocation);
+      commitStatus = BaseMetastoreTableOperations.CommitStatus.SUCCESS;
+    } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+      throw new AlreadyExistsException(
+          "%s already exists: %s.%s",
+          view.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? "View" : "Table",
+          view.getDbName(),
+          view.getTableName());
+    } catch (InvalidObjectException e) {
+      throw new ValidationException(e, "Invalid Hive object for %s.%s", database, viewName);
+    } catch (CommitFailedException | CommitStateUnknownException e) {
+      throw e;
+    } catch (Throwable e) {
+      if (e.getMessage()
+          .contains(
+              "The view has been modified. The parameter value for key '"
+                  + BaseMetastoreTableOperations.METADATA_LOCATION_PROP
+                  + "' is")) {
+        throw new CommitFailedException(
+            e, "The view %s.%s has been modified concurrently", database, viewName);
+      }
+
+      LOG.error(
+          "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
+          database,
+          viewName,
+          e);
+
+      commitStatus =
+          MetastoreOperationsUtil.checkCommitStatus(
+              viewName(),
+              newMetadataLocation,
+              properties,
+              this::calculateCommitStatusWithUpdatedLocation);
+
+      switch (commitStatus) {
+        case SUCCESS:
+          break;
+        case FAILURE:
+          throw e;
+        case UNKNOWN:
+          throw new CommitStateUnknownException(e);
+      }
+    }
+    return commitStatus;
+  }
+
+  private boolean calculateCommitStatusWithUpdatedLocation(String newMetadataLocation) {
+    ViewMetadata metadata = refresh();
+    String currentMetadataFileLocation = metadata.metadataFileLocation();
+    return newMetadataLocation.equals(currentMetadataFileLocation);
+  }
+
+  @Override
+  public TableType tableType() {
+    return TableType.VIRTUAL_VIEW;
+  }
+
+  @Override
+  public ClientPool<IMetaStoreClient, TException> metaClients() {
+    return metaClients;
+  }
+
+  @Override
+  public long maxHiveTablePropertySize() {
+    return maxHiveTablePropertySize;
+  }
+
+  @Override
+  public String database() {
+    return database;
+  }
+
+  @Override
+  public String table() {
+    return viewName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected String viewName() {
+    return fullName;
+  }
+
+  static void validateTableIsIcebergView(Table table, String fullName) {
+    String tableType = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+    NoSuchIcebergViewException.check(
+        table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())
+            && tableType != null
+            && tableType.equalsIgnoreCase(BaseMetastoreTableOperations.ICEBERG_VIEW_TYPE_VALUE),
+        "Not an iceberg view: %s (type=%s) (tableType=%s)",
+        fullName,
+        tableType,
+        table.getTableType());
+  }
+}
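Reviewer note: one behavioral asymmetry worth calling out. The table-side status check also scans previousFiles(), so a commit that was immediately superseded still counts as committed; the view-side check (calculateCommitStatusWithUpdatedLocation above) compares only the current metadata location, since the view operations here keep no equivalent metadata-file log. A condensed contrast, assuming plain strings for locations:

    import java.util.List;

    class CommitPredicateContrast {
      // Tables: success if the attempted location is current OR anywhere in the metadata log.
      static boolean tableCommitted(String attempted, String current, List<String> previousFiles) {
        return current.equals(attempted) || previousFiles.stream().anyMatch(attempted::equals);
      }

      // Views: success only if the attempted location is exactly the current one.
      static boolean viewCommitted(String attempted, String current) {
        return attempted.equals(current);
      }
    }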
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 904e74939eb2..95bf6c697c32 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -52,7 +52,6 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
@@ -82,7 +81,6 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.JsonUtil;
 import org.apache.thrift.TException;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -1043,7 +1041,7 @@ public void testNotExposeTableProperties() {
         .doesNotContainKey(CURRENT_SNAPSHOT_ID)
         .doesNotContainKey(CURRENT_SNAPSHOT_TIMESTAMP);
 
-    ops.setSchema(metadata, parameters);
+    ops.setSchema(metadata.schema(), parameters);
     assertThat(parameters).doesNotContainKey(CURRENT_SCHEMA);
 
     ops.setPartitionSpec(metadata, parameters);
@@ -1181,55 +1179,4 @@ public void testDatabaseLocationWithSlashInWarehouseDir() {
 
     assertThat(database.getLocationUri()).isEqualTo("s3://bucket/database.db");
   }
-
-  // TODO: This test should be removed after fix of https://github.com/apache/iceberg/issues/9289.
-  @Test
-  @Override
-  public void testRenameTableDestinationTableAlreadyExists() {
-    Namespace ns = Namespace.of("newdb");
-    TableIdentifier renamedTable = TableIdentifier.of(ns, "table_renamed");
-
-    if (requiresNamespaceCreate()) {
-      catalog.createNamespace(ns);
-    }
-
-    Assertions.assertThat(catalog.tableExists(TABLE))
-        .as("Source table should not exist before create")
-        .isFalse();
-
-    catalog.buildTable(TABLE, SCHEMA).create();
-    Assertions.assertThat(catalog.tableExists(TABLE))
-        .as("Source table should exist after create")
-        .isTrue();
-
-    Assertions.assertThat(catalog.tableExists(renamedTable))
-        .as("Destination table should not exist before create")
-        .isFalse();
-
-    catalog.buildTable(renamedTable, SCHEMA).create();
-    Assertions.assertThat(catalog.tableExists(renamedTable))
-        .as("Destination table should exist after create")
-        .isTrue();
-
-    // With fix of issues#9289,it should match with CatalogTests and expect
-    // AlreadyExistsException.class
-    // and message should contain as "Table already exists"
-    Assertions.assertThatThrownBy(() -> catalog.renameTable(TABLE, renamedTable))
-        .isInstanceOf(RuntimeException.class)
-        .hasMessageContaining("new table newdb.table_renamed already exists");
-    Assertions.assertThat(catalog.tableExists(TABLE))
-        .as("Source table should still exist after failed rename")
-        .isTrue();
-    Assertions.assertThat(catalog.tableExists(renamedTable))
-        .as("Destination table should still exist after failed rename")
-        .isTrue();
-
-    String sourceTableUUID =
-        ((HasTableOperations) catalog.loadTable(TABLE)).operations().current().uuid();
-    String destinationTableUUID =
-        ((HasTableOperations) catalog.loadTable(renamedTable)).operations().current().uuid();
-    Assertions.assertThat(sourceTableUUID)
-        .as("Source and destination table should remain distinct after failed rename")
-        .isNotEqualTo(destinationTableUUID);
-  }
 }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java
new file mode 100644
index 000000000000..2bed205a58b7
--- /dev/null
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.view.ViewCatalogTests;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestHiveViewCatalog extends ViewCatalogTests<HiveCatalog> {
+
+  private HiveCatalog catalog;
+
+  @RegisterExtension
+  private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION =
+      HiveMetastoreExtension.builder().build();
+
+  @BeforeEach
+  public void before() throws TException {
+    catalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(),
+                CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
+                ImmutableMap.of(
+                    CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+                    String.valueOf(TimeUnit.SECONDS.toMillis(10))),
+                HIVE_METASTORE_EXTENSION.hiveConf());
+  }
+
+  @AfterEach
+  public void cleanup() throws Exception {
+    HIVE_METASTORE_EXTENSION.metastore().reset();
+  }
+
+  @Override
+  protected HiveCatalog catalog() {
+    return catalog;
+  }
+
+  @Override
+  protected Catalog tableCatalog() {
+    return catalog;
+  }
+
+  @Override
+  protected boolean requiresNamespaceCreate() {
+    return true;
+  }
+}
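Reviewer note: the new test wires the catalog through CatalogUtil against the metastore extension; application code would do the same with a real thrift URI. A minimal setup sketch (URI and warehouse values are placeholders):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class HiveViewCatalogSetupSketch {
      // Loads HiveCatalog by class name; the returned Catalog also implements
      // ViewCatalog after this patch, so buildView/listViews/dropView are available.
      static Catalog load() {
        Map<String, String> props =
            ImmutableMap.of(
                CatalogProperties.URI, "thrift://localhost:9083",
                CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/warehouse");
        return CatalogUtil.loadCatalog(
            "org.apache.iceberg.hive.HiveCatalog", "hive_prod", props, new Configuration());
      }
    }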