Skip to content

Commit

Permalink
Add executeWithRetries methods to TransactionExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 committed Dec 2, 2024
1 parent 5fed6ee commit 79d6efa
Show file tree
Hide file tree
Showing 2 changed files with 307 additions and 63 deletions.
147 changes: 128 additions & 19 deletions core/src/main/java/com/scalar/db/common/TransactionExecutor.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
package com.scalar.db.common;

import com.google.common.util.concurrent.Uninterruptibles;
import com.scalar.db.api.CrudOperable;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CrudConflictException;
import com.scalar.db.exception.transaction.RollbackException;
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionUtils;
import com.scalar.db.util.ThrowableConsumer;
import com.scalar.db.util.ThrowableFunction;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TransactionExecutor {

private static final Logger logger = LoggerFactory.getLogger(TransactionExecutor.class);

private static final int DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS = 100;
private static final int DEFAULT_RETRY_MAX_INTERVAL_MILLIS = 1000;
private static final int DEFAULT_RETRY_MULTIPLIER = 2;
private static final int DEFAULT_RETRY_MAX_RETRIES = 5;

private static int retryInitialIntervalMillis = DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS;
private static int retryMaxIntervalMillis = DEFAULT_RETRY_MAX_INTERVAL_MILLIS;
private static int retryMultiplier = DEFAULT_RETRY_MULTIPLIER;
private static int retryMaxRetries = DEFAULT_RETRY_MAX_RETRIES;

private TransactionExecutor() {}

public static <T> T execute(
Expand All @@ -32,6 +47,9 @@ public static <T> T execute(
T result = throwableFunction.apply(transaction);
transaction.commit();
return result;
} catch (UnknownTransactionStatusException e) {
// We don't need to rollback the transaction for UnknownTransactionStatusException
throw e;
} catch (Exception e) {
if (transaction != null) {
rollback(transaction);
Expand All @@ -43,26 +61,14 @@ public static <T> T execute(

public static void execute(
DistributedTransactionManager transactionManager,
ThrowableConsumer<CrudOperable<?>, TransactionException> throwableFunction)
ThrowableConsumer<CrudOperable<?>, TransactionException> throwableConsumer)
throws TransactionException {
if (SingleCrudOperationTransactionUtils.isSingleCrudOperationTransactionManager(
transactionManager)) {
throwableFunction.accept(transactionManager);
return;
}

DistributedTransaction transaction = null;
try {
transaction = transactionManager.begin();
throwableFunction.accept(transaction);
transaction.commit();
} catch (Exception e) {
if (transaction != null) {
rollback(transaction);
}

throw e;
}
execute(
transactionManager,
t -> {
throwableConsumer.accept(t);
return null;
});
}

private static void rollback(DistributedTransaction transaction) {
Expand All @@ -72,4 +78,107 @@ private static void rollback(DistributedTransaction transaction) {
logger.warn("Failed to rollback a transaction", e);
}
}

public static <T> T executeWithRetries(
DistributedTransactionManager transactionManager,
ThrowableFunction<CrudOperable<?>, T, TransactionException> throwableFunction)
throws TransactionException {
return executeWithRetries(
transactionManager,
throwableFunction,
retryInitialIntervalMillis,
retryMaxIntervalMillis,
retryMultiplier,
retryMaxRetries);
}

public static <T> T executeWithRetries(
DistributedTransactionManager transactionManager,
ThrowableFunction<CrudOperable<?>, T, TransactionException> throwableFunction,
int retryInitialIntervalMillis,
int retryMaxIntervalMillis,
int retryMultiplier,
int retryMaxRetries)
throws TransactionException {
TransactionException lastException;
int interval = retryInitialIntervalMillis;
int attempt = 0;
while (true) {
try {
return execute(transactionManager, throwableFunction);
} catch (CrudConflictException | CommitConflictException e) {
// Retry the transaction for the conflict exceptions
lastException = e;
}

if (attempt++ >= retryMaxRetries) {
break;
}

logger.warn(
"The transaction failed. Retrying after {} milliseconds... The current attempt count: {}.",
interval,
attempt,
lastException);

Uninterruptibles.sleepUninterruptibly(interval, TimeUnit.MILLISECONDS);

interval *= retryMultiplier;
if (interval > retryMaxIntervalMillis) {
interval = retryMaxIntervalMillis;
}
}

logger.error("The transaction failed after {} retries.", retryMaxRetries, lastException);
throw lastException;
}

public static void executeWithRetries(
DistributedTransactionManager transactionManager,
ThrowableConsumer<CrudOperable<?>, TransactionException> throwableConsumer)
throws TransactionException {
executeWithRetries(
transactionManager,
throwableConsumer,
retryInitialIntervalMillis,
retryMaxIntervalMillis,
retryMultiplier,
retryMaxRetries);
}

public static void executeWithRetries(
DistributedTransactionManager transactionManager,
ThrowableConsumer<CrudOperable<?>, TransactionException> throwableConsumer,
int retryInitialIntervalMillis,
int retryMaxIntervalMillis,
int retryMultiplier,
int retryMaxRetries)
throws TransactionException {
executeWithRetries(
transactionManager,
t -> {
throwableConsumer.accept(t);
return null;
},
retryInitialIntervalMillis,
retryMaxIntervalMillis,
retryMultiplier,
retryMaxRetries);
}

public static void setRetryInitialIntervalMillis(int retryInitialIntervalMillis) {
TransactionExecutor.retryInitialIntervalMillis = retryInitialIntervalMillis;
}

public static void setRetryMaxIntervalMillis(int retryMaxIntervalMillis) {
TransactionExecutor.retryMaxIntervalMillis = retryMaxIntervalMillis;
}

public static void setRetryMultiplier(int retryMultiplier) {
TransactionExecutor.retryMultiplier = retryMultiplier;
}

public static void setRetryMaxRetries(int retryMaxRetries) {
TransactionExecutor.retryMaxRetries = retryMaxRetries;
}
}
Loading

0 comments on commit 79d6efa

Please sign in to comment.