Skip to content

Commit

Permalink
Hive: Refactor hive-table commit operation to be used for other opera…
Browse files Browse the repository at this point in the history
…tions like view.
  • Loading branch information
nk1506 committed Jan 27, 2024
1 parent 211f5d5 commit 67077a8
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -43,13 +34,14 @@
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.BaseMetastoreOperations;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseMetastoreTableOperations implements TableOperations {
public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);

public static final String TABLE_TYPE_PROP = "table_type";
Expand Down Expand Up @@ -291,7 +283,7 @@ public long newSnapshotId() {
};
}

protected enum CommitStatus {
public enum CommitStatus {
FAILURE,
SUCCESS,
UNKNOWN
Expand All @@ -309,65 +301,19 @@ protected enum CommitStatus {
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
int maxAttempts =
PropertyUtil.propertyAsInt(
config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
.run(
location -> {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
boolean commitSuccess =
currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName(),
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName(),
newMetadataLocation);
}
});

if (status.get() == CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName(),
maxAttempts);
}
return status.get();
return checkCommitStatus(
tableName(),
newMetadataLocation,
config.properties(),
this::calculateCommitStatusWithUpdatedLocation);
}

protected boolean calculateCommitStatusWithUpdatedLocation(String newMetadataLocation) {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
return currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
}

private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseMetastoreOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);

/**
* Attempt to load the table and see if any current or past metadata location matches the one we
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
* may indicate the commit has failed but are not proof that this is the case. Past locations must
* also be searched on the chance that a second committer was able to successfully commit on top
* of our commit.
*
* @param tableName full name of the table
* @param newMetadataLocation the path of the new commit file
* @param properties properties for retry
* @param commitStatusSupplier calculate commit status with updated location
* @return Commit Status of Success, Failure or Unknown
*/
public BaseMetastoreTableOperations.CommitStatus checkCommitStatus(
String tableName,
String newMetadataLocation,
Map<String, String> properties,
Function<String, Boolean> commitStatusSupplier) {
int maxAttempts =
PropertyUtil.propertyAsInt(
properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
properties,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<BaseMetastoreTableOperations.CommitStatus> status =
new AtomicReference<>(BaseMetastoreTableOperations.CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName, checkException))
.run(
location -> {
boolean commitSuccess = commitStatusSupplier.apply(newMetadataLocation);

if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName,
newMetadataLocation);
status.set(BaseMetastoreTableOperations.CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName,
newMetadataLocation);
}
});

if (status.get() == BaseMetastoreTableOperations.CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName,
maxAttempts);
}
return status.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.util.BaseMetastoreOperations;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseViewOperations implements ViewOperations {
public abstract class BaseViewOperations extends BaseMetastoreOperations implements ViewOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseViewOperations.class);

private static final String METADATA_FOLDER_NAME = "metadata";
Expand Down
Loading

0 comments on commit 67077a8

Please sign in to comment.