Skip to content

Commit

Permalink
Hive: Add View support for HIVE catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Jan 10, 2024
1 parent b7e3e21 commit 586d78b
Show file tree
Hide file tree
Showing 10 changed files with 813 additions and 185 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.exceptions;

import com.google.errorprone.annotations.FormatMethod;

/** NoSuchIcebergViewException thrown when a view is found, but it is not an Iceberg view. */
public class NoSuchIcebergViewException extends NoSuchViewException {
@FormatMethod
public NoSuchIcebergViewException(String message, Object... args) {
super(message, args);
}

@FormatMethod
public static void check(boolean test, String message, Object... args) {
if (!test) {
throw new NoSuchIcebergViewException(message, args);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ private Map<String, String> tableOverrideProperties() {
}
}

protected static String fullTableName(String catalogName, TableIdentifier identifier) {
public static String fullTableName(String catalogName, TableIdentifier identifier) {
StringBuilder sb = new StringBuilder();

if (catalogName.contains("/") || catalogName.contains(":")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -44,7 +35,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.MetastoreOperationsUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +45,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {

public static final String TABLE_TYPE_PROP = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
public static final String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view";
public static final String METADATA_LOCATION_PROP = "metadata_location";
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";

Expand Down Expand Up @@ -291,7 +283,7 @@ public long newSnapshotId() {
};
}

protected enum CommitStatus {
public enum CommitStatus {
FAILURE,
SUCCESS,
UNKNOWN
Expand All @@ -309,65 +301,19 @@ protected enum CommitStatus {
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
int maxAttempts =
PropertyUtil.propertyAsInt(
config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
.run(
location -> {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
boolean commitSuccess =
currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName(),
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName(),
newMetadataLocation);
}
});

if (status.get() == CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName(),
maxAttempts);
}
return status.get();
return MetastoreOperationsUtil.checkCommitStatus(
tableName(),
newMetadataLocation,
config.properties(),
this::calculateCommitStatusWithUpdatedLocation);
}

protected boolean calculateCommitStatusWithUpdatedLocation(String newMetadataLocation) {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
return currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
}

private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetastoreOperationsUtil {
private static final Logger LOG = LoggerFactory.getLogger(MetastoreOperationsUtil.class);

private MetastoreOperationsUtil() {}

/**
* Attempt to load the table and see if any current or past metadata location matches the one we
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
* may indicate the commit has failed but are not proof that this is the case. Past locations must
* also be searched on the chance that a second committer was able to successfully commit on top
* of our commit.
*
* @param tableName full name of the table
* @param newMetadataLocation the path of the new commit file
* @param properties properties for retry
* @param commitStatusSupplier calculate commit status with updated location
* @return Commit Status of Success, Failure or Unknown
*/
public static BaseMetastoreTableOperations.CommitStatus checkCommitStatus(
String tableName,
String newMetadataLocation,
Map<String, String> properties,
Function<String, Boolean> commitStatusSupplier) {
int maxAttempts =
PropertyUtil.propertyAsInt(
properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
properties,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<BaseMetastoreTableOperations.CommitStatus> status =
new AtomicReference<>(BaseMetastoreTableOperations.CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName, checkException))
.run(
location -> {
boolean commitSuccess = commitStatusSupplier.apply(newMetadataLocation);

if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName,
newMetadataLocation);
status.set(BaseMetastoreTableOperations.CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName,
newMetadataLocation);
}
});

if (status.get() == BaseMetastoreTableOperations.CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName,
maxAttempts);
}
return status.get();
}
}
Loading

0 comments on commit 586d78b

Please sign in to comment.