Skip to content

Commit

Permalink
history replayer uses open and closed histories
Browse files Browse the repository at this point in the history
  • Loading branch information
steveandroulakis committed Jun 17, 2024
1 parent 3877f18 commit 8abbc3d
Showing 1 changed file with 73 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.temporal.testing.TestWorkflowEnvironmentInternal;
import io.temporal.testing.WorkflowReplayer;
import io.temporal.worker.Worker;

import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.List;
Expand All @@ -42,79 +41,79 @@

public class RecentHistoryReplayer {

public static List<WorkflowExecutionHistory> getWorkflowHistories()
throws FileNotFoundException, SSLException {

WorkflowServiceStubs service = getWorkflowServiceStubs();

String query = "WorkflowType = 'moneyTransferWorkflow'";

ListWorkflowExecutionsRequest listWorkflowExecutionRequest =
ListWorkflowExecutionsRequest.newBuilder()
.setNamespace(ServerInfo.getNamespace())
.setPageSize(5)
.setQuery(query)
.build();
ListWorkflowExecutionsResponse listWorkflowExecutionsResponse =
service.blockingStub().listWorkflowExecutions(listWorkflowExecutionRequest);

List<WorkflowExecutionHistory> histories =
listWorkflowExecutionsResponse.getExecutionsList().stream()
.map(
(info) -> {
GetWorkflowExecutionHistoryResponse weh =
service
.blockingStub()
.getWorkflowExecutionHistory(
GetWorkflowExecutionHistoryRequest.newBuilder()
.setNamespace(ServerInfo.getNamespace())
.setExecution(info.getExecution())
.build());
return new WorkflowExecutionHistory(
weh.getHistory(), info.getExecution().getWorkflowId());
})
.collect(Collectors.toList());

return histories;
public static List<WorkflowExecutionHistory> getWorkflowHistories()
throws FileNotFoundException, SSLException {

WorkflowServiceStubs service = getWorkflowServiceStubs();

String query = "WorkflowType = 'moneyTransferWorkflow'";

ListWorkflowExecutionsRequest listWorkflowExecutionRequest =
ListWorkflowExecutionsRequest.newBuilder()
.setNamespace(ServerInfo.getNamespace())
.setPageSize(5)
.setQuery(query)
.build();
ListWorkflowExecutionsResponse listWorkflowExecutionsResponse =
service.blockingStub().listWorkflowExecutions(listWorkflowExecutionRequest);

List<WorkflowExecutionHistory> histories =
listWorkflowExecutionsResponse.getExecutionsList().stream()
.map(
(info) -> {
GetWorkflowExecutionHistoryResponse weh =
service
.blockingStub()
.getWorkflowExecutionHistory(
GetWorkflowExecutionHistoryRequest.newBuilder()
.setNamespace(ServerInfo.getNamespace())
.setExecution(info.getExecution())
.build());
return new WorkflowExecutionHistory(
weh.getHistory(), info.getExecution().getWorkflowId());
})
.collect(Collectors.toList());

return histories;
}

public static void main(String[] args) throws Exception {

List<WorkflowExecutionHistory> histories = getWorkflowHistories();

// Make replayer compatible with data converter
TestWorkflowEnvironmentInternal testEnv =
new TestWorkflowEnvironmentInternal(
TestEnvironmentOptions.newBuilder()
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder()
.setDataConverter(
new CodecDataConverter(
DefaultDataConverter.newDefaultInstance(),
Collections.singletonList(new CryptCodec()),
true /* encode failure attributes */))
.build())
.build());

Worker worker = testEnv.newWorker("my-task-queue");
worker.registerWorkflowImplementationTypes(AccountTransferWorkflowImpl.class);

// history length
System.out.println("Replaying " + histories.size() + " most recent workflow executions.");

for (WorkflowExecutionHistory history : histories) {
System.out.println("Replaying workflow: " + history.getWorkflowExecution().getWorkflowId());

try {
WorkflowReplayer.replayWorkflowExecution(history, worker);
System.out.println("Replay completed successfully");
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println(
"Replay failed, check above output for io.temporal.worker.NonDeterministicException");
}
}

public static void main(String[] args) throws Exception {

List<WorkflowExecutionHistory> histories = getWorkflowHistories();

// Make replayer compatible with data converter
TestWorkflowEnvironmentInternal testEnv =
new TestWorkflowEnvironmentInternal(
TestEnvironmentOptions.newBuilder()
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder()
.setDataConverter(
new CodecDataConverter(
DefaultDataConverter.newDefaultInstance(),
Collections.singletonList(new CryptCodec()),
true /* encode failure attributes */))
.build())
.build());

Worker worker = testEnv.newWorker("my-task-queue");
worker.registerWorkflowImplementationTypes(AccountTransferWorkflowImpl.class);

// history length
System.out.println("Replaying " + histories.size() + " most recent workflow executions.");

for (WorkflowExecutionHistory history : histories) {
System.out.println("Replaying workflow: " + history.getWorkflowExecution().getWorkflowId());

try {
WorkflowReplayer.replayWorkflowExecution(history, worker);
System.out.println("Replay completed successfully");
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println(
"Replay failed, check above output for io.temporal.worker.NonDeterministicException");
}
}

testEnv.close();
}
testEnv.close();
}
}

0 comments on commit 8abbc3d

Please sign in to comment.