From f77eff669d58b78bef27b3de5e0b9b4b0dac473f Mon Sep 17 00:00:00 2001 From: Hiroyuki Yamada Date: Wed, 4 Dec 2024 16:23:40 +0900 Subject: [PATCH] Backport to branch(3) : Add executeWithRetries methods to TransactionExecutor (#2387) Co-authored-by: Toshihiro Suzuki --- .../scalar/db/common/TransactionExecutor.java | 126 ++++++++-- .../db/common/TransactionExecutorTest.java | 223 ++++++++++++++---- 2 files changed, 286 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/com/scalar/db/common/TransactionExecutor.java b/core/src/main/java/com/scalar/db/common/TransactionExecutor.java index af931837d2..1d687976ba 100644 --- a/core/src/main/java/com/scalar/db/common/TransactionExecutor.java +++ b/core/src/main/java/com/scalar/db/common/TransactionExecutor.java @@ -1,13 +1,18 @@ package com.scalar.db.common; +import com.google.common.util.concurrent.Uninterruptibles; import com.scalar.db.api.CrudOperable; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; import com.scalar.db.exception.transaction.RollbackException; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionUtils; import com.scalar.db.util.ThrowableConsumer; import com.scalar.db.util.ThrowableFunction; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +20,11 @@ public final class TransactionExecutor { private static final Logger logger = LoggerFactory.getLogger(TransactionExecutor.class); + private static final int DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS = 100; + private static final int DEFAULT_RETRY_MAX_INTERVAL_MILLIS = 1000; + private static final int DEFAULT_RETRY_MULTIPLIER = 2; + private static final int DEFAULT_RETRY_MAX_RETRIES = 5; + private TransactionExecutor() {} public static T execute( @@ -32,6 +42,9 @@ public static T execute( T result = throwableFunction.apply(transaction); transaction.commit(); return result; + } catch (UnknownTransactionStatusException e) { + // We don't need to rollback the transaction for UnknownTransactionStatusException + throw e; } catch (Exception e) { if (transaction != null) { rollback(transaction); @@ -43,26 +56,14 @@ public static T execute( public static void execute( DistributedTransactionManager transactionManager, - ThrowableConsumer, TransactionException> throwableFunction) + ThrowableConsumer, TransactionException> throwableConsumer) throws TransactionException { - if (SingleCrudOperationTransactionUtils.isSingleCrudOperationTransactionManager( - transactionManager)) { - throwableFunction.accept(transactionManager); - return; - } - - DistributedTransaction transaction = null; - try { - transaction = transactionManager.begin(); - throwableFunction.accept(transaction); - transaction.commit(); - } catch (Exception e) { - if (transaction != null) { - rollback(transaction); - } - - throw e; - } + execute( + transactionManager, + t -> { + throwableConsumer.accept(t); + return null; + }); } private static void rollback(DistributedTransaction transaction) { @@ -72,4 +73,91 @@ private static void rollback(DistributedTransaction transaction) { logger.warn("Failed to rollback a transaction", e); } } + + public static T executeWithRetries( + DistributedTransactionManager transactionManager, + ThrowableFunction, T, TransactionException> throwableFunction) + throws TransactionException { + return executeWithRetries( + transactionManager, + throwableFunction, + DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS, + DEFAULT_RETRY_MAX_INTERVAL_MILLIS, + DEFAULT_RETRY_MULTIPLIER, + DEFAULT_RETRY_MAX_RETRIES); + } + + public static T executeWithRetries( + DistributedTransactionManager transactionManager, + ThrowableFunction, T, TransactionException> throwableFunction, + int retryInitialIntervalMillis, + int retryMaxIntervalMillis, + int retryMultiplier, + int retryMaxRetries) + throws TransactionException { + TransactionException lastException; + int interval = retryInitialIntervalMillis; + int attempt = 0; + while (true) { + try { + return execute(transactionManager, throwableFunction); + } catch (CrudConflictException | CommitConflictException e) { + // Retry the transaction for the conflict exceptions + lastException = e; + } + + if (attempt++ >= retryMaxRetries) { + break; + } + + logger.warn( + "The transaction failed. Retrying after {} milliseconds... The current attempt count: {}.", + interval, + attempt, + lastException); + + Uninterruptibles.sleepUninterruptibly(interval, TimeUnit.MILLISECONDS); + + interval *= retryMultiplier; + if (interval > retryMaxIntervalMillis) { + interval = retryMaxIntervalMillis; + } + } + + logger.error("The transaction failed after {} retries.", retryMaxRetries, lastException); + throw lastException; + } + + public static void executeWithRetries( + DistributedTransactionManager transactionManager, + ThrowableConsumer, TransactionException> throwableConsumer) + throws TransactionException { + executeWithRetries( + transactionManager, + throwableConsumer, + DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS, + DEFAULT_RETRY_MAX_INTERVAL_MILLIS, + DEFAULT_RETRY_MULTIPLIER, + DEFAULT_RETRY_MAX_RETRIES); + } + + public static void executeWithRetries( + DistributedTransactionManager transactionManager, + ThrowableConsumer, TransactionException> throwableConsumer, + int retryInitialIntervalMillis, + int retryMaxIntervalMillis, + int retryMultiplier, + int retryMaxRetries) + throws TransactionException { + executeWithRetries( + transactionManager, + t -> { + throwableConsumer.accept(t); + return null; + }, + retryInitialIntervalMillis, + retryMaxIntervalMillis, + retryMultiplier, + retryMaxRetries); + } } diff --git a/core/src/test/java/com/scalar/db/common/TransactionExecutorTest.java b/core/src/test/java/com/scalar/db/common/TransactionExecutorTest.java index cbd77a82ff..637a1f1160 100644 --- a/core/src/test/java/com/scalar/db/common/TransactionExecutorTest.java +++ b/core/src/test/java/com/scalar/db/common/TransactionExecutorTest.java @@ -2,24 +2,35 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.scalar.db.api.CrudOperable; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CrudConflictException; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import com.scalar.db.util.ThrowableConsumer; import com.scalar.db.util.ThrowableFunction; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; public class TransactionExecutorTest { @Test - public void execute_WithSingleCrudOperationTransactionManager_ShouldExecuteFunction() - throws TransactionException { + public void + execute_ThrowableFunctionGiven_WithSingleCrudOperationTransactionManager_ShouldExecuteFunction() + throws TransactionException { // Arrange DistributedTransactionManager transactionManager = mock(SingleCrudOperationTransactionManager.class); @@ -39,23 +50,9 @@ public void execute_WithSingleCrudOperationTransactionManager_ShouldExecuteFunct } @Test - public void execute2_WithSingleCrudOperationTransactionManager_ShouldExecuteFunction() - throws TransactionException { - // Arrange - DistributedTransactionManager transactionManager = - mock(SingleCrudOperationTransactionManager.class); - - // Act Assert - TransactionExecutor.execute( - transactionManager, - t -> { - assertThat(t).isEqualTo(transactionManager); - }); - } - - @Test - public void execute_WithDistributedTransactionManager_ShouldExecuteFunction() - throws TransactionException { + public void + execute_ThrowableFunctionGiven_WithDistributedTransactionManager_ShouldExecuteFunction() + throws TransactionException { // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); @@ -79,29 +76,9 @@ public void execute_WithDistributedTransactionManager_ShouldExecuteFunction() verify(transaction).commit(); } - @Test - public void execute2_WithDistributedTransactionManager_ShouldExecuteFunction() - throws TransactionException { - // Arrange - DistributedTransaction transaction = mock(DistributedTransaction.class); - - DistributedTransactionManager transactionManager = mock(DistributedTransactionManager.class); - when(transactionManager.begin()).thenReturn(transaction); - - // Act Assert - TransactionExecutor.execute( - transactionManager, - t -> { - assertThat(t).isEqualTo(transaction); - }); - - verify(transactionManager).begin(); - verify(transaction).commit(); - } - @Test public void - execute_WithDistributedTransactionManager_TransactionExceptionThrown_ShouldThrowTransactionException() + execute_ThrowableFunctionGiven_WithDistributedTransactionManager_TransactionExceptionThrown_ShouldThrowTransactionExceptionWithRollBack() throws TransactionException { // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); @@ -128,7 +105,7 @@ public void execute2_WithDistributedTransactionManager_ShouldExecuteFunction() @Test public void - execute2_WithDistributedTransactionManager_TransactionExceptionThrown_ShouldThrowTransactionException() + execute_ThrowableFunctionGiven_WithDistributedTransactionManager_UnknownTransactionStatusExceptionThrown_ShouldThrowTransactionExceptionWithoutRollBack() throws TransactionException { // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); @@ -136,20 +113,178 @@ public void execute2_WithDistributedTransactionManager_ShouldExecuteFunction() DistributedTransactionManager transactionManager = mock(DistributedTransactionManager.class); when(transactionManager.begin()).thenReturn(transaction); - TransactionException exception = mock(TransactionException.class); + UnknownTransactionStatusException exception = mock(UnknownTransactionStatusException.class); // Act Assert assertThatThrownBy( () -> TransactionExecutor.execute( transactionManager, - (ThrowableConsumer, TransactionException>) + (ThrowableFunction, Object, TransactionException>) t -> { throw exception; })) .isEqualTo(exception); verify(transactionManager).begin(); - verify(transaction).rollback(); + verify(transaction, never()).rollback(); + } + + @SuppressWarnings("unchecked") + @Test + public void execute_ThrowableConsumerGiven_ShouldCallExecuteWithFunction() + throws TransactionException { + try (MockedStatic mocked = + mockStatic(TransactionExecutor.class, CALLS_REAL_METHODS)) { + // Arrange + DistributedTransactionManager transactionManager = mock(DistributedTransactionManager.class); + ThrowableConsumer, TransactionException> throwableConsumer = + mock(ThrowableConsumer.class); + + mocked + .when( + () -> + TransactionExecutor.execute(eq(transactionManager), any(ThrowableFunction.class))) + .thenReturn(null); + + // Act + TransactionExecutor.execute(transactionManager, throwableConsumer); + + // Assert + mocked.verify( + () -> TransactionExecutor.execute(eq(transactionManager), any(ThrowableFunction.class))); + } + } + + @SuppressWarnings("unchecked") + @Test + public void executeWithRetries_ThrowableFunctionGiven_ShouldReturnResult() + throws TransactionException { + try (MockedStatic mocked = + mockStatic(TransactionExecutor.class, CALLS_REAL_METHODS)) { + // Arrange + DistributedTransactionManager transactionManager = mock(DistributedTransactionManager.class); + ThrowableFunction, Object, TransactionException> throwableFunction = + mock(ThrowableFunction.class); + + Object expected = new Object(); + + mocked + .when(() -> TransactionExecutor.execute(transactionManager, throwableFunction)) + .thenReturn(expected); + + // Act + Object actual = TransactionExecutor.executeWithRetries(transactionManager, throwableFunction); + + // Assert + mocked.verify(() -> TransactionExecutor.execute(transactionManager, throwableFunction)); + + assertThat(actual).isEqualTo(expected); + } + } + + @SuppressWarnings("unchecked") + @Test + public void + executeWithRetries_ThrowableFunctionGiven_CrudConflictExceptionThrownTwoTimesThenReturnResult_ShouldRetryAndReturnResult() + throws TransactionException { + try (MockedStatic mocked = + mockStatic(TransactionExecutor.class, CALLS_REAL_METHODS)) { + // Arrange + DistributedTransactionManager transactionManager = mock(DistributedTransactionManager.class); + ThrowableFunction, Object, TransactionException> throwableFunction = + mock(ThrowableFunction.class); + + CrudConflictException exception = mock(CrudConflictException.class); + when(exception.getMessage()).thenReturn("message"); + + Object expected = new Object(); + + mocked + .when(() -> TransactionExecutor.execute(transactionManager, throwableFunction)) + .thenThrow(exception) + .thenThrow(exception) + .thenReturn(expected); + + // Act Assert + Object actual = TransactionExecutor.executeWithRetries(transactionManager, throwableFunction); + + mocked.verify( + () -> TransactionExecutor.execute(transactionManager, throwableFunction), times(3)); + + assertThat(actual).isEqualTo(expected); + } + } + + @SuppressWarnings("unchecked") + @Test + public void + executeWithRetries_ThrowableFunctionGiven_CommitConflictExceptionThrownTwoTimesThenReturnResult_ShouldRetryAndReturnResult() + throws TransactionException { + try (MockedStatic mocked = + mockStatic(TransactionExecutor.class, CALLS_REAL_METHODS)) { + // Arrange + DistributedTransactionManager transactionManager = mock(DistributedTransactionManager.class); + ThrowableFunction, Object, TransactionException> throwableFunction = + mock(ThrowableFunction.class); + + CrudConflictException exception = mock(CrudConflictException.class); + when(exception.getMessage()).thenReturn("message"); + + Object expected = new Object(); + + mocked + .when(() -> TransactionExecutor.execute(transactionManager, throwableFunction)) + .thenThrow(exception) + .thenThrow(exception) + .thenReturn(expected); + + // Act Assert + Object actual = TransactionExecutor.executeWithRetries(transactionManager, throwableFunction); + + mocked.verify( + () -> TransactionExecutor.execute(transactionManager, throwableFunction), times(3)); + + assertThat(actual).isEqualTo(expected); + } + } + + @SuppressWarnings("unchecked") + @Test + public void + executeWithRetries_ThrowableConsumerGiven_CommitConflictExceptionThrownTwoTimesThenReturnResult_ShouldRetryAndReturnResult() + throws TransactionException { + try (MockedStatic mocked = + mockStatic(TransactionExecutor.class, CALLS_REAL_METHODS)) { + // Arrange + DistributedTransactionManager transactionManager = mock(DistributedTransactionManager.class); + ThrowableConsumer, TransactionException> throwableConsumer = + mock(ThrowableConsumer.class); + + mocked + .when( + () -> + TransactionExecutor.executeWithRetries( + eq(transactionManager), + any(ThrowableFunction.class), + anyInt(), + anyInt(), + anyInt(), + anyInt())) + .thenReturn(null); + + // Act Assert + TransactionExecutor.executeWithRetries(transactionManager, throwableConsumer); + + mocked.verify( + () -> + TransactionExecutor.executeWithRetries( + eq(transactionManager), + any(ThrowableFunction.class), + anyInt(), + anyInt(), + anyInt(), + anyInt())); + } } }