Skip to content

Commit

Permalink
Addressed review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Nov 2, 2023
1 parent 3508dcb commit aaaa9a4
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 360 deletions.
51 changes: 42 additions & 9 deletions core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public abstract class ViewCatalogTests<C extends ViewCatalog & SupportsNamespace
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "data", Types.StringType.get()));

protected static final Schema OTHER_SCHEMA =
private static final Schema OTHER_SCHEMA =
new Schema(7, required(1, "some_id", Types.IntegerType.get()));

protected abstract C catalog();
Expand Down Expand Up @@ -368,8 +368,19 @@ public void createTableViaTransactionThatAlreadyExistsAsView() {
assertThat(catalog().viewExists(viewIdentifier)).as("View should exist").isTrue();

assertThatThrownBy(transaction::commitTransaction)
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("View with same name already exists: ns.view");
.satisfiesAnyOf(
throwable ->
assertThat(throwable)
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("Table already exists: ns.view"),
throwable ->
assertThat(throwable)
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("View already exists: ns.view"),
throwable ->
assertThat(throwable)
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("View with same name already exists: ns.view"));
}

@Test
Expand Down Expand Up @@ -403,8 +414,15 @@ public void replaceTableViaTransactionThatAlreadyExistsAsView() {
.buildTable(viewIdentifier, SCHEMA)
.replaceTransaction()
.commitTransaction())
.isInstanceOf(NoSuchTableException.class)
.hasMessageStartingWith("Table does not exist: ns.view");
.satisfiesAnyOf(
throwable ->
assertThat(throwable)
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("View with same name already exists: ns.view"),
throwable ->
assertThat(throwable)
.isInstanceOf(NoSuchTableException.class)
.hasMessageStartingWith("Table does not exist: ns.view"));
}

@Test
Expand Down Expand Up @@ -468,8 +486,15 @@ public void replaceViewThatAlreadyExistsAsTable() {
.withDefaultNamespace(tableIdentifier.namespace())
.withQuery("spark", "select * from ns.tbl")
.replace())
.isInstanceOf(NoSuchViewException.class)
.hasMessageStartingWith("View does not exist: ns.table");
.satisfiesAnyOf(
throwable ->
assertThat(throwable)
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("Table with same name already exists: ns.table"),
throwable ->
assertThat(throwable)
.isInstanceOf(NoSuchViewException.class)
.hasMessageStartingWith("View does not exist: ns.table"));
}

@Test
Expand Down Expand Up @@ -718,8 +743,16 @@ public void renameTableTargetAlreadyExistsAsView() {
assertThat(catalog().viewExists(viewIdentifier)).as("View should exist").isTrue();

assertThatThrownBy(() -> tableCatalog().renameTable(tableIdentifier, viewIdentifier))
.isInstanceOf(AlreadyExistsException.class)
.hasMessageContaining("Cannot rename ns.table to ns.view. View already exists");
.satisfiesAnyOf(
throwable ->
assertThat(throwable)
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("new table ns.view already exists"),
throwable ->
assertThat(throwable)
.isInstanceOf(AlreadyExistsException.class)
.hasMessageContaining(
"Cannot rename ns.table to ns.view. View already exists"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -124,43 +126,13 @@ public void initialize(String inputName, Map<String, String> properties) {

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
Preconditions.checkArgument(
isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);
String database = namespace.level(0);

try {
List<String> tableNames = clients.run(client -> client.getAllTables(database));
List<TableIdentifier> tableIdentifiers;

if (listAllTables) {
tableIdentifiers =
tableNames.stream()
.map(t -> TableIdentifier.of(namespace, t))
.collect(Collectors.toList());
return listContents(namespace, null, table -> true);
} else {
List<Table> tableObjects =
clients.run(client -> client.getTableObjectsByName(database, tableNames));
tableIdentifiers =
tableObjects.stream()
.filter(
table ->
table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.name())
&& table.getParameters() != null
&& BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
.equalsIgnoreCase(
table
.getParameters()
.get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
.map(table -> TableIdentifier.of(namespace, table.getTableName()))
.collect(Collectors.toList());
return listContents(namespace, TableType.EXTERNAL_TABLE.name(), icebergPredicate());
}

LOG.debug(
"Listing of namespace: {} resulted in the following tables: {}",
namespace,
tableIdentifiers);
return tableIdentifiers;

} catch (UnknownDBException e) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);

Expand Down Expand Up @@ -315,33 +287,8 @@ public boolean dropView(TableIdentifier identifier) {

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
Preconditions.checkArgument(
isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);
String database = namespace.level(0);

try {
List<String> tableNames =
clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW));
List<Table> tableObjects =
clients.run(client -> client.getTableObjectsByName(database, tableNames));
List<TableIdentifier> tableIdentifiers =
tableObjects.stream()
.filter(
table ->
table.getParameters() != null
&& BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(
table
.getParameters()
.get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
.map(table -> TableIdentifier.of(namespace, table.getTableName()))
.collect(Collectors.toList());

LOG.debug(
"Listing of namespace: {} resulted in the following views: {}",
namespace,
tableIdentifiers);
return tableIdentifiers;

return listContents(namespace, TableType.VIRTUAL_VIEW.name(), icebergPredicate());
} catch (UnknownDBException e) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);

Expand All @@ -354,6 +301,39 @@ public List<TableIdentifier> listViews(Namespace namespace) {
}
}

private List<TableIdentifier> listContents(
Namespace namespace, String tableType, Predicate<Table> tablePredicate)
throws TException, InterruptedException {
Preconditions.checkArgument(
isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);
String database = namespace.level(0);
List<String> tableNames =
StringUtils.isNotEmpty(tableType)
? clients.run(client -> client.getTables(database, "*", TableType.valueOf(tableType)))
: clients.run(client -> client.getAllTables(database));
List<Table> tableObjects =
clients.run(client -> client.getTableObjectsByName(database, tableNames));
List<TableIdentifier> tableIdentifiers =
tableObjects.stream()
.filter(tablePredicate)
.map(table -> TableIdentifier.of(namespace, table.getTableName()))
.collect(Collectors.toList());

LOG.debug(
"Listing of namespace: {} for table type {} resulted in the following: {}",
namespace,
tableType,
tableIdentifiers);
return tableIdentifiers;
}

private Predicate<Table> icebergPredicate() {
return table ->
table.getParameters() != null
&& BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
}

@Override
@SuppressWarnings("FormatStringAnnotation")
public void renameView(TableIdentifier from, TableIdentifier originalTo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,19 @@ static void validateTableIsIceberg(Table table, String fullName) {
tableType,
table.getTableType());
}

static void matchAndThrowExistenceTypeException(Table table) {
if (table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) {
throw new AlreadyExistsException(
"View already exists: %s.%s", table.getDbName(), table.getTableName());
}
throw new AlreadyExistsException(
"Table already exists: %s.%s", table.getDbName(), table.getTableName());
}

enum CommitStatus {
FAILURE,
SUCCESS,
UNKNOWN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -74,9 +73,14 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {

private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES =
"iceberg.hive.metadata-refresh-max-retries";

// the max size is based on HMS backend database. For Hive versions below 2.3, the max table
// parameter size is 4000
// characters, see https://issues.apache.org/jira/browse/HIVE-12274
// set to 0 to not expose Iceberg metadata in HMS Table properties.
private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
private static final String NO_LOCK_EXPECTED_KEY = "expected_parameter_key";
private static final String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value";
private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
ImmutableBiMap.of(
Expand Down Expand Up @@ -132,9 +136,7 @@ protected HiveTableOperations(
HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES,
HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
this.maxHiveTablePropertySize =
conf.getLong(
HiveCatalogUtil.HIVE_TABLE_PROPERTY_MAX_SIZE,
HiveCatalogUtil.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
}

@Override
Expand Down Expand Up @@ -197,7 +199,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (newTable
&& tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
!= null) {
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
HiveCatalogUtil.matchAndThrowExistenceTypeException(tbl);
}

updateHiveTable = true;
Expand Down Expand Up @@ -254,7 +256,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
+ "iceberg.hive.lock-heartbeat-interval-ms.",
le);
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
HiveCatalogUtil.matchAndThrowExistenceTypeException(tbl);

} catch (InvalidObjectException e) {
throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName);
Expand Down
Loading

0 comments on commit aaaa9a4

Please sign in to comment.