From aaaa9a41ee6321d6825a2c862ce0dcbd8c5a963b Mon Sep 17 00:00:00 2001 From: nk1506 Date: Sat, 28 Oct 2023 22:27:21 +0530 Subject: [PATCH] Addressed review comments. --- .../apache/iceberg/view/ViewCatalogTests.java | 51 ++++- .../org/apache/iceberg/hive/HiveCatalog.java | 96 ++++------ .../apache/iceberg/hive/HiveCatalogUtil.java | 15 ++ .../iceberg/hive/HiveTableOperations.java | 16 +- .../iceberg/hive/HiveViewOperations.java | 175 +++++++++++------- .../iceberg/hive/HiveMetastoreSetup.java | 73 -------- .../iceberg/hive/TestHiveViewCatalog.java | 153 +-------------- 7 files changed, 219 insertions(+), 360 deletions(-) delete mode 100644 hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreSetup.java diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java index 0cbedfc62593..0aba414d8d4d 100644 --- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -52,7 +52,7 @@ public abstract class ViewCatalogTests + assertThat(throwable) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("Table already exists: ns.view"), + throwable -> + assertThat(throwable) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("View already exists: ns.view"), + throwable -> + assertThat(throwable) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("View with same name already exists: ns.view")); } @Test @@ -403,8 +414,15 @@ public void replaceTableViaTransactionThatAlreadyExistsAsView() { .buildTable(viewIdentifier, SCHEMA) .replaceTransaction() .commitTransaction()) - .isInstanceOf(NoSuchTableException.class) - .hasMessageStartingWith("Table does not exist: ns.view"); + .satisfiesAnyOf( + throwable -> + assertThat(throwable) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("View with same name already exists: ns.view"), + throwable -> + assertThat(throwable) + .isInstanceOf(NoSuchTableException.class) + .hasMessageStartingWith("Table does not exist: ns.view")); } @Test @@ -468,8 +486,15 @@ public void replaceViewThatAlreadyExistsAsTable() { .withDefaultNamespace(tableIdentifier.namespace()) .withQuery("spark", "select * from ns.tbl") .replace()) - .isInstanceOf(NoSuchViewException.class) - .hasMessageStartingWith("View does not exist: ns.table"); + .satisfiesAnyOf( + throwable -> + assertThat(throwable) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("Table with same name already exists: ns.table"), + throwable -> + assertThat(throwable) + .isInstanceOf(NoSuchViewException.class) + .hasMessageStartingWith("View does not exist: ns.table")); } @Test @@ -718,8 +743,16 @@ public void renameTableTargetAlreadyExistsAsView() { assertThat(catalog().viewExists(viewIdentifier)).as("View should exist").isTrue(); assertThatThrownBy(() -> tableCatalog().renameTable(tableIdentifier, viewIdentifier)) - .isInstanceOf(AlreadyExistsException.class) - .hasMessageContaining("Cannot rename ns.table to ns.view. View already exists"); + .satisfiesAnyOf( + throwable -> + assertThat(throwable) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("new table ns.view already exists"), + throwable -> + assertThat(throwable) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageContaining( + "Cannot rename ns.table to ns.view. 
View already exists")); } @Test diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 3d959845877f..8ed70382f728 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -21,7 +21,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -124,43 +126,13 @@ public void initialize(String inputName, Map properties) { @Override public List listTables(Namespace namespace) { - Preconditions.checkArgument( - isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); - String database = namespace.level(0); try { - List tableNames = clients.run(client -> client.getAllTables(database)); - List tableIdentifiers; - if (listAllTables) { - tableIdentifiers = - tableNames.stream() - .map(t -> TableIdentifier.of(namespace, t)) - .collect(Collectors.toList()); + return listContents(namespace, null, table -> true); } else { - List tableObjects = - clients.run(client -> client.getTableObjectsByName(database, tableNames)); - tableIdentifiers = - tableObjects.stream() - .filter( - table -> - table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.name()) - && table.getParameters() != null - && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE - .equalsIgnoreCase( - table - .getParameters() - .get(BaseMetastoreTableOperations.TABLE_TYPE_PROP))) - .map(table -> TableIdentifier.of(namespace, table.getTableName())) - .collect(Collectors.toList()); + return listContents(namespace, TableType.EXTERNAL_TABLE.name(), icebergPredicate()); } - - LOG.debug( - "Listing of namespace: {} resulted in the following tables: {}", - namespace, - tableIdentifiers); - return tableIdentifiers; - } catch (UnknownDBException e) { throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); @@ -315,33 +287,8 @@ public boolean dropView(TableIdentifier identifier) { @Override public List listViews(Namespace namespace) { - Preconditions.checkArgument( - isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); - String database = namespace.level(0); - try { - List tableNames = - clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW)); - List
tableObjects =
-          clients.run(client -> client.getTableObjectsByName(database, tableNames));
-      List<TableIdentifier> tableIdentifiers =
-          tableObjects.stream()
-              .filter(
-                  table ->
-                      table.getParameters() != null
-                          && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(
-                              table
-                                  .getParameters()
-                                  .get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
-              .map(table -> TableIdentifier.of(namespace, table.getTableName()))
-              .collect(Collectors.toList());
-
-      LOG.debug(
-          "Listing of namespace: {} resulted in the following views: {}",
-          namespace,
-          tableIdentifiers);
-      return tableIdentifiers;
-
+      return listContents(namespace, TableType.VIRTUAL_VIEW.name(), icebergPredicate());
     } catch (UnknownDBException e) {
       throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
@@ -354,6 +301,39 @@ public List<TableIdentifier> listViews(Namespace namespace) {
     }
   }
 
+  private List<TableIdentifier> listContents(
+      Namespace namespace, String tableType, Predicate<Table> tablePredicate)
+      throws TException, InterruptedException {
+    Preconditions.checkArgument(
+        isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);
+    String database = namespace.level(0);
+    List<String> tableNames =
+        StringUtils.isNotEmpty(tableType)
+            ? clients.run(client -> client.getTables(database, "*", TableType.valueOf(tableType)))
+            : clients.run(client -> client.getAllTables(database));
+    List<Table> tableObjects =
+        clients.run(client -> client.getTableObjectsByName(database, tableNames));
+    List<TableIdentifier> tableIdentifiers =
+        tableObjects.stream()
+            .filter(tablePredicate)
+            .map(table -> TableIdentifier.of(namespace, table.getTableName()))
+            .collect(Collectors.toList());
+
+    LOG.debug(
+        "Listing of namespace: {} for table type {} resulted in the following: {}",
+        namespace,
+        tableType,
+        tableIdentifiers);
+    return tableIdentifiers;
+  }
+
+  private Predicate<Table>
icebergPredicate() { + return table -> + table.getParameters() != null + && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase( + table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + } + @Override @SuppressWarnings("FormatStringAnnotation") public void renameView(TableIdentifier from, TableIdentifier originalTo) { diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalogUtil.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalogUtil.java index fb1f5b0fc7f7..5d5a7c51e784 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalogUtil.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalogUtil.java @@ -92,4 +92,19 @@ static void validateTableIsIceberg(Table table, String fullName) { tableType, table.getTableType()); } + + static void matchAndThrowExistenceTypeException(Table table) { + if (table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) { + throw new AlreadyExistsException( + "View already exists: %s.%s", table.getDbName(), table.getTableName()); + } + throw new AlreadyExistsException( + "Table already exists: %s.%s", table.getDbName(), table.getTableName()); + } + + enum CommitStatus { + FAILURE, + SUCCESS, + UNKNOWN + } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 2dc5db1081ab..7888c74d53de 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -47,7 +47,6 @@ import org.apache.iceberg.SortOrderParser; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -74,9 +73,14 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries"; - + // the max size is based on HMS backend database. For Hive versions below 2.3, the max table + // parameter size is 4000 + // characters, see https://issues.apache.org/jira/browse/HIVE-12274 + // set to 0 to not expose Iceberg metadata in HMS Table properties. 
+ private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size"; private static final String NO_LOCK_EXPECTED_KEY = "expected_parameter_key"; private static final String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value"; + private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672; private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2; private static final BiMap ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of( @@ -132,9 +136,7 @@ protected HiveTableOperations( HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT); this.maxHiveTablePropertySize = - conf.getLong( - HiveCatalogUtil.HIVE_TABLE_PROPERTY_MAX_SIZE, - HiveCatalogUtil.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); + conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); } @Override @@ -197,7 +199,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (newTable && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) { - throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); + HiveCatalogUtil.matchAndThrowExistenceTypeException(tbl); } updateHiveTable = true; @@ -254,7 +256,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { + "iceberg.hive.lock-heartbeat-interval-ms.", le); } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { - throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName); + HiveCatalogUtil.matchAndThrowExistenceTypeException(tbl); } catch (InvalidObjectException e) { throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index f085dbe7103e..2e70d96a7fc5 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -23,8 +23,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; @@ -42,11 +42,12 @@ import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.hadoop.ConfigProperties; +import org.apache.iceberg.hive.HiveCatalogUtil.CommitStatus; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; import org.apache.iceberg.view.BaseViewOperations; import org.apache.iceberg.view.ViewMetadata; import org.apache.thrift.TException; @@ -59,8 +60,7 @@ final class HiveViewOperations extends BaseViewOperations { private final String fullName; private final String database; - private final String tableName; - private final Configuration conf; + private final String viewName; private final FileIO fileIO; 
private final ClientPool metaClients; private final long maxHiveTablePropertySize; @@ -71,15 +71,14 @@ final class HiveViewOperations extends BaseViewOperations { ClientPool metaClients, FileIO fileIO, String catalogName, - TableIdentifier tableIdentifier) { - this.identifier = tableIdentifier; - String dbName = tableIdentifier.namespace().level(0); - this.conf = conf; + TableIdentifier viewIdentifier) { + this.identifier = viewIdentifier; + String dbName = viewIdentifier.namespace().level(0); this.metaClients = metaClients; this.fileIO = fileIO; - this.fullName = catalogName + "." + dbName + "." + tableIdentifier.name(); + this.fullName = catalogName + "." + dbName + "." + viewIdentifier.name(); this.database = dbName; - this.tableName = tableIdentifier.name(); + this.viewName = viewIdentifier.name(); this.maxHiveTablePropertySize = conf.getLong( HiveCatalogUtil.HIVE_TABLE_PROPERTY_MAX_SIZE, @@ -90,7 +89,7 @@ final class HiveViewOperations extends BaseViewOperations { public ViewMetadata current() { if (HiveCatalogUtil.isTableWithTypeExists(metaClients, identifier, TableType.EXTERNAL_TABLE)) { throw new AlreadyExistsException( - "Table with same name already exists: %s.%s", database, tableName); + "Table with same name already exists: %s.%s", database, viewName); } return super.current(); } @@ -99,17 +98,17 @@ public ViewMetadata current() { public void doRefresh() { String metadataLocation = null; try { - Table table = metaClients.run(client -> client.getTable(database, tableName)); + Table table = metaClients.run(client -> client.getTable(database, viewName)); HiveCatalogUtil.validateTableIsIcebergView(table, fullName); metadataLocation = table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); } catch (NoSuchObjectException e) { if (currentMetadataLocation() != null) { - throw new NoSuchViewException("View does not exist: %s.%s", database, tableName); + throw new NoSuchViewException("View does not exist: %s.%s", database, viewName); } } catch (TException e) { String errMsg = - String.format("Failed to get view info from metastore %s.%s", database, tableName); + String.format("Failed to get view info from metastore %s.%s", database, viewName); throw new RuntimeException(errMsg, e); } catch (InterruptedException e) { @@ -122,64 +121,55 @@ public void doRefresh() { @SuppressWarnings("checkstyle:CyclomaticComplexity") @Override public void doCommit(ViewMetadata base, ViewMetadata metadata) { - boolean newTable = base == null; + boolean newView = base == null; String newMetadataLocation = writeNewMetadataIfRequired(metadata); - boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); - - boolean updateHiveTable = false; + boolean updateHiveView = false; + CommitStatus commitStatus = CommitStatus.FAILURE; try { - Table tbl = loadHmsTable(); + Optional
hmsTable = Optional.ofNullable(loadHmsTable()); + Table view; - if (tbl != null) { + if (hmsTable.isPresent()) { + view = hmsTable.get(); // If we try to create the view but the metadata location is already set, then we had a // concurrent commit - if (newTable - && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + if (newView + && view.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) { - throw new AlreadyExistsException("View already exists: %s.%s", database, tableName); + HiveCatalogUtil.matchAndThrowExistenceTypeException(view); } - updateHiveTable = true; + updateHiveView = true; LOG.debug("Committing existing view: {}", fullName); } else { - tbl = newHmsTable(metadata); + view = newHmsView(metadata); LOG.debug("Committing new view: {}", fullName); } - tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes + view.setSd(storageDescriptor(metadata)); // set to pickup any schema changes String metadataLocation = - tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + view.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; if (!Objects.equals(baseMetadataLocation, metadataLocation)) { throw new CommitFailedException( - "Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", - baseMetadataLocation, metadataLocation, database, tableName); + "Base metadata location '%s' is not same as the current view metadata location '%s' for %s.%s", + baseMetadataLocation, metadataLocation, database, viewName); } - setHmsTableParameters(newMetadataLocation, tbl, metadata); - - if (!keepHiveStats) { - tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); - } + setHmsTableParameters(newMetadataLocation, view, metadata); try { - persistTable(tbl, updateHiveTable, baseMetadataLocation); - - } catch (LockException le) { - throw new CommitStateUnknownException( - "Failed to heartbeat for hive lock while " - + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. " - + "Please check the commit history. If you are running into this issue, try reducing " - + "iceberg.hive.lock-heartbeat-interval-ms.", - le); + persistView(view, updateHiveView, baseMetadataLocation); + commitStatus = CommitStatus.SUCCESS; + } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { - throw new AlreadyExistsException(e, "View already exists: %s.%s", database, tableName); + HiveCatalogUtil.matchAndThrowExistenceTypeException(view); } catch (InvalidObjectException e) { - throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName); + throw new ValidationException(e, "Invalid Hive object for %s.%s", database, viewName); } catch (CommitFailedException | CommitStateUnknownException e) { throw e; @@ -187,31 +177,31 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { } catch (Throwable e) { if (e.getMessage() .contains( - "The table has been modified. The parameter value for key '" + "The view has been modified. 
The parameter value for key '" + BaseMetastoreTableOperations.METADATA_LOCATION_PROP + "' is")) { throw new CommitFailedException( - e, "The table %s.%s has been modified concurrently", database, tableName); - } - - if (e.getMessage() != null - && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { - throw new RuntimeException( - "Failed to acquire locks from metastore because the underlying metastore " - + "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " - + "support transactions. To fix this use an alternative metastore.", - e); + e, "The view %s.%s has been modified concurrently", database, viewName); } LOG.error( "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", database, - tableName, + viewName, e); + commitStatus = checkCommitStatus(newMetadataLocation); + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw e; + case UNKNOWN: + throw new CommitStateUnknownException(e); + } } } catch (TException e) { throw new RuntimeException( - String.format("Metastore operation failed for %s.%s", database, tableName), e); + String.format("Metastore operation failed for %s.%s", database, viewName), e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -219,13 +209,66 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { } catch (LockException e) { throw new CommitFailedException(e); + } finally { + cleanupMetadata(commitStatus, newMetadataLocation); } LOG.info( "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation); } - void persistTable(Table hmsTable, boolean updateHiveTable, String expectedMetadataLocation) + private void cleanupMetadata(CommitStatus commitStatus, String metadataLocation) { + try { + if (commitStatus == CommitStatus.FAILURE) { + // If we are sure the commit failed, clean up the uncommitted metadata file + io().deleteFile(metadataLocation); + } + } catch (RuntimeException e) { + LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e); + } + } + + private CommitStatus checkCommitStatus(String newMetadataLocation) { + AtomicReference status = new AtomicReference<>(CommitStatus.UNKNOWN); + + Tasks.foreach(newMetadataLocation) + .retry(5) + .suppressFailureWhenFinished() + .exponentialBackoff(100, 1000, 10000, 2.0) + .onFailure( + (location, checkException) -> + LOG.error("Cannot check if commit to {} exists.", viewName(), checkException)) + .run( + location -> { + ViewMetadata metadata = refresh(); + String currentMetadataFileLocation = metadata.metadataFileLocation(); + boolean commitSuccess = newMetadataLocation.equals(currentMetadataFileLocation); + if (commitSuccess) { + LOG.info( + "Commit status check: Commit to {} of {} succeeded", + viewName(), + newMetadataLocation); + status.set(CommitStatus.SUCCESS); + } else { + LOG.warn( + "Commit status check: Commit to {} of {} unknown, new metadata location is not current " + + "or in history", + viewName(), + newMetadataLocation); + } + }); + + if (status.get() == CommitStatus.UNKNOWN) { + LOG.error( + "Cannot determine commit state to {}. Failed during checking {} times. 
" + + "Treating commit state as unknown.", + viewName(), + 5); + } + return status.get(); + } + + void persistView(Table hmsTable, boolean updateHiveTable, String expectedMetadataLocation) throws TException, InterruptedException { if (updateHiveTable) { metaClients.run( @@ -233,7 +276,7 @@ void persistTable(Table hmsTable, boolean updateHiveTable, String expectedMetada MetastoreUtil.alterTable( client, database, - tableName, + viewName, hmsTable, expectedMetadataLocation != null ? ImmutableMap.of( @@ -253,9 +296,9 @@ void persistTable(Table hmsTable, boolean updateHiveTable, String expectedMetada Table loadHmsTable() throws TException, InterruptedException { try { - return metaClients.run(client -> client.getTable(database, tableName)); + return metaClients.run(client -> client.getTable(database, viewName)); } catch (NoSuchObjectException nte) { - LOG.trace("Table not found {}", fullName, nte); + LOG.trace("View not found {}", fullName, nte); return null; } } @@ -273,13 +316,13 @@ private StorageDescriptor storageDescriptor(ViewMetadata metadata) { return storageDescriptor; } - private Table newHmsTable(ViewMetadata metadata) { + private Table newHmsView(ViewMetadata metadata) { Preconditions.checkNotNull(metadata, "'metadata' parameter can't be null"); final long currentTimeMillis = System.currentTimeMillis(); - Table newTable = + Table newView = new Table( - tableName, + viewName, database, HiveHadoopUtil.currentUser(), (int) currentTimeMillis / 1000, @@ -292,7 +335,7 @@ private Table newHmsTable(ViewMetadata metadata) { null, TableType.VIRTUAL_VIEW.toString()); - return newTable; + return newView; } private void setHmsTableParameters(String newMetadataLocation, Table tbl, ViewMetadata metadata) { diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreSetup.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreSetup.java deleted file mode 100644 index 3d806a0a5925..000000000000 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreSetup.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.hive; - -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; - -/** - * Setup HiveMetastore. It does not create any database. All the tests should create a database - * accordingly. 
It should replace the existing setUp class {@link HiveMetastoreTest} - */ -class HiveMetastoreSetup { - - protected HiveMetaStoreClient metastoreClient; - protected TestHiveMetastore metastore; - protected HiveConf hiveConf; - HiveCatalog catalog; - - HiveMetastoreSetup(Map hiveConfOverride) throws Exception { - metastore = new TestHiveMetastore(); - HiveConf hiveConfWithOverrides = new HiveConf(TestHiveMetastore.class); - if (hiveConfOverride != null) { - for (Map.Entry kv : hiveConfOverride.entrySet()) { - hiveConfWithOverrides.set(kv.getKey(), kv.getValue()); - } - } - - metastore.start(hiveConfWithOverrides); - hiveConf = metastore.hiveConf(); - metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides); - catalog = - (HiveCatalog) - CatalogUtil.loadCatalog( - HiveCatalog.class.getName(), - CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, - ImmutableMap.of( - CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, - String.valueOf(TimeUnit.SECONDS.toMillis(10))), - hiveConfWithOverrides); - } - - void stopMetastore() throws Exception { - try { - metastoreClient.close(); - metastore.stop(); - } finally { - catalog = null; - metastoreClient = null; - metastore = null; - } - } -} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java index 82af3d01019b..2e80fa817782 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -18,180 +18,39 @@ */ package org.apache.iceberg.hive; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - import java.util.Collections; -import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.view.ViewCatalogTests; -import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; public class TestHiveViewCatalog extends ViewCatalogTests { - private HiveMetastoreSetup hiveMetastoreSetup; + private HiveCatalog catalog; @BeforeEach public void before() throws Exception { - hiveMetastoreSetup = new HiveMetastoreSetup(Collections.emptyMap()); + HiveMetastoreTest.startMetastore(Collections.emptyMap()); + this.catalog = HiveMetastoreTest.catalog; } @AfterEach public void after() throws Exception { - hiveMetastoreSetup.stopMetastore(); + HiveMetastoreTest.stopMetastore(); } @Override protected HiveCatalog catalog() { - return hiveMetastoreSetup.catalog; + return catalog; } @Override protected Catalog tableCatalog() { - return hiveMetastoreSetup.catalog; + return catalog; } @Override protected boolean requiresNamespaceCreate() { return true; } - - // Override few tests which are using AlreadyExistsException instead of NoSuchViewException - - @Override - @Test - public void replaceTableViaTransactionThatAlreadyExistsAsView() { - Assumptions.assumeThat(catalog()).as("Only valid for catalogs that support tables").isNotNull(); - - TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view"); - - if (requiresNamespaceCreate()) { - catalog().createNamespace(viewIdentifier.namespace()); - } - - assertThat(catalog().viewExists(viewIdentifier)).as("View should not exist").isFalse(); - - catalog() - 
.buildView(viewIdentifier) - .withSchema(SCHEMA) - .withDefaultNamespace(viewIdentifier.namespace()) - .withQuery("spark", "select * from ns.tbl") - .create(); - - assertThat(catalog().viewExists(viewIdentifier)).as("View should exist").isTrue(); - - assertThatThrownBy( - () -> - catalog() - .buildTable(viewIdentifier, SCHEMA) - .replaceTransaction() - .commitTransaction()) - .isInstanceOf(AlreadyExistsException.class) - .hasMessageStartingWith("View with same name already exists: ns.view"); - - assertThat(catalog().dropView(viewIdentifier)).isTrue(); - assertThat(catalog().viewExists(viewIdentifier)).as("View should not exist").isFalse(); - } - - @Override - @Test - public void replaceViewThatAlreadyExistsAsTable() { - Assumptions.assumeThat(tableCatalog()) - .as("Only valid for catalogs that support tables") - .isNotNull(); - - TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table"); - - if (requiresNamespaceCreate()) { - catalog().createNamespace(tableIdentifier.namespace()); - } - - assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should not exist").isFalse(); - - tableCatalog().buildTable(tableIdentifier, SCHEMA).create(); - - assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should exist").isTrue(); - - assertThatThrownBy( - () -> - catalog() - .buildView(tableIdentifier) - .withSchema(OTHER_SCHEMA) - .withDefaultNamespace(tableIdentifier.namespace()) - .withQuery("spark", "select * from ns.tbl") - .replace()) - .isInstanceOf(AlreadyExistsException.class) - .hasMessageStartingWith("Table with same name already exists: ns.table"); - } - - @Override - @Test - public void renameTableTargetAlreadyExistsAsView() { - Assumptions.assumeThat(tableCatalog()) - .as("Only valid for catalogs that support tables") - .isNotNull(); - - TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view"); - TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table"); - - if (requiresNamespaceCreate()) { - catalog().createNamespace(tableIdentifier.namespace()); - } - - assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should not exist").isFalse(); - - tableCatalog().buildTable(tableIdentifier, SCHEMA).create(); - - assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should exist").isTrue(); - - assertThat(catalog().viewExists(viewIdentifier)).as("View should not exist").isFalse(); - - catalog() - .buildView(viewIdentifier) - .withSchema(SCHEMA) - .withDefaultNamespace(viewIdentifier.namespace()) - .withQuery("spark", "select * from ns.tbl") - .create(); - - assertThat(catalog().viewExists(viewIdentifier)).as("View should exist").isTrue(); - - assertThatThrownBy(() -> tableCatalog().renameTable(tableIdentifier, viewIdentifier)) - .hasMessageContaining("new table ns.view already exists"); - } - - @Override - @Test - public void createTableViaTransactionThatAlreadyExistsAsView() { - Assumptions.assumeThat(tableCatalog()) - .as("Only valid for catalogs that support tables") - .isNotNull(); - - TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view"); - - if (requiresNamespaceCreate()) { - catalog().createNamespace(viewIdentifier.namespace()); - } - - assertThat(catalog().viewExists(viewIdentifier)).as("View should not exist").isFalse(); - - Transaction transaction = tableCatalog().buildTable(viewIdentifier, SCHEMA).createTransaction(); - - catalog() - .buildView(viewIdentifier) - .withSchema(SCHEMA) - .withDefaultNamespace(viewIdentifier.namespace()) - .withQuery("spark", "select * from ns.tbl") - .create(); - - 
assertThat(catalog().viewExists(viewIdentifier)).as("View should exist").isTrue(); - - assertThatThrownBy(transaction::commitTransaction) - .isInstanceOf(AlreadyExistsException.class) - .hasMessageStartingWith("Table already exists: ns.view"); - } }
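Illustrative note (not part of the patch): after this change, listTables and listViews share the single listContents scan of the Hive Metastore, differing only in the HMS TableType they request and the Iceberg table_type predicate they apply. Below is a minimal caller-side sketch of the two paths; it assumes an already-initialized HiveCatalog, and the class, method, and database names ("ListingSketch", "listBoth", "db") are hypothetical.

import java.util.List;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

class ListingSketch {
  // "catalog" is assumed to have been initialized elsewhere against a running metastore.
  static void listBoth(HiveCatalog catalog) {
    Namespace ns = Namespace.of("db"); // hypothetical database name

    // Routed through listContents(namespace, EXTERNAL_TABLE or null, predicate): with
    // list-all-tables enabled every HMS table comes back; otherwise only EXTERNAL_TABLE
    // entries whose table_type parameter is ICEBERG.
    List<TableIdentifier> tables = catalog.listTables(ns);

    // Routed through listContents(namespace, VIRTUAL_VIEW, icebergPredicate()):
    // only HMS virtual views carrying table_type=ICEBERG are returned.
    List<TableIdentifier> views = catalog.listViews(ns);

    System.out.printf("tables=%s views=%s%n", tables, views);
  }
}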