Skip to content

Commit

Permalink
docs: workflows sample with a basic deduplication (#9)
Browse files Browse the repository at this point in the history
* docs: workflows sample with a basic deduplication

* fixing logger

* commandIds size limit
  • Loading branch information
aludwiko authored Nov 21, 2024
1 parent f4fba08 commit 0a24ac9
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 81 deletions.
2 changes: 1 addition & 1 deletion docs/src/modules/java/pages/workflows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ include::example$transfer-workflow/src/main/java/com/example/transfer/applicatio
<6> This time we return an effect that will stop workflow processing, by using special `end` method.
<7> We collect all steps to form a workflow definition.
IMPORTANT: In the following example all `WalletEntity` interactions are not idempotent. It means that if the workflow step retries, it will make the deposit or withdraw again. In a real-world scenario, you should consider making all interactions idempotent with a proper deduplication mechanism.
IMPORTANT: In the following example all `WalletEntity` interactions are not idempotent. It means that if the workflow step retries, it will make the deposit or withdraw again. In a real-world scenario, you should consider making all interactions idempotent with a proper deduplication mechanism. A very basic example of handling retries for workflows can be found in https://github.com/akka/akka-sdk/blob/main/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/Wallet.java[this] sample.
== Retrieving state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void shouldTimedOutTransferWorkflow() {
}


private String randomId() {
public static String randomId() {
return UUID.randomUUID().toString().substring(0, 8);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.example.wallet.application;

import akka.javasdk.testkit.TestKitSupport;
import com.example.wallet.application.WalletEntity.WalletResult;
import com.example.wallet.domain.WalletCommand;
import org.junit.jupiter.api.Test;

import static com.example.transfer.TransferWorkflowIntegrationTest.randomId;
import static org.assertj.core.api.Assertions.assertThat;

class WalletEntityIntegrationTest extends TestKitSupport {

@Test
public void shouldDeduplicateWithdrawCommand() {
// given
var walletId = randomId();
var withdraw = new WalletCommand.Withdraw(randomId(), 10);
await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::create)
.invokeAsync(100));

// when
withdraw(walletId, withdraw);
withdraw(walletId, withdraw);
withdraw(walletId, withdraw);

// then
Integer balance = await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::get)
.invokeAsync());
assertThat(balance).isEqualTo(100 - 10);
}

@Test
public void shouldDeduplicateDepositCommand() {
// given
var walletId = randomId();
var deposit = new WalletCommand.Deposit(randomId(), 10);
await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::create)
.invokeAsync(100));

// when
deposit(walletId, deposit);
deposit(walletId, deposit);
deposit(walletId, deposit);

// then
Integer balance = await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::get)
.invokeAsync());
assertThat(balance).isEqualTo(100 + 10);
}

private WalletResult deposit(String walletId, WalletCommand.Deposit deposit) {
return await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::deposit)
.invokeAsync(deposit));
}

private WalletResult withdraw(String walletId, WalletCommand.Withdraw withdraw) {
return await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::withdraw)
.invokeAsync(withdraw));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.example.wallet.domain;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

class WalletTest {

@Test
public void shouldLimitCommandIdsSize() {
//given
Wallet wallet = new Wallet("w1", 100, new ArrayList<>());

//when
for (int i = 0; i < 10000; i++) {
List<WalletEvent> events = wallet.handle(new WalletCommand.Deposit(UUID.randomUUID().toString(), 10));
wallet = wallet.applyEvent(events.get(0));
}

//then
assertThat(wallet.commandIds()).hasSize(1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.example.wallet.application.WalletEntity.WalletResult;
import com.example.wallet.application.WalletEntity.WalletResult.Failure;
import com.example.wallet.application.WalletEntity.WalletResult.Success;
import com.example.wallet.domain.WalletCommand.Deposit;
import com.example.wallet.domain.WalletCommand.Withdraw;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,20 +28,8 @@
import static java.time.Duration.ofHours;
import static java.time.Duration.ofSeconds;

// tag::class[]
@ComponentId("transfer") // <1>
public class TransferWorkflow extends Workflow<TransferState> { // <2>

public record Withdraw(String from, int amount) {
}

// end::class[]

// tag::definition[]
public record Deposit(String to, int amount) {
}

// end::definition[]
public class TransferWorkflow extends Workflow<TransferState> {

private static final Logger logger = LoggerFactory.getLogger(TransferWorkflow.class);

Expand All @@ -58,14 +48,14 @@ public WorkflowDef<TransferState> definition() {
logger.info("Running: " + cmd);
// cancelling the timer in case it was scheduled
return timers().cancel("acceptationTimout-" + currentState().transferId()).thenCompose(__ ->
componentClient.forEventSourcedEntity(cmd.from)
componentClient.forEventSourcedEntity(currentState().transfer().from())
.method(WalletEntity::withdraw)
.invokeAsync(cmd.amount));
.invokeAsync(cmd));
})
.andThen(WalletResult.class, result -> {
switch (result) {
case Success __ -> {
Deposit depositInput = new Deposit(currentState().transfer().to(), currentState().transfer().amount());
Deposit depositInput = new Deposit(currentState().depositId(), currentState().transfer().amount());
return effects()
.updateState(currentState().withStatus(WITHDRAW_SUCCEED))
.transitionTo("deposit", depositInput);
Expand All @@ -87,9 +77,9 @@ public WorkflowDef<TransferState> definition() {
// end::compensation[]
logger.info("Running: " + cmd);
// tag::compensation[]
return componentClient.forEventSourcedEntity(cmd.to)
return componentClient.forEventSourcedEntity(currentState().transfer().to())
.method(WalletEntity::deposit)
.invokeAsync(cmd.amount);
.invokeAsync(cmd);
})
.andThen(WalletResult.class, result -> { // <1>
switch (result) {
Expand All @@ -116,9 +106,13 @@ public WorkflowDef<TransferState> definition() {
logger.info("Running withdraw compensation");
// tag::compensation[]
var transfer = currentState().transfer();
// end::compensation[]
// depositId is reused for the compensation, just to have a stable commandId and simplify the example
// tag::compensation[]
String commandId = currentState().depositId();
return componentClient.forEventSourcedEntity(transfer.from())
.method(WalletEntity::deposit)
.invokeAsync(transfer.amount());
.invokeAsync(new Deposit(commandId, transfer.amount()));
})
.andThen(WalletResult.class, result -> {
switch (result) {
Expand Down Expand Up @@ -185,29 +179,30 @@ public WorkflowDef<TransferState> definition() {
.addStep(failoverHandler);
}



public Effect<String> startTransfer(Transfer transfer) {
if (currentState() != null) {
return effects().error("transfer already started");
} else if (transfer.amount() <= 0) {
return effects().error("transfer amount should be greater than zero");
} else if (transfer.amount() > 1000) {
logger.info("Waiting for acceptation: " + transfer);
TransferState waitingForAcceptationState = new TransferState(commandContext().workflowId(), transfer)
.withStatus(WAITING_FOR_ACCEPTATION);
return effects()
.updateState(waitingForAcceptationState)
.transitionTo("wait-for-acceptation")
.thenReply("transfer started, waiting for acceptation");
} else {
logger.info("Running: " + transfer);
TransferState initialState = new TransferState(commandContext().workflowId(), transfer);
Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount());
return effects()
.updateState(initialState)
.transitionTo("withdraw", withdrawInput)
.thenReply("transfer started");
String workflowId = commandContext().workflowId();
if (transfer.amount() > 1000) {
logger.info("Waiting for acceptation: " + transfer);
TransferState waitingForAcceptationState = TransferState.create(workflowId, transfer)
.withStatus(WAITING_FOR_ACCEPTATION);
return effects()
.updateState(waitingForAcceptationState)
.transitionTo("wait-for-acceptation")
.thenReply("transfer started, waiting for acceptation");
} else {
logger.info("Running: " + transfer);
TransferState initialState = TransferState.create(workflowId, transfer);
Withdraw withdrawInput = new Withdraw(initialState.withdrawId(), transfer.amount());
return effects()
.updateState(initialState)
.transitionTo("withdraw", withdrawInput)
.thenReply("transfer started");
}
}
}

Expand All @@ -234,7 +229,7 @@ public Effect<String> accept() {
// end::resuming[]
logger.info("Accepting transfer: " + transfer);
// tag::resuming[]
Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount());
Withdraw withdrawInput = new Withdraw(currentState().withdrawId(), transfer.amount());
return effects()
.transitionTo("withdraw", withdrawInput)
.thenReply("transfer accepted");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.example.transfer.domain;

import static com.example.transfer.domain.TransferState.TransferStatus.*;
import java.util.UUID;

public record TransferState(String transferId, Transfer transfer, TransferStatus status) {
import static com.example.transfer.domain.TransferState.TransferStatus.STARTED;

public record TransferState(String transferId, Transfer transfer, TransferStatus status, String withdrawId,
String depositId) {

public record Transfer(String from, String to, int amount) {
}
Expand All @@ -11,11 +14,14 @@ public enum TransferStatus {
STARTED, WITHDRAW_FAILED, WITHDRAW_SUCCEED, DEPOSIT_FAILED, COMPLETED, COMPENSATION_COMPLETED, WAITING_FOR_ACCEPTATION, TRANSFER_ACCEPTATION_TIMED_OUT, REQUIRES_MANUAL_INTERVENTION
}

public TransferState(String transferId, Transfer transfer) {
this(transferId, transfer, STARTED);
public static TransferState create(String transferId, Transfer transfer) {
// commandIds must be the same for every attempt, that's why we keep them as a part of the state
String withdrawId = UUID.randomUUID().toString();
String depositId = UUID.randomUUID().toString();
return new TransferState(transferId, transfer, STARTED, withdrawId, depositId);
}

public TransferState withStatus(TransferStatus newStatus) {
return new TransferState(transferId, transfer, newStatus);
return new TransferState(transferId, transfer, newStatus, withdrawId, depositId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import com.example.wallet.application.WalletEntity;
import com.example.wallet.application.WalletEntity.WalletResult.Failure;
import com.example.wallet.application.WalletEntity.WalletResult.Success;
import com.example.wallet.domain.WalletCommand.Deposit;
import com.example.wallet.domain.WalletCommand.Withdraw;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.CompletionStage;

// Opened up for access from the public internet to make the sample service easy to try out.
Expand Down Expand Up @@ -48,7 +51,7 @@ public CompletionStage<HttpResponse> create(String id, int initialAmount) {
@Post("/{id}/deposit/{amount}")
public CompletionStage<HttpResponse> deposit(String id, int amount) {
return componentClient.forEventSourcedEntity(id)
.method(WalletEntity::deposit).invokeAsync(amount)
.method(WalletEntity::deposit).invokeAsync(new Deposit(UUID.randomUUID().toString(), amount))
.thenApply(walletResult ->
switch (walletResult) {
case Success __ -> HttpResponses.ok();
Expand All @@ -63,7 +66,7 @@ public CompletionStage<HttpResponse> deposit(String id, int amount) {
@Post("/{id}/withdraw/{amount}")
public CompletionStage<HttpResponse> withdraw(String id, int amount) {
return componentClient.forEventSourcedEntity(id)
.method(WalletEntity::withdraw).invokeAsync(amount)
.method(WalletEntity::withdraw).invokeAsync(new Withdraw(UUID.randomUUID().toString(), amount))
.thenApply(walletResult ->
switch (walletResult) {
case Success __ -> HttpResponses.ok();
Expand Down
Loading

0 comments on commit 0a24ac9

Please sign in to comment.