diff --git a/docs/src/modules/java/pages/workflows.adoc b/docs/src/modules/java/pages/workflows.adoc index f083ff6da..cade3075e 100644 --- a/docs/src/modules/java/pages/workflows.adoc +++ b/docs/src/modules/java/pages/workflows.adoc @@ -117,7 +117,7 @@ include::example$transfer-workflow/src/main/java/com/example/transfer/applicatio <6> This time we return an effect that will stop workflow processing, by using special `end` method. <7> We collect all steps to form a workflow definition. -IMPORTANT: In the following example all `WalletEntity` interactions are not idempotent. It means that if the workflow step retries, it will make the deposit or withdraw again. In a real-world scenario, you should consider making all interactions idempotent with a proper deduplication mechanism. +IMPORTANT: In the following example all `WalletEntity` interactions are not idempotent. It means that if the workflow step retries, it will make the deposit or withdraw again. In a real-world scenario, you should consider making all interactions idempotent with a proper deduplication mechanism. A very basic example of handling retries for workflows can be found in https://github.com/akka/akka-sdk/blob/main/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/Wallet.java[this] sample. == Retrieving state diff --git a/samples/transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java b/samples/transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java index 38454bed1..4dc510b04 100644 --- a/samples/transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java +++ b/samples/transfer-workflow-compensation/src/it/java/com/example/transfer/TransferWorkflowIntegrationTest.java @@ -164,7 +164,7 @@ public void shouldTimedOutTransferWorkflow() { } - private String randomId() { + public static String randomId() { return UUID.randomUUID().toString().substring(0, 8); } diff --git a/samples/transfer-workflow-compensation/src/it/java/com/example/wallet/application/WalletEntityIntegrationTest.java b/samples/transfer-workflow-compensation/src/it/java/com/example/wallet/application/WalletEntityIntegrationTest.java new file mode 100644 index 000000000..63fa529c8 --- /dev/null +++ b/samples/transfer-workflow-compensation/src/it/java/com/example/wallet/application/WalletEntityIntegrationTest.java @@ -0,0 +1,66 @@ +package com.example.wallet.application; + +import akka.javasdk.testkit.TestKitSupport; +import com.example.wallet.application.WalletEntity.WalletResult; +import com.example.wallet.domain.WalletCommand; +import org.junit.jupiter.api.Test; + +import static com.example.transfer.TransferWorkflowIntegrationTest.randomId; +import static org.assertj.core.api.Assertions.assertThat; + +class WalletEntityIntegrationTest extends TestKitSupport { + + @Test + public void shouldDeduplicateWithdrawCommand() { + // given + var walletId = randomId(); + var withdraw = new WalletCommand.Withdraw(randomId(), 10); + await(componentClient.forEventSourcedEntity(walletId) + .method(WalletEntity::create) + .invokeAsync(100)); + + // when + withdraw(walletId, withdraw); + withdraw(walletId, withdraw); + withdraw(walletId, withdraw); + + // then + Integer balance = await(componentClient.forEventSourcedEntity(walletId) + .method(WalletEntity::get) + .invokeAsync()); + assertThat(balance).isEqualTo(100 - 10); + } + + @Test + public void shouldDeduplicateDepositCommand() { + // given + var walletId = randomId(); + var deposit = new WalletCommand.Deposit(randomId(), 10); + await(componentClient.forEventSourcedEntity(walletId) + .method(WalletEntity::create) + .invokeAsync(100)); + + // when + deposit(walletId, deposit); + deposit(walletId, deposit); + deposit(walletId, deposit); + + // then + Integer balance = await(componentClient.forEventSourcedEntity(walletId) + .method(WalletEntity::get) + .invokeAsync()); + assertThat(balance).isEqualTo(100 + 10); + } + + private WalletResult deposit(String walletId, WalletCommand.Deposit deposit) { + return await(componentClient.forEventSourcedEntity(walletId) + .method(WalletEntity::deposit) + .invokeAsync(deposit)); + } + + private WalletResult withdraw(String walletId, WalletCommand.Withdraw withdraw) { + return await(componentClient.forEventSourcedEntity(walletId) + .method(WalletEntity::withdraw) + .invokeAsync(withdraw)); + } +} \ No newline at end of file diff --git a/samples/transfer-workflow-compensation/src/it/java/com/example/wallet/domain/WalletTest.java b/samples/transfer-workflow-compensation/src/it/java/com/example/wallet/domain/WalletTest.java new file mode 100644 index 000000000..22c14c773 --- /dev/null +++ b/samples/transfer-workflow-compensation/src/it/java/com/example/wallet/domain/WalletTest.java @@ -0,0 +1,27 @@ +package com.example.wallet.domain; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +class WalletTest { + + @Test + public void shouldLimitCommandIdsSize() { + //given + Wallet wallet = new Wallet("w1", 100, new ArrayList<>()); + + //when + for (int i = 0; i < 10000; i++) { + List events = wallet.handle(new WalletCommand.Deposit(UUID.randomUUID().toString(), 10)); + wallet = wallet.applyEvent(events.get(0)); + } + + //then + assertThat(wallet.commandIds()).hasSize(1000); + } +} \ No newline at end of file diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java b/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java index 5ffa32f31..574e54fea 100644 --- a/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java @@ -10,6 +10,8 @@ import com.example.wallet.application.WalletEntity.WalletResult; import com.example.wallet.application.WalletEntity.WalletResult.Failure; import com.example.wallet.application.WalletEntity.WalletResult.Success; +import com.example.wallet.domain.WalletCommand.Deposit; +import com.example.wallet.domain.WalletCommand.Withdraw; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,20 +28,8 @@ import static java.time.Duration.ofHours; import static java.time.Duration.ofSeconds; -// tag::class[] @ComponentId("transfer") // <1> -public class TransferWorkflow extends Workflow { // <2> - - public record Withdraw(String from, int amount) { - } - - // end::class[] - - // tag::definition[] - public record Deposit(String to, int amount) { - } - - // end::definition[] +public class TransferWorkflow extends Workflow { private static final Logger logger = LoggerFactory.getLogger(TransferWorkflow.class); @@ -58,14 +48,14 @@ public WorkflowDef definition() { logger.info("Running: " + cmd); // cancelling the timer in case it was scheduled return timers().cancel("acceptationTimout-" + currentState().transferId()).thenCompose(__ -> - componentClient.forEventSourcedEntity(cmd.from) + componentClient.forEventSourcedEntity(currentState().transfer().from()) .method(WalletEntity::withdraw) - .invokeAsync(cmd.amount)); + .invokeAsync(cmd)); }) .andThen(WalletResult.class, result -> { switch (result) { case Success __ -> { - Deposit depositInput = new Deposit(currentState().transfer().to(), currentState().transfer().amount()); + Deposit depositInput = new Deposit(currentState().depositId(), currentState().transfer().amount()); return effects() .updateState(currentState().withStatus(WITHDRAW_SUCCEED)) .transitionTo("deposit", depositInput); @@ -87,9 +77,9 @@ public WorkflowDef definition() { // end::compensation[] logger.info("Running: " + cmd); // tag::compensation[] - return componentClient.forEventSourcedEntity(cmd.to) + return componentClient.forEventSourcedEntity(currentState().transfer().to()) .method(WalletEntity::deposit) - .invokeAsync(cmd.amount); + .invokeAsync(cmd); }) .andThen(WalletResult.class, result -> { // <1> switch (result) { @@ -116,9 +106,13 @@ public WorkflowDef definition() { logger.info("Running withdraw compensation"); // tag::compensation[] var transfer = currentState().transfer(); + // end::compensation[] + // depositId is reused for the compensation, just to have a stable commandId and simplify the example + // tag::compensation[] + String commandId = currentState().depositId(); return componentClient.forEventSourcedEntity(transfer.from()) .method(WalletEntity::deposit) - .invokeAsync(transfer.amount()); + .invokeAsync(new Deposit(commandId, transfer.amount())); }) .andThen(WalletResult.class, result -> { switch (result) { @@ -185,29 +179,30 @@ public WorkflowDef definition() { .addStep(failoverHandler); } - - public Effect startTransfer(Transfer transfer) { if (currentState() != null) { return effects().error("transfer already started"); } else if (transfer.amount() <= 0) { return effects().error("transfer amount should be greater than zero"); - } else if (transfer.amount() > 1000) { - logger.info("Waiting for acceptation: " + transfer); - TransferState waitingForAcceptationState = new TransferState(commandContext().workflowId(), transfer) - .withStatus(WAITING_FOR_ACCEPTATION); - return effects() - .updateState(waitingForAcceptationState) - .transitionTo("wait-for-acceptation") - .thenReply("transfer started, waiting for acceptation"); } else { - logger.info("Running: " + transfer); - TransferState initialState = new TransferState(commandContext().workflowId(), transfer); - Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount()); - return effects() - .updateState(initialState) - .transitionTo("withdraw", withdrawInput) - .thenReply("transfer started"); + String workflowId = commandContext().workflowId(); + if (transfer.amount() > 1000) { + logger.info("Waiting for acceptation: " + transfer); + TransferState waitingForAcceptationState = TransferState.create(workflowId, transfer) + .withStatus(WAITING_FOR_ACCEPTATION); + return effects() + .updateState(waitingForAcceptationState) + .transitionTo("wait-for-acceptation") + .thenReply("transfer started, waiting for acceptation"); + } else { + logger.info("Running: " + transfer); + TransferState initialState = TransferState.create(workflowId, transfer); + Withdraw withdrawInput = new Withdraw(initialState.withdrawId(), transfer.amount()); + return effects() + .updateState(initialState) + .transitionTo("withdraw", withdrawInput) + .thenReply("transfer started"); + } } } @@ -234,7 +229,7 @@ public Effect accept() { // end::resuming[] logger.info("Accepting transfer: " + transfer); // tag::resuming[] - Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount()); + Withdraw withdrawInput = new Withdraw(currentState().withdrawId(), transfer.amount()); return effects() .transitionTo("withdraw", withdrawInput) .thenReply("transfer accepted"); diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/domain/TransferState.java b/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/domain/TransferState.java index 92e6ad525..8afdcd63a 100644 --- a/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/domain/TransferState.java +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/domain/TransferState.java @@ -1,8 +1,11 @@ package com.example.transfer.domain; -import static com.example.transfer.domain.TransferState.TransferStatus.*; +import java.util.UUID; -public record TransferState(String transferId, Transfer transfer, TransferStatus status) { +import static com.example.transfer.domain.TransferState.TransferStatus.STARTED; + +public record TransferState(String transferId, Transfer transfer, TransferStatus status, String withdrawId, + String depositId) { public record Transfer(String from, String to, int amount) { } @@ -11,11 +14,14 @@ public enum TransferStatus { STARTED, WITHDRAW_FAILED, WITHDRAW_SUCCEED, DEPOSIT_FAILED, COMPLETED, COMPENSATION_COMPLETED, WAITING_FOR_ACCEPTATION, TRANSFER_ACCEPTATION_TIMED_OUT, REQUIRES_MANUAL_INTERVENTION } - public TransferState(String transferId, Transfer transfer) { - this(transferId, transfer, STARTED); + public static TransferState create(String transferId, Transfer transfer) { + // commandIds must be the same for every attempt, that's why we keep them as a part of the state + String withdrawId = UUID.randomUUID().toString(); + String depositId = UUID.randomUUID().toString(); + return new TransferState(transferId, transfer, STARTED, withdrawId, depositId); } public TransferState withStatus(TransferStatus newStatus) { - return new TransferState(transferId, transfer, newStatus); + return new TransferState(transferId, transfer, newStatus, withdrawId, depositId); } } diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/api/WalletEndpoint.java b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/api/WalletEndpoint.java index 6e894c42d..3094ba8e7 100644 --- a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/api/WalletEndpoint.java +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/api/WalletEndpoint.java @@ -10,9 +10,12 @@ import com.example.wallet.application.WalletEntity; import com.example.wallet.application.WalletEntity.WalletResult.Failure; import com.example.wallet.application.WalletEntity.WalletResult.Success; +import com.example.wallet.domain.WalletCommand.Deposit; +import com.example.wallet.domain.WalletCommand.Withdraw; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.UUID; import java.util.concurrent.CompletionStage; // Opened up for access from the public internet to make the sample service easy to try out. @@ -48,7 +51,7 @@ public CompletionStage create(String id, int initialAmount) { @Post("/{id}/deposit/{amount}") public CompletionStage deposit(String id, int amount) { return componentClient.forEventSourcedEntity(id) - .method(WalletEntity::deposit).invokeAsync(amount) + .method(WalletEntity::deposit).invokeAsync(new Deposit(UUID.randomUUID().toString(), amount)) .thenApply(walletResult -> switch (walletResult) { case Success __ -> HttpResponses.ok(); @@ -63,7 +66,7 @@ public CompletionStage deposit(String id, int amount) { @Post("/{id}/withdraw/{amount}") public CompletionStage withdraw(String id, int amount) { return componentClient.forEventSourcedEntity(id) - .method(WalletEntity::withdraw).invokeAsync(amount) + .method(WalletEntity::withdraw).invokeAsync(new Withdraw(UUID.randomUUID().toString(), amount)) .thenApply(walletResult -> switch (walletResult) { case Success __ -> HttpResponses.ok(); diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/application/WalletEntity.java b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/application/WalletEntity.java index 55cba84d8..b6bba7df8 100644 --- a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/application/WalletEntity.java +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/application/WalletEntity.java @@ -6,28 +6,30 @@ import com.example.wallet.application.WalletEntity.WalletResult.Failure; import com.example.wallet.application.WalletEntity.WalletResult.Success; import com.example.wallet.domain.Wallet; +import com.example.wallet.domain.WalletCommand; import com.example.wallet.domain.WalletEvent; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + import static akka.Done.done; -// tag::wallet[] @ComponentId("wallet") public class WalletEntity extends EventSourcedEntity { - // end::wallet[] private static final Logger logger = LoggerFactory.getLogger(WalletEntity.class); + @Override + public Wallet emptyState() { + return Wallet.EMPTY; + } + @Override public Wallet applyEvent(WalletEvent event) { - return switch(event) { - case WalletEvent.Created c -> new Wallet(eventContext().entityId(), c.initialBalance()); - case WalletEvent.Withdrawn w -> currentState().withdraw(w.amount()); - case WalletEvent.Deposited d -> currentState().deposit(d.amount()); - }; + return currentState().applyEvent(event); } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) @@ -42,50 +44,44 @@ record Success() implements WalletResult { } } - // tag::wallet[] public Effect create(int initialBalance) { // <1> - if (currentState() != null){ + if (!currentState().isEmpty()){ return effects().error("Wallet already exists"); } else { - return effects().persist(new WalletEvent.Created(initialBalance)) + return effects().persist(new WalletEvent.Created(commandContext().entityId(), initialBalance)) .thenReply(__ -> done()); } } - public Effect withdraw(int amount) { // <2> - if (currentState() == null){ + public Effect withdraw(WalletCommand.Withdraw withdraw) { // <2> + if (currentState().isEmpty()){ return effects().error("Wallet does not exist"); - } else if (currentState().balance() < amount) { + } else if (currentState().balance() < withdraw.amount()) { return effects().reply(new Failure("Insufficient balance")); } else { - // end::wallet[] - logger.info("Withdraw walletId: [{}] amount -{}", currentState().id(), amount); - // tag::wallet[] - return effects().persist(new WalletEvent.Withdrawn(amount)) + logger.info("Withdraw walletId: [{}] amount -{}", currentState().id(), withdraw.amount()); + List events = currentState().handle(withdraw); + return effects().persistAll(events) .thenReply(__ -> new WalletResult.Success()); } } - public Effect deposit(int amount) { // <3> - if (currentState() == null){ + public Effect deposit(WalletCommand.Deposit deposit) { // <3> + if (currentState().isEmpty()){ return effects().error("Wallet does not exist"); - } else if (currentState() == null) { - return effects().reply(new Failure("Wallet [" + commandContext().entityId() + "] not exists")); } else { - // end::wallet[] - logger.info("Deposit walletId: [{}] amount +{}", currentState().id(), amount); - // tag::wallet[] - return effects().persist(new WalletEvent.Deposited(amount)) + logger.info("Deposit walletId: [{}] amount +{}", currentState().id(), deposit.amount()); + List events = currentState().handle(deposit); + return effects().persistAll(events) .thenReply(__ -> new WalletResult.Success()); } } public Effect get() { // <4> - if (currentState() == null){ + if (currentState().isEmpty()){ return effects().error("Wallet does not exist"); } else { return effects().reply(currentState().balance()); } } } -// end::wallet[] diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/Wallet.java b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/Wallet.java index 2a0a47b0d..5026dab19 100644 --- a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/Wallet.java +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/Wallet.java @@ -1,12 +1,56 @@ package com.example.wallet.domain; -public record Wallet(String id, int balance) { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - public Wallet withdraw(int amount) { - return new Wallet(id, balance - amount); +import java.util.ArrayList; +import java.util.List; + +public record Wallet(String id, int balance, List commandIds) { + + private static final Logger logger = LoggerFactory.getLogger(Wallet.class); + public static final int COMMAND_IDS_MAX_SIZE = 1000; + public static Wallet EMPTY = new Wallet("", 0, new ArrayList<>()); + + public boolean isEmpty(){ + return id.equals(""); + } + + public List handle(WalletCommand command) { + if (commandIds.contains(command.commandId())) { + logger.info("Command already processed: [{}]", command.commandId()); + return List.of(); + } + return switch (command) { + case WalletCommand.Deposit deposit -> + List.of(new WalletEvent.Deposited(command.commandId(), deposit.amount())); + case WalletCommand.Withdraw withdraw -> + List.of(new WalletEvent.Withdrawn(command.commandId(), withdraw.amount())); + }; + } + + + public Wallet applyEvent(WalletEvent event) { + return switch (event) { + case WalletEvent.Created created -> + new Wallet(created.walletId(), created.initialBalance(), new ArrayList<>()); + case WalletEvent.Withdrawn withdrawn -> + new Wallet(id, balance - withdrawn.amount(), addCommandId(withdrawn.commandId())); + case WalletEvent.Deposited deposited -> + new Wallet(id, balance + deposited.amount(), addCommandId(deposited.commandId())); + }; } - public Wallet deposit(int amount) { - return new Wallet(id, balance + amount); + private List addCommandId(String commandId) { + // To avoid infinite growth of the list with limit the size to 1000. + // This implementation is not very efficient, so you might want to use a more dedicated data structure for it. + // When using other collections, make sure that the state is serializable and deserializable. + // Another way to put some constraints on the list size is to remove commandIds based on time + // e.g. remove commandIds that are older than 1 hour. + if (commandIds.size() >= COMMAND_IDS_MAX_SIZE) { + commandIds.removeFirst(); + } + commandIds.add(commandId); + return commandIds; } } \ No newline at end of file diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/WalletCommand.java b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/WalletCommand.java new file mode 100644 index 000000000..2832fa6b7 --- /dev/null +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/WalletCommand.java @@ -0,0 +1,11 @@ +package com.example.wallet.domain; + +public sealed interface WalletCommand { + + String commandId(); + + record Withdraw(String commandId, int amount) implements WalletCommand { + } + record Deposit(String commandId, int amount) implements WalletCommand { + } +} diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/WalletEvent.java b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/WalletEvent.java index 145ff34a0..ad6fbd560 100644 --- a/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/WalletEvent.java +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/WalletEvent.java @@ -5,15 +5,15 @@ public sealed interface WalletEvent { @TypeName("created") - record Created(int initialBalance) implements WalletEvent { + record Created(String walletId, int initialBalance) implements WalletEvent { } @TypeName("withdrawn") - record Withdrawn(int amount) implements WalletEvent { + record Withdrawn(String commandId, int amount) implements WalletEvent { } @TypeName("deposited") - record Deposited(int amount) implements WalletEvent { + record Deposited(String commandId, int amount) implements WalletEvent { } }