From 7a23c04096758fdcb5a73fcab8fef2fe96840bdc Mon Sep 17 00:00:00 2001 From: Nate Mortensen Date: Tue, 22 Oct 2024 14:53:27 -0700 Subject: [PATCH] Refactor Test environment initialization to CadenceTestRule from WorkflowTest. WorkflowTest is currently 6,000 lines long and has nearly every test related to end to end client behavior. It provides the rather neat behavior that it supports running against both an instance of Cadence running in Docker and against the test version. It's additionally parameterized to run the entire test suite with or without sticky execution enabled. Due to the complexity in handling both environments, adding yet another test to WorkflowTest has always been the easiest option for developers. To allow for tests to easily be split into other files, extract the core functionality to a Junit test rule that can easily be reused by additional tests. With the exception of testSignalCrossDomainExternalWorkflow and the replay tests that don't use the test environment, all tests have been left in WorkflowTest to be split out later. --- .../com/uber/cadence/RegisterTestDomain.java | 4 +- .../cadence/testUtils/CadenceTestContext.java | 224 ++++++ .../cadence/testUtils/CadenceTestRule.java | 289 +++++++ .../testUtils/RequiresDockerService.java | 10 + .../testUtils/RequiresTestService.java | 10 + .../cadence/testUtils/TestEnvironment.java | 24 + .../worker/CleanWorkerShutdownTest.java | 2 +- .../uber/cadence/worker/StickyWorkerTest.java | 2 +- .../cadence/worker/WorkerStressTests.java | 2 +- .../workflow/CrossDomainWorkflowTest.java | 85 ++ .../com/uber/cadence/workflow/LoggerTest.java | 3 +- .../uber/cadence/workflow/MetricsTest.java | 12 +- .../workflow/WorkflowMigrationTest.java | 4 +- .../cadence/workflow/WorkflowReplayTest.java | 102 +++ .../uber/cadence/workflow/WorkflowTest.java | 761 ++++-------------- 15 files changed, 930 insertions(+), 604 deletions(-) create mode 100644 src/test/java/com/uber/cadence/testUtils/CadenceTestContext.java create mode 100644 src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java create mode 100644 src/test/java/com/uber/cadence/testUtils/RequiresDockerService.java create mode 100644 src/test/java/com/uber/cadence/testUtils/RequiresTestService.java create mode 100644 src/test/java/com/uber/cadence/testUtils/TestEnvironment.java create mode 100644 src/test/java/com/uber/cadence/workflow/CrossDomainWorkflowTest.java create mode 100644 src/test/java/com/uber/cadence/workflow/WorkflowReplayTest.java diff --git a/src/test/java/com/uber/cadence/RegisterTestDomain.java b/src/test/java/com/uber/cadence/RegisterTestDomain.java index 602a6a3a0..0b4be6f45 100644 --- a/src/test/java/com/uber/cadence/RegisterTestDomain.java +++ b/src/test/java/com/uber/cadence/RegisterTestDomain.java @@ -1,7 +1,7 @@ package com.uber.cadence; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN2; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN2; import com.uber.cadence.serviceclient.ClientOptions; import com.uber.cadence.serviceclient.IWorkflowService; diff --git a/src/test/java/com/uber/cadence/testUtils/CadenceTestContext.java b/src/test/java/com/uber/cadence/testUtils/CadenceTestContext.java new file mode 100644 index 000000000..67f40c08c --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/CadenceTestContext.java @@ -0,0 +1,224 @@ +package com.uber.cadence.testUtils; + +import com.uber.cadence.FeatureFlags; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.internal.worker.PollerOptions; +import com.uber.cadence.serviceclient.ClientOptions; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.serviceclient.WorkflowServiceTChannel; +import com.uber.cadence.testing.TestEnvironmentOptions; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.worker.WorkerFactory; +import com.uber.cadence.worker.WorkerFactoryOptions; +import com.uber.cadence.worker.WorkerOptions; +import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public class CadenceTestContext { + + private final Map workers = new HashMap<>(); + private List> delayedCallbacks = new ArrayList<>(); + private final IWorkflowService wfService; + private final WorkflowClient workflowClient; + private final String defaultTaskList; + private final TracingWorkflowInterceptorFactory tracer; + private WorkerFactory workerFactory; + // Real Service only + private ScheduledExecutorService scheduledExecutor; + // Test service only + private TestWorkflowEnvironment testEnvironment; + + private CadenceTestContext( + WorkflowClient workflowClient, + String defaultTaskList, + TracingWorkflowInterceptorFactory tracer, + WorkerFactory workerFactory, + ScheduledExecutorService scheduledExecutor, + TestWorkflowEnvironment testEnvironment) { + this.wfService = workflowClient.getService(); + this.workflowClient = workflowClient; + this.defaultTaskList = defaultTaskList; + this.tracer = tracer; + this.workerFactory = workerFactory; + this.scheduledExecutor = scheduledExecutor; + this.testEnvironment = testEnvironment; + } + + public Worker getDefaultWorker() { + return getOrCreateWorker(getDefaultTaskList()); + } + + public Worker getOrCreateWorker(String taskList) { + return workers.computeIfAbsent(taskList, this::createWorker); + } + + public void start() { + if (isRealService()) { + workerFactory.start(); + } else { + testEnvironment.start(); + } + } + + public void stop() { + if (!workerFactory.isStarted() || workerFactory.isTerminated() || workerFactory.isShutdown()) { + return; + } + if (isRealService()) { + workerFactory.shutdown(); + workerFactory.awaitTermination(1, TimeUnit.SECONDS); + for (ScheduledFuture result : delayedCallbacks) { + if (result.isDone() && !result.isCancelled()) { + try { + result.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted, some assertions may not have run", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof AssertionError) { + throw (AssertionError) e.getCause(); + } else { + throw new RuntimeException("Failed to complete callback", e.getCause()); + } + } + } + } + wfService.close(); + } else { + testEnvironment.shutdown(); + testEnvironment.awaitTermination(1, TimeUnit.SECONDS); + } + if (tracer != null) { + tracer.assertExpected(); + } + } + + public void suspendPolling() { + workerFactory.suspendPolling(); + } + + public void resumePolling() { + workerFactory.resumePolling(); + } + + public void registerDelayedCallback(Duration delay, Runnable r) { + if (isRealService()) { + ScheduledFuture result = + scheduledExecutor.schedule(r, delay.toMillis(), TimeUnit.MILLISECONDS); + delayedCallbacks.add(result); + } else { + testEnvironment.registerDelayedCallback(delay, r); + } + } + + public void sleep(Duration d) { + if (isRealService()) { + try { + Thread.sleep(d.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted", e); + } + } else { + testEnvironment.sleep(d); + } + } + + public long currentTimeMillis() { + if (isRealService()) { + return System.currentTimeMillis(); + } else { + return testEnvironment.currentTimeMillis(); + } + } + + public String getDefaultTaskList() { + return defaultTaskList; + } + + public WorkflowClient getWorkflowClient() { + return workflowClient; + } + + public WorkflowClient createWorkflowClient(WorkflowClientOptions options) { + if (isRealService()) { + return WorkflowClient.newInstance(getWorkflowClient().getService(), options); + } else { + return testEnvironment.newWorkflowClient(options); + } + } + + public TracingWorkflowInterceptorFactory getTracer() { + return tracer; + } + + private boolean isRealService() { + return testEnvironment == null; + } + + private Worker createWorker(String taskList) { + if (isRealService()) { + return workerFactory.newWorker( + taskList, + WorkerOptions.newBuilder() + .setActivityPollerOptions(PollerOptions.newBuilder().setPollThreadCount(5).build()) + .setMaxConcurrentActivityExecutionSize(1000) + .setInterceptorFactory(tracer) + .build()); + } else { + return testEnvironment.newWorker(taskList); + } + } + + public static CadenceTestContext forTestService( + Function envFactory, + WorkflowClientOptions clientOptions, + String defaultTaskList, + WorkerFactoryOptions workerFactoryOptions) { + TracingWorkflowInterceptorFactory tracer = new TracingWorkflowInterceptorFactory(); + + TestEnvironmentOptions testOptions = + new TestEnvironmentOptions.Builder() + .setWorkflowClientOptions(clientOptions) + .setInterceptorFactory(tracer) + .setWorkerFactoryOptions(workerFactoryOptions) + .build(); + TestWorkflowEnvironment testEnvironment = envFactory.apply(testOptions); + return new CadenceTestContext( + testEnvironment.newWorkflowClient(), + defaultTaskList, + tracer, + testEnvironment.getWorkerFactory(), + null, + testEnvironment); + } + + public static CadenceTestContext forRealService( + WorkflowClientOptions clientOptions, + String defaultTaskList, + WorkerFactoryOptions workerFactoryOptions) { + TracingWorkflowInterceptorFactory tracer = new TracingWorkflowInterceptorFactory(); + + IWorkflowService wfService = + new WorkflowServiceTChannel( + ClientOptions.newBuilder() + .setFeatureFlags( + new FeatureFlags().setWorkflowExecutionAlreadyCompletedErrorEnabled(true)) + .build()); + WorkflowClient workflowClient = WorkflowClient.newInstance(wfService, clientOptions); + WorkerFactory workerFactory = new WorkerFactory(workflowClient, workerFactoryOptions); + ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1); + return new CadenceTestContext( + workflowClient, defaultTaskList, tracer, workerFactory, scheduledExecutor, null); + } +} diff --git a/src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java b/src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java new file mode 100644 index 000000000..ad1bad0df --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java @@ -0,0 +1,289 @@ +package com.uber.cadence.testUtils; + +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.activity.LocalActivityOptions; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.testing.TestEnvironmentOptions; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.worker.WorkerFactoryOptions; +import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.function.Function; +import org.junit.AssumptionViolatedException; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +public class CadenceTestRule implements TestRule { + + private final Builder builder; + private CadenceTestContext context; + + private CadenceTestRule(Builder builder) { + this.builder = builder; + } + + @Override + public Statement apply(Statement testCase, Description description) { + // Unless the test overrides the timeout, apply our own + Test annotation = description.getAnnotation(Test.class); + if (TestEnvironment.isDebuggerTimeouts() || (annotation == null || annotation.timeout() > 0)) { + testCase = Timeout.millis(getDefaultTestTimeout().toMillis()).apply(testCase, description); + } + Statement finalStatement = testCase; + return new Statement() { + @Override + public void evaluate() throws Throwable { + if (description.getAnnotation(RequiresDockerService.class) != null && !isDockerService()) { + throw new AssumptionViolatedException( + "Skipping test because it requires the Docker service"); + } + if (description.getAnnotation(RequiresTestService.class) != null && isDockerService()) { + throw new AssumptionViolatedException( + "Skipping test because it requires the test service"); + } + setup(description); + try { + finalStatement.evaluate(); + } finally { + teardown(); + } + } + }; + } + + public TracingWorkflowInterceptorFactory getTracer() { + return context.getTracer(); + } + + public WorkflowClient getWorkflowClient() { + return context.getWorkflowClient(); + } + + public WorkflowClient createWorkflowClient(WorkflowClientOptions options) { + return context.createWorkflowClient(options); + } + + public Worker getWorker() { + return context.getDefaultWorker(); + } + + public Worker getWorker(String tasklist) { + return context.getOrCreateWorker(tasklist); + } + + public String getDefaultTaskList() { + return context.getDefaultTaskList(); + } + + public void start() { + context.start(); + } + + public void stop() { + context.stop(); + } + + public void suspendPolling() { + context.suspendPolling(); + } + + public void resumePolling() { + context.resumePolling(); + } + + public void registerDelayedCallback(Duration delay, Runnable r) { + context.registerDelayedCallback(delay, r); + } + + public void sleep(Duration d) { + context.sleep(d); + } + + public long currentTimeMillis() { + return context.currentTimeMillis(); + } + + public WorkflowOptions.Builder workflowOptionsBuilder() { + return workflowOptionsBuilder(context.getDefaultTaskList()); + } + + public WorkflowOptions.Builder workflowOptionsBuilder(String taskList) { + if (TestEnvironment.isDebuggerTimeouts()) { + return new WorkflowOptions.Builder() + .setExecutionStartToCloseTimeout(Duration.ofSeconds(1000)) + .setTaskStartToCloseTimeout(Duration.ofSeconds(60)) + .setTaskList(taskList); + } else { + return new WorkflowOptions.Builder() + .setExecutionStartToCloseTimeout(Duration.ofSeconds(30)) + .setTaskStartToCloseTimeout(Duration.ofSeconds(5)) + .setTaskList(taskList); + } + } + + public ActivityOptions activityOptions() { + return activityOptions(context.getDefaultTaskList()); + } + + public ActivityOptions activityOptions(String taskList) { + if (TestEnvironment.isDebuggerTimeouts()) { + return new ActivityOptions.Builder() + .setTaskList(taskList) + .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) + .setHeartbeatTimeout(Duration.ofSeconds(1000)) + .setScheduleToStartTimeout(Duration.ofSeconds(1000)) + .setStartToCloseTimeout(Duration.ofSeconds(10000)) + .build(); + } else { + return new ActivityOptions.Builder() + .setTaskList(taskList) + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build(); + } + } + + public LocalActivityOptions localActivityOptions() { + if (TestEnvironment.isDebuggerTimeouts()) { + return new LocalActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) + .build(); + } else { + return new LocalActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build(); + } + } + + private void setup(Description description) { + String testMethod = description.getMethodName(); + String defaultTaskList = + description.getClassName() + "-" + testMethod + "-" + UUID.randomUUID(); + + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder(builder.clientOptions).setDomain(builder.domain).build(); + + if (isDockerService()) { + this.context = + CadenceTestContext.forRealService( + clientOptions, defaultTaskList, builder.workerFactoryOptions); + } else { + this.context = + CadenceTestContext.forTestService( + builder.testWorkflowEnvironmentProvider, + clientOptions, + defaultTaskList, + builder.workerFactoryOptions); + } + + if (!builder.activities.isEmpty() || !builder.workflowTypes.isEmpty()) { + Worker defaultWorker = context.getDefaultWorker(); + if (!builder.activities.isEmpty()) { + defaultWorker.registerActivitiesImplementations(builder.activities.toArray()); + } + if (!builder.workflowTypes.isEmpty()) { + defaultWorker.registerWorkflowImplementationTypes( + builder.workflowTypes.stream().toArray(Class[]::new)); + } + } + if (builder.startWorkers) { + context.start(); + } + } + + private void teardown() { + context.stop(); + this.context = null; + } + + private Duration getDefaultTestTimeout() { + if (!builder.timeout.isZero()) { + return builder.timeout; + } + if (TestEnvironment.isDebuggerTimeouts()) { + return Duration.ofSeconds(500); + } + if (isDockerService()) { + return Duration.ofSeconds(30); + } + return Duration.ofSeconds(10); + } + + public boolean isDockerService() { + return TestEnvironment.isUseDockerService(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private List> workflowTypes = new ArrayList<>(); + private List activities = new ArrayList<>(); + private WorkerFactoryOptions workerFactoryOptions = WorkerFactoryOptions.newBuilder().build(); + private Function + testWorkflowEnvironmentProvider = TestWorkflowEnvironment::newInstance; + private WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder().build(); + + private boolean startWorkers = false; + private Duration timeout = Duration.ZERO; + private String domain = TestEnvironment.DOMAIN; + + public Builder withWorkflowTypes(Class... workflowImpls) { + workflowTypes = Arrays.asList(workflowImpls); + return this; + } + + public Builder withActivities(Object... activities) { + this.activities = Arrays.asList(activities); + return this; + } + + public Builder withWorkerFactoryOptions(WorkerFactoryOptions options) { + this.workerFactoryOptions = options; + return this; + } + + public Builder withTimeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public Builder withDomain(String domain) { + this.domain = domain; + return this; + } + + public Builder withClientOptions(WorkflowClientOptions options) { + this.clientOptions = options; + return this; + } + + public Builder startWorkersAutomatically() { + startWorkers = true; + return this; + } + + public Builder withTestEnvironmentProvider( + Function testEnvironmentProvider) { + this.testWorkflowEnvironmentProvider = testEnvironmentProvider; + return this; + } + + public CadenceTestRule build() { + return new CadenceTestRule(this); + } + } +} diff --git a/src/test/java/com/uber/cadence/testUtils/RequiresDockerService.java b/src/test/java/com/uber/cadence/testUtils/RequiresDockerService.java new file mode 100644 index 000000000..49088e9fa --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/RequiresDockerService.java @@ -0,0 +1,10 @@ +package com.uber.cadence.testUtils; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface RequiresDockerService {} diff --git a/src/test/java/com/uber/cadence/testUtils/RequiresTestService.java b/src/test/java/com/uber/cadence/testUtils/RequiresTestService.java new file mode 100644 index 000000000..6a4580893 --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/RequiresTestService.java @@ -0,0 +1,10 @@ +package com.uber.cadence.testUtils; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface RequiresTestService {} diff --git a/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java new file mode 100644 index 000000000..ed20804f9 --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java @@ -0,0 +1,24 @@ +package com.uber.cadence.testUtils; + +public final class TestEnvironment { + public static final String DOMAIN = "UnitTest"; + public static final String DOMAIN2 = "UnitTest2"; + /** + * When set to true increases test, activity and workflow timeouts to large values to support + * stepping through code in a debugger without timing out. + */ + private static final boolean DEBUGGER_TIMEOUTS = false; + + private static final boolean USE_DOCKER_SERVICE = + Boolean.parseBoolean(System.getenv("USE_DOCKER_SERVICE")); + + private TestEnvironment() {} + + public static boolean isDebuggerTimeouts() { + return DEBUGGER_TIMEOUTS; + } + + public static boolean isUseDockerService() { + return USE_DOCKER_SERVICE; + } +} diff --git a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java index 936370cd1..a71e27d7d 100644 --- a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java +++ b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java @@ -17,7 +17,7 @@ package com.uber.cadence.worker; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java index afc54a7a6..eaa79218f 100644 --- a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java +++ b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java @@ -17,7 +17,7 @@ package com.uber.cadence.worker; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; diff --git a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java index 30508fbda..0a0e801f2 100644 --- a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java +++ b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java @@ -17,7 +17,7 @@ package com.uber.cadence.worker; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertNotNull; import com.uber.cadence.activity.ActivityMethod; diff --git a/src/test/java/com/uber/cadence/workflow/CrossDomainWorkflowTest.java b/src/test/java/com/uber/cadence/workflow/CrossDomainWorkflowTest.java new file mode 100644 index 000000000..ed60d3891 --- /dev/null +++ b/src/test/java/com/uber/cadence/workflow/CrossDomainWorkflowTest.java @@ -0,0 +1,85 @@ +package com.uber.cadence.workflow; + +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN2; +import static org.junit.Assert.assertEquals; + +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal; +import com.uber.cadence.testUtils.CadenceTestRule; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.Rule; +import org.junit.Test; + +public class CrossDomainWorkflowTest { + + // When running against the test service we need both rules to share the same one rather than each + // creating their own. + private final TestWorkflowEnvironmentInternal.WorkflowServiceWrapper testWorkflowService = + new TestWorkflowEnvironmentInternal.WorkflowServiceWrapper(); + + @Rule + public CadenceTestRule firstDomain = + CadenceTestRule.builder() + .withWorkflowTypes(TestWorkflowCrossDomainImpl.class) + .startWorkersAutomatically() + .withTestEnvironmentProvider( + options -> TestWorkflowEnvironment.newInstance(testWorkflowService, options)) + .build(); + + @Rule + public CadenceTestRule secondDomain = + CadenceTestRule.builder() + .withDomain(DOMAIN2) + .withWorkflowTypes(WorkflowTest.TestWorkflowSignaledSimple.class) + .startWorkersAutomatically() + .withTestEnvironmentProvider( + options -> TestWorkflowEnvironment.newInstance(testWorkflowService, options)) + .build(); + + public interface TestWorkflowCrossDomain { + + @WorkflowMethod + String execute(String workflowId); + } + + public static class TestWorkflowCrossDomainImpl implements TestWorkflowCrossDomain { + + @Override + @WorkflowMethod + public String execute(String wfId) { + ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(wfId); + SignalOptions options = + SignalOptions.newBuilder().setDomain(DOMAIN2).setSignalName("testSignal").build(); + externalWorkflow.signal(options, "World"); + return "Signaled External workflow"; + } + } + + @Test + public void testSignalCrossDomainExternalWorkflow() + throws ExecutionException, InterruptedException { + + WorkflowOptions.Builder options = firstDomain.workflowOptionsBuilder(); + + String wfId = UUID.randomUUID().toString(); + WorkflowOptions.Builder options2 = secondDomain.workflowOptionsBuilder().setWorkflowId(wfId); + + TestWorkflowCrossDomain wf = + firstDomain + .getWorkflowClient() + .newWorkflowStub(TestWorkflowCrossDomain.class, options.build()); + + WorkflowTest.TestWorkflowSignaled simpleWorkflow = + secondDomain + .getWorkflowClient() + .newWorkflowStub(WorkflowTest.TestWorkflowSignaled.class, options2.build()); + + CompletableFuture result = WorkflowClient.execute(simpleWorkflow::execute); + assertEquals("Signaled External workflow", wf.execute(wfId)); + assertEquals("Simple workflow signaled", result.get()); + } +} diff --git a/src/test/java/com/uber/cadence/workflow/LoggerTest.java b/src/test/java/com/uber/cadence/workflow/LoggerTest.java index 2173042f5..5e6d939a7 100644 --- a/src/test/java/com/uber/cadence/workflow/LoggerTest.java +++ b/src/test/java/com/uber/cadence/workflow/LoggerTest.java @@ -27,6 +27,7 @@ import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.internal.logging.LoggerTag; +import com.uber.cadence.testUtils.TestEnvironment; import com.uber.cadence.testing.TestEnvironmentOptions; import com.uber.cadence.testing.TestWorkflowEnvironment; import com.uber.cadence.worker.Worker; @@ -90,7 +91,7 @@ public void executeChild(String id) { @Test public void testWorkflowLogger() { WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder().setDomain(WorkflowTest.DOMAIN).build(); + WorkflowClientOptions.newBuilder().setDomain(TestEnvironment.DOMAIN).build(); TestEnvironmentOptions testOptions = new TestEnvironmentOptions.Builder() .setWorkflowClientOptions(clientOptions) diff --git a/src/test/java/com/uber/cadence/workflow/MetricsTest.java b/src/test/java/com/uber/cadence/workflow/MetricsTest.java index 21417847a..4b84a0919 100644 --- a/src/test/java/com/uber/cadence/workflow/MetricsTest.java +++ b/src/test/java/com/uber/cadence/workflow/MetricsTest.java @@ -17,6 +17,7 @@ package com.uber.cadence.workflow; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -197,10 +198,7 @@ public void setUp(com.uber.m3.util.Duration reportingFrequecy) { Scope scope = new RootScopeBuilder().reporter(reporter).reportEvery(reportingFrequecy); WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder() - .setDomain(WorkflowTest.DOMAIN) - .setMetricsScope(scope) - .build(); + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).setMetricsScope(scope).build(); TestEnvironmentOptions testOptions = new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); @@ -229,7 +227,7 @@ public void testWorkflowMetrics() throws InterruptedException { Map tags = new ImmutableMap.Builder(2) - .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) + .put(MetricsTag.DOMAIN, DOMAIN) .put(MetricsTag.TASK_LIST, taskList) .build(); @@ -252,7 +250,7 @@ public void testWorkflowMetrics() throws InterruptedException { Map activityCompletionTags = new ImmutableMap.Builder(3) - .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) + .put(MetricsTag.DOMAIN, DOMAIN) .put(MetricsTag.TASK_LIST, taskList) .put(MetricsTag.ACTIVITY_TYPE, "TestActivity::runActivity") .put(MetricsTag.WORKFLOW_TYPE, "TestWorkflow::execute") @@ -293,7 +291,7 @@ public void testCorruptedSignalMetrics() throws InterruptedException { Map tags = new ImmutableMap.Builder(2) - .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) + .put(MetricsTag.DOMAIN, DOMAIN) .put(MetricsTag.TASK_LIST, taskList) .build(); verify(reporter, times(1)).reportCounter(MetricsType.CORRUPTED_SIGNALS_COUNTER, tags, 1); diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java index 4c15a7ed8..6bd3a633c 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java @@ -17,8 +17,8 @@ package com.uber.cadence.workflow; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN2; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN2; import static junit.framework.TestCase.fail; import com.uber.cadence.*; diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowReplayTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowReplayTest.java new file mode 100644 index 000000000..3406c075b --- /dev/null +++ b/src/test/java/com/uber/cadence/workflow/WorkflowReplayTest.java @@ -0,0 +1,102 @@ +package com.uber.cadence.workflow; + +import com.uber.cadence.testing.WorkflowReplayer; +import org.junit.Ignore; +import org.junit.Test; + +public class WorkflowReplayTest { + + // Server doesn't guarantee that the timer fire timestamp is larger or equal of the + // expected fire time. This test ensures that client still fires timer in this case. + @Test + public void testTimerFiringTimestampEarlierThanExpected() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "timerfiring.json", WorkflowTest.TimerFiringWorkflowImpl.class); + } + + @Test + public void testWorkflowReset() throws Exception { + // Leave the following code to generate history. + // startWorkerFor(TestWorkflowResetReplayWorkflow.class, TestMultiargsWorkflowsImpl.class); + // TestWorkflow1 workflowStub = + // workflowClient.newWorkflowStub( + // TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); + // workflowStub.execute(taskList); + // + // try { + // Thread.sleep(60000000); + // } catch (InterruptedException e) { + // e.printStackTrace(); + // } + + WorkflowReplayer.replayWorkflowExecutionFromResource( + "resetWorkflowHistory.json", WorkflowTest.TestWorkflowResetReplayWorkflow.class); + } + + @Test + public void testGetVersionWithRetryReplay() throws Exception { + + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionWithRetryHistory.json", WorkflowTest.TestGetVersionWorkflowRetryImpl.class); + } + + @Test + public void testGetVersionRemoveAndAdd() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", WorkflowTest.TestGetVersionRemoveAndAddImpl.class); + } + + /** + * Tests that history that was created before server side retry was supported is backwards + * compatible with the client that supports the server side retry. + */ + @Test + public void testAsyncActivityRetryReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testAsyncActivityRetryHistory.json", WorkflowTest.TestAsyncActivityRetry.class); + } + + /** + * Tests that history created before marker header change is backwards compatible with old markers + * generated without headers. + */ + @Test + // This test previously had a check for the incorrect test name and never ran. The json doesn't + // parse. + // Keeping it around in case we decide to fix it. + @Ignore + public void testMutableSideEffectReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testMutableSideEffectBackwardCompatibility.json", + WorkflowTest.TestMutableSideEffectWorkflowImpl.class); + } + + @Test + public void testGetVersionRemoved() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", WorkflowTest.TestGetVersionRemovedImpl.class); + } + + @Test + public void testGetVersionAdded() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", WorkflowTest.TestGetVersionAddedImpl.class); + } + + @Test + public void testGetVersionAddedWithCadenceChangeVersion() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistoryWithCadenceChangeVersion.json", + WorkflowTest.TestGetVersionAddedImpl.class); + } + + /** + * Tests that history that was created before server side retry was supported is backwards + * compatible with the client that supports the server side retry. + */ + @Test + public void testChildWorkflowRetryReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testChildWorkflowRetryHistory.json", WorkflowTest.TestChildWorkflowRetryWorkflow.class); + } +} diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 84e3c10ca..36576bb89 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -17,6 +17,7 @@ package com.uber.cadence.workflow; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,7 +37,6 @@ import com.uber.cadence.DomainAlreadyExistsError; import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; -import com.uber.cadence.FeatureFlags; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.HistoryEvent; import com.uber.cadence.Memo; @@ -63,14 +63,10 @@ import com.uber.cadence.converter.JsonDataConverter; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.sync.DeterministicRunnerTest; -import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal; -import com.uber.cadence.internal.worker.PollerOptions; -import com.uber.cadence.serviceclient.ClientOptions; -import com.uber.cadence.serviceclient.IWorkflowService; -import com.uber.cadence.serviceclient.WorkflowServiceTChannel; -import com.uber.cadence.testing.TestEnvironmentOptions; -import com.uber.cadence.testing.TestWorkflowEnvironment; -import com.uber.cadence.testing.WorkflowReplayer; +import com.uber.cadence.testUtils.CadenceTestRule; +import com.uber.cadence.testUtils.RequiresDockerService; +import com.uber.cadence.testUtils.RequiresTestService; +import com.uber.cadence.testUtils.TestEnvironment; import com.uber.cadence.worker.*; import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; import java.io.File; @@ -103,16 +99,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -120,12 +112,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.junit.rules.TestWatcher; -import org.junit.rules.Timeout; -import org.junit.runner.Description; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,93 +121,46 @@ @RunWith(Parameterized.class) public class WorkflowTest { - /** - * When set to true increases test, activity and workflow timeouts to large values to support - * stepping through code in a debugger without timing out. - */ - private static final boolean DEBUGGER_TIMEOUTS = false; + private static final Logger log = LoggerFactory.getLogger(WorkflowTest.class); private static final String ANNOTATION_TASK_LIST = "WorkflowTest-testExecute[Docker]"; - - private TracingWorkflowInterceptorFactory tracer; - private static final boolean useDockerService = - Boolean.parseBoolean(System.getenv("USE_DOCKER_SERVICE")); - - private TestWorkflowEnvironmentInternal.WorkflowServiceWrapper wfService; - + private static final String UUID_REGEXP = + "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; private static final boolean stickyOff = Boolean.parseBoolean(System.getenv("STICKY_OFF")); - @Parameters(name = "{1}") + @Rule public final CadenceTestRule cadenceTestRule; + @Rule public final TestName testName = new TestName(); + + @Parameters(name = "{0}") public static Object[] data() { - if (!useDockerService) { + if (TestEnvironment.isUseDockerService()) { return new Object[][] { - {false, "TestService Sticky OFF", true}, {false, "TestService Sticky ON", false} + {"Docker Sticky " + (stickyOff ? "OFF" : "ON"), stickyOff}, }; } else { - return new Object[][] { - {true, "Docker Sticky " + (stickyOff ? "OFF" : "ON"), stickyOff}, - }; + return new Object[][] {{"TestService Sticky OFF", true}, {"TestService Sticky ON", false}}; } } - @Rule public TestName testName = new TestName(); - - @Rule - public Timeout globalTimeout = - Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : !useDockerService ? 15 : 30); - - @Rule - public TestWatcher watchman = - new TestWatcher() { - @Override - protected void failed(Throwable e, Description description) { - if (tracer != null) { - System.err.println("TRACE:\n" + tracer.getTrace()); - } - if (testEnvironment != null) { - System.err.println("HISTORIES:\n" + testEnvironment.getDiagnostics()); - } - } - }; - - @Parameter public boolean useExternalService; - - @Parameter(1) - public String testType; - - @Parameter(2) - public boolean disableStickyExecution; - - public static final String DOMAIN = "UnitTest"; - public static final String DOMAIN2 = "UnitTest2"; - private static final Logger log = LoggerFactory.getLogger(WorkflowTest.class); - - private static String UUID_REGEXP = - "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; - + private final boolean disableStickyExecution; private String taskList; - - private WorkerFactory workerFactory; - private Worker worker; private TestActivitiesImpl activitiesImpl; private WorkflowClient workflowClient; - private TestWorkflowEnvironment testEnvironment; - private ScheduledExecutorService scheduledExecutor; - private List> delayedCallbacks = new ArrayList<>(); - private static final IWorkflowService service = - new WorkflowServiceTChannel( - ClientOptions.newBuilder() - .setFeatureFlags( - new FeatureFlags().setWorkflowExecutionAlreadyCompletedErrorEnabled(true)) - .build()); + private TracingWorkflowInterceptorFactory tracer; - @AfterClass - public static void closeService() { - service.close(); + public WorkflowTest(String ignored, boolean disableStickyExecution) { + this.disableStickyExecution = disableStickyExecution; + this.cadenceTestRule = + CadenceTestRule.builder() + .withWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setDisableStickyExecution(disableStickyExecution) + .build()) + .build(); } private static WorkflowOptions.Builder newWorkflowOptionsBuilder(String taskList) { - if (DEBUGGER_TIMEOUTS) { + if (TestEnvironment.isDebuggerTimeouts()) { return new WorkflowOptions.Builder() .setExecutionStartToCloseTimeout(Duration.ofSeconds(1000)) .setTaskStartToCloseTimeout(Duration.ofSeconds(60)) @@ -233,7 +174,7 @@ private static WorkflowOptions.Builder newWorkflowOptionsBuilder(String taskList } private static ActivityOptions newActivityOptions1(String taskList) { - if (DEBUGGER_TIMEOUTS) { + if (TestEnvironment.isDebuggerTimeouts()) { return new ActivityOptions.Builder() .setTaskList(taskList) .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) @@ -253,7 +194,7 @@ private static ActivityOptions newActivityOptions1(String taskList) { } private static LocalActivityOptions newLocalActivityOptions1() { - if (DEBUGGER_TIMEOUTS) { + if (TestEnvironment.isDebuggerTimeouts()) { return new LocalActivityOptions.Builder() .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) .build(); @@ -270,54 +211,19 @@ private static ActivityOptions newActivityOptions2() { @Before public void setUp() { - this.wfService = new TestWorkflowEnvironmentInternal.WorkflowServiceWrapper(); String testMethod = testName.getMethodName(); if (testMethod.startsWith("testExecute") || testMethod.startsWith("testStart")) { taskList = ANNOTATION_TASK_LIST; } else { - taskList = "WorkflowTest-" + testMethod + "-" + UUID.randomUUID().toString(); - } - tracer = new TracingWorkflowInterceptorFactory(); - // TODO: Create a version of TestWorkflowEnvironment that runs against a real service. - WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build(); - if (useExternalService) { - workflowClient = WorkflowClient.newInstance(service, clientOptions); - WorkerFactoryOptions factoryOptions = - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build(); - workerFactory = new WorkerFactory(workflowClient, factoryOptions); - WorkerOptions workerOptions = - WorkerOptions.newBuilder() - .setActivityPollerOptions(PollerOptions.newBuilder().setPollThreadCount(5).build()) - .setMaxConcurrentActivityExecutionSize(1000) - .setInterceptorFactory(tracer) - .build(); - worker = workerFactory.newWorker(taskList, workerOptions); - scheduledExecutor = new ScheduledThreadPoolExecutor(1); - } else { - TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder() - .setWorkflowClientOptions(clientOptions) - .setInterceptorFactory(tracer) - .setWorkerFactoryOptions( - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build()) - .build(); - testEnvironment = TestWorkflowEnvironment.newInstance(wfService, testOptions); - worker = testEnvironment.newWorker(taskList); - workflowClient = testEnvironment.newWorkflowClient(); + taskList = cadenceTestRule.getDefaultTaskList(); } + workflowClient = cadenceTestRule.getWorkflowClient(); + tracer = cadenceTestRule.getTracer(); ActivityCompletionClient completionClient = workflowClient.newActivityCompletionClient(); activitiesImpl = new TestActivitiesImpl(completionClient); - worker.registerActivitiesImplementations(activitiesImpl); - - newWorkflowOptionsBuilder(taskList); + cadenceTestRule.getWorker(taskList).registerActivitiesImplementations(activitiesImpl); - newActivityOptions1(taskList); activitiesImpl.invocations.clear(); activitiesImpl.procResult.clear(); } @@ -327,72 +233,33 @@ public void tearDown() throws Throwable { if (activitiesImpl != null) { activitiesImpl.close(); } - if (testEnvironment != null) { - testEnvironment.close(); - } - for (ScheduledFuture result : delayedCallbacks) { - if (result.isDone() && !result.isCancelled()) { - try { - result.get(); - } catch (InterruptedException e) { - } catch (ExecutionException e) { - throw e.getCause(); - } - } - } - if (tracer != null) { - tracer.assertExpected(); - } + cadenceTestRule.stop(); } private void startWorkerFor(Class... workflowTypes) { + Worker worker = cadenceTestRule.getWorker(taskList); worker.registerWorkflowImplementationTypes(workflowTypes); - if (useExternalService) { - workerFactory.start(); - } else { - testEnvironment.start(); - } + cadenceTestRule.start(); } private void startWorkerFor(WorkflowImplementationOptions options, Class... workflowTypes) { + Worker worker = cadenceTestRule.getWorker(taskList); worker.registerWorkflowImplementationTypes(options, workflowTypes); - if (useExternalService) { - workerFactory.start(); - } else { - testEnvironment.start(); - } + cadenceTestRule.start(); } // TODO: Refactor testEnvironment to support testing through real service to avoid this // conditional switches void registerDelayedCallback(Duration delay, Runnable r) { - if (useExternalService) { - ScheduledFuture result = - scheduledExecutor.schedule(r, delay.toMillis(), TimeUnit.MILLISECONDS); - delayedCallbacks.add(result); - } else { - testEnvironment.registerDelayedCallback(delay, r); - } + cadenceTestRule.registerDelayedCallback(delay, r); } void sleep(Duration d) { - if (useExternalService) { - try { - Thread.sleep(d.toMillis()); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted", e); - } - } else { - testEnvironment.sleep(d); - } + cadenceTestRule.sleep(d); } long currentTimeMillis() { - if (useExternalService) { - return System.currentTimeMillis(); - } else { - return testEnvironment.currentTimeMillis(); - } + return cadenceTestRule.currentTimeMillis(); } public interface TestWorkflow1 { @@ -401,12 +268,6 @@ public interface TestWorkflow1 { String execute(String taskList); } - public interface TestWorkflowCrossDomain { - - @WorkflowMethod - String execute(String workflowId); - } - public interface TestWorkflowSignaled { @WorkflowMethod @@ -884,34 +745,6 @@ public void testAsyncActivityRetry() { assertEquals(activitiesImpl.toString(), 3, activitiesImpl.invocations.size()); } - /** - * Tests that history that was created before server side retry was supported is backwards - * compatible with the client that supports the server side retry. - */ - @Test - public void testAsyncActivityRetryReplay() throws Exception { - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testAsyncActivityRetryHistory.json", TestAsyncActivityRetry.class); - } - - /** - * Tests that history created before marker header change is backwards compatible with old markers - * generated without headers. - */ - @Test - public void testMutableSideEffectReplay() throws Exception { - // Avoid executing 4 times - if (!testName.getMethodName().equals("testAsyncActivityRetryReplay[Docker Sticky OFF]")) { - return; - } - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testMutableSideEffectBackwardCompatibility.json", TestMutableSideEffectWorkflowImpl.class); - } - public static class TestAsyncActivityRetryOptionsChange implements TestWorkflow1 { private TestActivities activities; @@ -1260,11 +1093,7 @@ public int execute(int count, String continueAsNewTaskList) { public void testContinueAsNew() { Worker w2; String continuedTaskList = this.taskList + "_continued"; - if (useExternalService) { - w2 = workerFactory.newWorker(continuedTaskList); - } else { - w2 = testEnvironment.newWorker(continuedTaskList); - } + w2 = cadenceTestRule.getWorker(continuedTaskList); w2.registerWorkflowImplementationTypes(TestContinueAsNewImpl.class); startWorkerFor(TestContinueAsNewImpl.class); @@ -1492,7 +1321,7 @@ public void testStart() { TestMultiargsWorkflowsFunc1 stubF1 = workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc1.class); - if (!useExternalService) { + if (!TestEnvironment.isUseDockerService()) { // Use worker that polls on a task list configured through @WorkflowMethod annotation of func1 assertResult(1, WorkflowClient.start(stubF1::func1, 1)); assertEquals(1, stubF1.func1(1)); // Check that duplicated start just returns the result. @@ -1566,97 +1395,92 @@ public void testStart() { } @Test + @RequiresTestService public void testMemo() { - if (testEnvironment != null) { - String testMemoKey = "testKey"; - String testMemoValue = "testValue"; - Map memo = new HashMap<>(); - memo.put(testMemoKey, testMemoValue); - - startWorkerFor(TestMultiargsWorkflowsImpl.class); - WorkflowOptions workflowOptions = newWorkflowOptionsBuilder(taskList).setMemo(memo).build(); - TestMultiargsWorkflowsFunc stubF = - workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); - WorkflowExecution executionF = WorkflowClient.start(stubF::func); + String testMemoKey = "testKey"; + String testMemoValue = "testValue"; + Map memo = new HashMap<>(); + memo.put(testMemoKey, testMemoValue); - GetWorkflowExecutionHistoryResponse historyResp = - WorkflowExecutionUtils.getHistoryPage( - new byte[] {}, testEnvironment.getWorkflowService(), DOMAIN, executionF); - HistoryEvent startEvent = historyResp.history.getEvents().get(0); - Memo memoFromEvent = startEvent.workflowExecutionStartedEventAttributes.getMemo(); - byte[] memoBytes = memoFromEvent.getFields().get(testMemoKey).array(); - String memoRetrieved = - JsonDataConverter.getInstance().fromData(memoBytes, String.class, String.class); - assertEquals(testMemoValue, memoRetrieved); - } + startWorkerFor(TestMultiargsWorkflowsImpl.class); + WorkflowOptions workflowOptions = newWorkflowOptionsBuilder(taskList).setMemo(memo).build(); + TestMultiargsWorkflowsFunc stubF = + workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); + WorkflowExecution executionF = WorkflowClient.start(stubF::func); + + GetWorkflowExecutionHistoryResponse historyResp = + WorkflowExecutionUtils.getHistoryPage( + new byte[] {}, workflowClient.getService(), DOMAIN, executionF); + HistoryEvent startEvent = historyResp.history.getEvents().get(0); + Memo memoFromEvent = startEvent.workflowExecutionStartedEventAttributes.getMemo(); + byte[] memoBytes = memoFromEvent.getFields().get(testMemoKey).array(); + String memoRetrieved = + JsonDataConverter.getInstance().fromData(memoBytes, String.class, String.class); + assertEquals(testMemoValue, memoRetrieved); } @Test + @RequiresTestService public void testSearchAttributes() { - if (testEnvironment != null) { - String testKeyString = "CustomKeywordField"; - String testValueString = "testKeyword"; - String testKeyInteger = "CustomIntField"; - Integer testValueInteger = 1; - String testKeyDateTime = "CustomDateTimeField"; - LocalDateTime testValueDateTime = LocalDateTime.now(); - String testKeyBool = "CustomBoolField"; - Boolean testValueBool = true; - String testKeyDouble = "CustomDoubleField"; - Double testValueDouble = 1.23; - - // add more type to test - Map searchAttr = new HashMap<>(); - searchAttr.put(testKeyString, testValueString); - searchAttr.put(testKeyInteger, testValueInteger); - searchAttr.put(testKeyDateTime, testValueDateTime); - searchAttr.put(testKeyBool, testValueBool); - searchAttr.put(testKeyDouble, testValueDouble); - - startWorkerFor(TestMultiargsWorkflowsImpl.class); - WorkflowOptions workflowOptions = - newWorkflowOptionsBuilder(taskList).setSearchAttributes(searchAttr).build(); - TestMultiargsWorkflowsFunc stubF = - workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); - WorkflowExecution executionF = WorkflowClient.start(stubF::func); - - GetWorkflowExecutionHistoryResponse historyResp = - WorkflowExecutionUtils.getHistoryPage( - new byte[] {}, testEnvironment.getWorkflowService(), DOMAIN, executionF); - HistoryEvent startEvent = historyResp.history.getEvents().get(0); - SearchAttributes searchAttrFromEvent = - startEvent.workflowExecutionStartedEventAttributes.getSearchAttributes(); - - byte[] searchAttrStringBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyString).array(); - String retrievedString = - JsonDataConverter.getInstance() - .fromData(searchAttrStringBytes, String.class, String.class); - assertEquals(testValueString, retrievedString); - byte[] searchAttrIntegerBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyInteger).array(); - Integer retrievedInteger = - JsonDataConverter.getInstance() - .fromData(searchAttrIntegerBytes, Integer.class, Integer.class); - assertEquals(testValueInteger, retrievedInteger); - byte[] searchAttrDateTimeBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyDateTime).array(); - LocalDateTime retrievedDateTime = - JsonDataConverter.getInstance() - .fromData(searchAttrDateTimeBytes, LocalDateTime.class, LocalDateTime.class); - assertEquals(testValueDateTime, retrievedDateTime); - byte[] searchAttrBoolBytes = searchAttrFromEvent.getIndexedFields().get(testKeyBool).array(); - Boolean retrievedBool = - JsonDataConverter.getInstance() - .fromData(searchAttrBoolBytes, Boolean.class, Boolean.class); - assertEquals(testValueBool, retrievedBool); - byte[] searchAttrDoubleBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyDouble).array(); - Double retrievedDouble = - JsonDataConverter.getInstance() - .fromData(searchAttrDoubleBytes, Double.class, Double.class); - assertEquals(testValueDouble, retrievedDouble); - } + String testKeyString = "CustomKeywordField"; + String testValueString = "testKeyword"; + String testKeyInteger = "CustomIntField"; + Integer testValueInteger = 1; + String testKeyDateTime = "CustomDateTimeField"; + LocalDateTime testValueDateTime = LocalDateTime.now(); + String testKeyBool = "CustomBoolField"; + Boolean testValueBool = true; + String testKeyDouble = "CustomDoubleField"; + Double testValueDouble = 1.23; + + // add more type to test + Map searchAttr = new HashMap<>(); + searchAttr.put(testKeyString, testValueString); + searchAttr.put(testKeyInteger, testValueInteger); + searchAttr.put(testKeyDateTime, testValueDateTime); + searchAttr.put(testKeyBool, testValueBool); + searchAttr.put(testKeyDouble, testValueDouble); + + startWorkerFor(TestMultiargsWorkflowsImpl.class); + WorkflowOptions workflowOptions = + newWorkflowOptionsBuilder(taskList).setSearchAttributes(searchAttr).build(); + TestMultiargsWorkflowsFunc stubF = + workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); + WorkflowExecution executionF = WorkflowClient.start(stubF::func); + + GetWorkflowExecutionHistoryResponse historyResp = + WorkflowExecutionUtils.getHistoryPage( + new byte[] {}, workflowClient.getService(), DOMAIN, executionF); + HistoryEvent startEvent = historyResp.history.getEvents().get(0); + SearchAttributes searchAttrFromEvent = + startEvent.workflowExecutionStartedEventAttributes.getSearchAttributes(); + + byte[] searchAttrStringBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyString).array(); + String retrievedString = + JsonDataConverter.getInstance().fromData(searchAttrStringBytes, String.class, String.class); + assertEquals(testValueString, retrievedString); + byte[] searchAttrIntegerBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyInteger).array(); + Integer retrievedInteger = + JsonDataConverter.getInstance() + .fromData(searchAttrIntegerBytes, Integer.class, Integer.class); + assertEquals(testValueInteger, retrievedInteger); + byte[] searchAttrDateTimeBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyDateTime).array(); + LocalDateTime retrievedDateTime = + JsonDataConverter.getInstance() + .fromData(searchAttrDateTimeBytes, LocalDateTime.class, LocalDateTime.class); + assertEquals(testValueDateTime, retrievedDateTime); + byte[] searchAttrBoolBytes = searchAttrFromEvent.getIndexedFields().get(testKeyBool).array(); + Boolean retrievedBool = + JsonDataConverter.getInstance().fromData(searchAttrBoolBytes, Boolean.class, Boolean.class); + assertEquals(testValueBool, retrievedBool); + byte[] searchAttrDoubleBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyDouble).array(); + Double retrievedDouble = + JsonDataConverter.getInstance().fromData(searchAttrDoubleBytes, Double.class, Double.class); + assertEquals(testValueDouble, retrievedDouble); } @Test @@ -2094,10 +1918,7 @@ public String execute(String taskList) { public void testUntypedChildStubWorkflowAsyncInvoke() { startWorkerFor(TestUntypedChildStubWorkflowAsyncInvoke.class, TestMultiargsWorkflowsImpl.class); - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(200)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(60)); - options.setTaskList(taskList); + WorkflowOptions.Builder options = newWorkflowOptionsBuilder(taskList); TestWorkflow1 client = workflowClient.newWorkflowStub(TestWorkflow1.class, options.build()); assertEquals(null, client.execute(taskList)); } @@ -2149,7 +1970,7 @@ public List getTrace() { public void testTimer() { startWorkerFor(TestTimerWorkflowImpl.class); WorkflowOptions options; - if (useExternalService) { + if (cadenceTestRule.isDockerService()) { options = newWorkflowOptionsBuilder(taskList).build(); } else { options = @@ -2158,9 +1979,9 @@ public void testTimer() { .build(); } TestWorkflow2 client = workflowClient.newWorkflowStub(TestWorkflow2.class, options); - String result = client.execute(useExternalService); + String result = client.execute(cadenceTestRule.isDockerService()); assertEquals("testTimer", result); - if (useExternalService) { + if (cadenceTestRule.isDockerService()) { tracer.setExpected( "executeWorkflow: testActivity", "registerQuery getTrace", @@ -2221,7 +2042,7 @@ public void testAsyncRetry() { TestWorkflow2.class, newWorkflowOptionsBuilder(taskList).build()); String result = null; try { - result = client.execute(useExternalService); + result = client.execute(cadenceTestRule.isDockerService()); fail("unreachable"); } catch (WorkflowException e) { assertTrue(e.getCause() instanceof IllegalThreadStateException); @@ -2288,7 +2109,7 @@ public void testAsyncRetryOptionsChange() { TestWorkflow2.class, newWorkflowOptionsBuilder(taskList).build()); String result = null; try { - result = client.execute(useExternalService); + result = client.execute(cadenceTestRule.isDockerService()); fail("unreachable"); } catch (WorkflowException e) { assertTrue(e.getCause() instanceof IllegalThreadStateException); @@ -2464,15 +2285,6 @@ public void mySignal(String value) { @Test public void testSignal() { - // Test getTrace through replay by a local worker. - Worker queryWorker; - if (useExternalService) { - WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient); - queryWorker = workerFactory.newWorker(taskList); - } else { - queryWorker = testEnvironment.newWorker(taskList); - } - queryWorker.registerWorkflowImplementationTypes(TestSignalWorkflowImpl.class); startWorkerFor(TestSignalWorkflowImpl.class); WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); String workflowId = UUID.randomUUID().toString(); @@ -2529,14 +2341,6 @@ public void mySignal(String value) { @Test public void testSignalingCompletedWorkflow() { - Worker queryWorker; - if (useExternalService) { - WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient); - queryWorker = workerFactory.newWorker(taskList); - } else { - queryWorker = testEnvironment.newWorker(taskList); - } - queryWorker.registerWorkflowImplementationTypes(TestSimpleWorkflowImpl.class); startWorkerFor(TestSimpleWorkflowImpl.class); String workflowId = UUID.randomUUID().toString(); @@ -2603,15 +2407,6 @@ public void mySignal(String value) { @Test public void testSignalWithStart() { - // Test getTrace through replay by a local worker. - Worker queryWorker; - if (useExternalService) { - WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient); - queryWorker = workerFactory.newWorker(taskList); - } else { - queryWorker = testEnvironment.newWorker(taskList); - } - queryWorker.registerWorkflowImplementationTypes(TestSignalWithStartWorkflowImpl.class); startWorkerFor(TestSignalWorkflowImpl.class); WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); String workflowId = UUID.randomUUID().toString(); @@ -2730,7 +2525,7 @@ public void testNoQueryThreadLeak() throws InterruptedException { int queryCount = 100; for (int i = 0; i < queryCount; i++) { assertEquals("some state", client.getState()); - if (useDockerService) { + if (cadenceTestRule.isDockerService()) { // Sleep a little bit to avoid server throttling error. Thread.sleep(50); } @@ -2759,19 +2554,10 @@ public void testQueryRejectionConditionDefault() { @Test public void testQueryRejectionConditionNotOpen() { startWorkerFor(TestNoQueryWorkflowImpl.class); - WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder(workflowClient.getOptions()) - .setQueryRejectCondition(QueryRejectCondition.NOT_OPEN) - .build(); - WorkflowClient wc; - if (useExternalService) { - wc = WorkflowClient.newInstance(service, clientOptions); - } else { - wc = testEnvironment.newWorkflowClient(clientOptions); - } WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); - QueryableWorkflow client = wc.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build()); + QueryableWorkflow client = + workflowClient.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build()); WorkflowClient.start(client::execute); sleep(Duration.ofSeconds(1)); assertEquals("some state", client.getState()); @@ -2801,10 +2587,14 @@ public void testSignalUntyped() { assertEquals("initial", client.query("QueryableWorkflow::getState", String.class)); client.signal("testSignal", "Hello "); sleep(Duration.ofMillis(500)); - while (!"Hello ".equals(client.query("QueryableWorkflow::getState", String.class))) {} + while (!"Hello ".equals(client.query("QueryableWorkflow::getState", String.class))) { + sleep(Duration.ofMillis(10)); + } assertEquals("Hello ", client.query("QueryableWorkflow::getState", String.class)); client.signal("testSignal", "World!"); - while (!"World!".equals(client.query("QueryableWorkflow::getState", String.class))) {} + while (!"World!".equals(client.query("QueryableWorkflow::getState", String.class))) { + sleep(Duration.ofMillis(10)); + } assertEquals("World!", client.query("QueryableWorkflow::getState", String.class)); assertEquals( "Hello World!", @@ -3195,7 +2985,7 @@ public String execute(String taskList, int delay) { @Test public void testChildWorkflowRetry() { AngryChildActivityImpl angryChildActivity = new AngryChildActivityImpl(); - worker.registerActivitiesImplementations(angryChildActivity); + cadenceTestRule.getWorker().registerActivitiesImplementations(angryChildActivity); startWorkerFor(TestChildWorkflowRetryWorkflow.class, AngryChild.class); WorkflowOptions.Builder options = new WorkflowOptions.Builder(); @@ -3216,12 +3006,7 @@ public WorkflowStub newUntypedWorkflowStub( }) .setDomain(DOMAIN) .build(); - WorkflowClient wc; - if (useExternalService) { - wc = WorkflowClient.newInstance(service, clientOptions); - } else { - wc = testEnvironment.newWorkflowClient(clientOptions); - } + WorkflowClient wc = cadenceTestRule.createWorkflowClient(clientOptions); TestWorkflow1 client = wc.newWorkflowStub(TestWorkflow1.class, options.build()); try { @@ -3236,19 +3021,6 @@ public WorkflowStub newUntypedWorkflowStub( assertEquals(3, angryChildActivity.getInvocationCount()); } - /** - * Tests that history that was created before server side retry was supported is backwards - * compatible with the client that supports the server side retry. - */ - @Test - public void testChildWorkflowRetryReplay() throws Exception { - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testChildWorkflowRetryHistory.json", TestChildWorkflowRetryWorkflow.class); - } - public static class TestChildWorkflowExecutionPromiseHandler implements TestWorkflow1 { private ITestNamedChild child; @@ -3275,18 +3047,10 @@ public String execute(String taskList) { public void testChildWorkflowExecutionPromiseHandler() { startWorkerFor(TestChildWorkflowExecutionPromiseHandler.class, TestNamedChild.class); - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(20)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(2)); - options.setTaskList(taskList); - WorkflowClient wc; - if (useExternalService) { - wc = workflowClient; - } else { - wc = testEnvironment.newWorkflowClient(); - } + WorkflowOptions.Builder options = newWorkflowOptionsBuilder(taskList); - TestWorkflow1 client = wc.newWorkflowStub(TestWorkflow1.class, options.build()); + TestWorkflow1 client = + cadenceTestRule.getWorkflowClient().newWorkflowStub(TestWorkflow1.class, options.build()); String result = client.execute(taskList); assertEquals("FOO", result); } @@ -3331,10 +3095,8 @@ public String execute(String greeting, String parentWorkflowID) { @Test public void testSignalExternalWorkflow() { startWorkerFor(TestSignalExternalWorkflow.class, SignalingChildImpl.class); - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(2000)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(60)); - options.setTaskList(taskList); + WorkflowOptions.Builder options = newWorkflowOptionsBuilder(taskList); + TestWorkflowSignaled client = workflowClient.newWorkflowStub(TestWorkflowSignaled.class, options.build()); assertEquals("Hello World!", client.execute()); @@ -3365,19 +3127,6 @@ public void signal1(String arg) { } } - public static class TestWorkflowCrossDomainImpl implements TestWorkflowCrossDomain { - - @Override - @WorkflowMethod - public String execute(String wfId) { - ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(wfId); - SignalOptions options = - SignalOptions.newBuilder().setDomain(DOMAIN2).setSignalName("testSignal").build(); - externalWorkflow.signal(options, "World"); - return "Signaled External workflow"; - } - } - public static class UntypedSignalingChildImpl implements SignalingChild { @Override @@ -3400,67 +3149,6 @@ public void testUntypedSignalExternalWorkflow() { assertEquals("Hello World!", client.execute()); } - @Test - public void testSignalCrossDomainExternalWorkflow() - throws ExecutionException, InterruptedException { - WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder().setDomain(DOMAIN2).build(); - - TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder() - .setWorkflowClientOptions(clientOptions) - .setInterceptorFactory(tracer) - .setWorkerFactoryOptions( - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build()) - .build(); - - startWorkerFor(TestWorkflowCrossDomainImpl.class); - - WorkflowClient workflowClient2; - if (useExternalService) { - workflowClient2 = WorkflowClient.newInstance(service, clientOptions); - WorkerFactoryOptions factoryOptions = - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build(); - WorkerFactory workerFactory2 = new WorkerFactory(workflowClient2, factoryOptions); - Worker worker2 = workerFactory2.newWorker(taskList + "2"); - worker2.registerWorkflowImplementationTypes(TestWorkflowSignaledSimple.class); - workerFactory2.start(); - } else { - TestWorkflowEnvironment testEnvironment2 = - TestWorkflowEnvironment.newInstance(wfService, testOptions); - Worker worker2 = testEnvironment2.newWorker(taskList + "2"); - workflowClient2 = testEnvironment2.newWorkflowClient(); - worker2.registerWorkflowImplementationTypes(TestWorkflowSignaledSimple.class); - testEnvironment2.start(); - } - - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(30)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(30)); - options.setTaskList(taskList); - - WorkflowOptions.Builder options2 = new WorkflowOptions.Builder(); - String wfId = UUID.randomUUID().toString(); - options2.setWorkflowId(wfId); - options2.setExecutionStartToCloseTimeout(Duration.ofSeconds(30)); - options2.setTaskStartToCloseTimeout(Duration.ofSeconds(30)); - options2.setTaskList(taskList + "2"); - - TestWorkflowCrossDomain wf = - workflowClient.newWorkflowStub(TestWorkflowCrossDomain.class, options.build()); - - TestWorkflowSignaled simpleWorkflow = - workflowClient2.newWorkflowStub(TestWorkflowSignaled.class, options2.build()); - - CompletableFuture result = WorkflowClient.execute(simpleWorkflow::execute); - assertEquals("Signaled External workflow", wf.execute(wfId)); - assertEquals("Simple workflow signaled", result.get()); - } - public static class TestSignalExternalWorkflowFailure implements TestWorkflow1 { @Override @@ -3636,7 +3324,7 @@ public String execute(String taskList) { @Test public void testChildWorkflowAsyncRetry() { AngryChildActivityImpl angryChildActivity = new AngryChildActivityImpl(); - worker.registerActivitiesImplementations(angryChildActivity); + cadenceTestRule.getWorker().registerActivitiesImplementations(angryChildActivity); startWorkerFor(TestChildWorkflowAsyncRetryWorkflow.class, AngryChild.class); WorkflowOptions.Builder options = new WorkflowOptions.Builder(); @@ -3899,11 +3587,9 @@ public String execute(String testName) { } @Test + // Min interval in cron is 1min. So we will not test it against real service in CI. + @RequiresTestService public void testWorkflowWithCronSchedule() { - // Min interval in cron is 1min. So we will not test it against real service in Jenkins. - // Feel free to uncomment the line below and test in local. - Assume.assumeFalse("skipping as test will timeout", useExternalService); - startWorkerFor(TestWorkflowWithCronScheduleImpl.class); WorkflowStub client = @@ -3938,11 +3624,9 @@ public String execute(String taskList) { } @Test + // Min interval in cron is 1min. So we will not test it against real service in CI. + @RequiresTestService public void testChildWorkflowWithCronSchedule() { - // Min interval in cron is 1min. So we will not test it against real service in Jenkins. - // Feel free to uncomment the line below and test in local. - Assume.assumeFalse("skipping as test will timeout", useExternalService); - startWorkerFor(TestCronParentWorkflow.class, TestWorkflowWithCronScheduleImpl.class); WorkflowStub client = @@ -3952,7 +3636,7 @@ public void testChildWorkflowWithCronSchedule() { .setExecutionStartToCloseTimeout(Duration.ofHours(10)) .build()); client.start(testName.getMethodName()); - testEnvironment.sleep(Duration.ofHours(3)); + cadenceTestRule.sleep(Duration.ofHours(3)); client.cancel(); try { @@ -4627,9 +4311,8 @@ public void testGetVersion() { } @Test + @RequiresDockerService public void testDelayStart() { - assumeTrue("skipping for non docker tests", useExternalService); - int delaySeconds = 5; startWorkerFor(TestGetVersionWorkflowImpl.class); WorkflowOptions options = @@ -4670,9 +4353,8 @@ public String execute(String taskList) { } @Test + @RequiresTestService public void testGetVersion2() { - Assume.assumeFalse("skipping for docker tests", useExternalService); - startWorkerFor(TestGetVersionWorkflow2Impl.class); TestWorkflow1 workflowStub = workflowClient.newWorkflowStub( @@ -4929,28 +4611,6 @@ public String execute(String taskList) { } } - @Test - public void testGetVersionAdded() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistory.json", TestGetVersionAddedImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - public void testGetVersionAddedWithCadenceChangeVersion() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistoryWithCadenceChangeVersion.json", TestGetVersionAddedImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - public static class TestGetVersionRemovedImpl implements TestWorkflow1 { @Override @@ -4962,17 +4622,6 @@ public String execute(String taskList) { } } - @Test - public void testGetVersionRemoved() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistory.json", TestGetVersionRemovedImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - public static class TestGetVersionRemoveAndAddImpl implements TestWorkflow1 { @Override @@ -4985,17 +4634,6 @@ public String execute(String taskList) { } } - @Test - public void testGetVersionRemoveAndAdd() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistory.json", TestGetVersionRemoveAndAddImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - public interface DeterminismFailingWorkflow { @WorkflowMethod @@ -5051,13 +4689,8 @@ public void testNonDeterministicWorkflowPolicyFailWorkflow() { new WorkflowImplementationOptions.Builder() .setNonDeterministicWorkflowPolicy(FailWorkflow) .build(); - worker.registerWorkflowImplementationTypes( - implementationOptions, DeterminismFailingWorkflowImpl.class); - if (useExternalService) { - workerFactory.start(); - } else { - testEnvironment.start(); - } + startWorkerFor(implementationOptions, DeterminismFailingWorkflowImpl.class); + WorkflowOptions options = new WorkflowOptions.Builder() .setExecutionStartToCloseTimeout(Duration.ofSeconds(1)) @@ -5170,7 +4803,9 @@ public List query(List arg) { @Test public void testGenericParametersWorkflow() throws ExecutionException, InterruptedException { - worker.registerActivitiesImplementations(new GenericParametersActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new GenericParametersActivityImpl()); startWorkerFor(GenericParametersWorkflowImpl.class); GenericParametersWorkflow workflowStub = workflowClient.newWorkflowStub( @@ -5256,7 +4891,9 @@ public String execute(String taskList) { @Test public void testNonSerializableExceptionInActivity() { - worker.registerActivitiesImplementations(new NonSerializableExceptionActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new NonSerializableExceptionActivityImpl()); startWorkerFor(TestNonSerializableExceptionInActivityWorkflow.class); TestWorkflow1 workflowStub = workflowClient.newWorkflowStub( @@ -5311,7 +4948,9 @@ public String execute(String taskList) { @Test public void testNonSerializableArgumentsInActivity() { - worker.registerActivitiesImplementations(new NonDeserializableExceptionActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new NonDeserializableExceptionActivityImpl()); startWorkerFor(TestNonSerializableArgumentsInActivityWorkflow.class); TestWorkflow1 workflowStub = workflowClient.newWorkflowStub( @@ -5402,7 +5041,9 @@ public String execute(int activityCount, String taskList) { @Ignore // Requires DEBUG_TIMEOUTS=true public void testLargeHistory() { final int activityCount = 1000; - worker.registerActivitiesImplementations(new TestLargeWorkflowActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new TestLargeWorkflowActivityImpl()); startWorkerFor(TestLargeHistory.class); TestLargeWorkflow workflowStub = workflowClient.newWorkflowStub( @@ -5708,21 +5349,13 @@ public void testSignalOrderingWorkflow() { WorkflowClient.start(workflowStub::run); // Suspend polling so that all the signals will be received in the same decision task. - if (useExternalService) { - workerFactory.suspendPolling(); - } else { - testEnvironment.getWorkerFactory().suspendPolling(); - } + cadenceTestRule.suspendPolling(); workflowStub.signal("test1"); workflowStub.signal("test2"); workflowStub.signal("test3"); - if (useExternalService) { - workerFactory.resumePolling(); - } else { - testEnvironment.getWorkerFactory().resumePolling(); - } + cadenceTestRule.resumePolling(); List result = workflowStub.run(); List expected = Arrays.asList("test1", "test2", "test3"); @@ -5768,29 +5401,6 @@ public String execute(String taskList) { } } - @Test - public void testWorkflowReset() throws Exception { - // Leave the following code to generate history. - // startWorkerFor(TestWorkflowResetReplayWorkflow.class, TestMultiargsWorkflowsImpl.class); - // TestWorkflow1 workflowStub = - // workflowClient.newWorkflowStub( - // TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); - // workflowStub.execute(taskList); - // - // try { - // Thread.sleep(60000000); - // } catch (InterruptedException e) { - // e.printStackTrace(); - // } - - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "resetWorkflowHistory.json", TestWorkflowResetReplayWorkflow.class); - } - public interface GreetingWorkflow { @WorkflowMethod @@ -5837,19 +5447,6 @@ public void createGreeting(String name) { } } - // Server doesn't guarantee that the timer fire timestamp is larger or equal of the - // expected fire time. This test ensures that client still fires timer in this case. - @Test - public void testTimerFiringTimestampEarlierThanExpected() throws Exception { - - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", stickyOff); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "timerfiring.json", TimerFiringWorkflowImpl.class); - } - public interface TestCompensationWorkflow { @WorkflowMethod void compensate(); @@ -5982,11 +5579,7 @@ public void testExceptionInSignal() throws InterruptedException { // Suspend polling so that decision tasks are not retried. Otherwise it will affect our thread // count. - if (useExternalService) { - workerFactory.suspendPolling(); - } else { - testEnvironment.getWorkerFactory().suspendPolling(); - } + cadenceTestRule.suspendPolling(); // Wait for decision task retry to finish. Thread.sleep(10000); @@ -6139,7 +5732,7 @@ public String getState() { public void testGetVersionRetry() throws ExecutionException, InterruptedException { TestActivities activity = mock(TestActivities.class); when(activity.activity1(1)).thenReturn(1); - worker.registerActivitiesImplementations(activity); + cadenceTestRule.getWorker().registerActivitiesImplementations(activity); startWorkerFor(TestGetVersionWorkflowRetryImpl.class); TestWorkflow3 workflowStub = @@ -6153,16 +5746,6 @@ public void testGetVersionRetry() throws ExecutionException, InterruptedExceptio assertEquals("activity1", workflowStub.getState()); } - @Test - public void testGetVersionWithRetryReplay() throws Exception { - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionWithRetryHistory.json", TestGetVersionWorkflowRetryImpl.class); - } - @Test(expected = ExecutionException.class) public void testWorkflowTimesOutWhenNoOverridesProvided() throws Exception { startWorkerFor(TestWorkflowActivityOptionOverride.class); @@ -6224,10 +5807,10 @@ public void mySignal(String value) { } @Test + @RequiresTestService public void testEnqueueWorkflow() throws Exception { // The docker service isn't set up for async calls. Only run against the TestEnvironment version // of this test - assumeTrue("The docker service doesn't support async calls", !useDockerService); startWorkerFor(TestEnqueueWorkflow.class); WorkflowStub stub =