From 2673d322899d34d602dcfd0272fd7027c06f8583 Mon Sep 17 00:00:00 2001 From: Nate Mortensen Date: Tue, 29 Oct 2024 16:21:22 -0700 Subject: [PATCH] Fix WorkflowQueue#offer and increase test coverage --- .../internal/sync/WorkflowQueueImpl.java | 4 +- .../cadence/internal/sync/PromiseTest.java | 157 ++++++++++++++++ .../sync/WorkflowInternalQueueTest.java | 170 ++++++++++++++++++ 3 files changed, 329 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowQueueImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowQueueImpl.java index cf9488238..c7fe7e21b 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowQueueImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowQueueImpl.java @@ -74,10 +74,10 @@ public void put(E e) throws InterruptedException { @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - boolean timedOut = + boolean success = WorkflowThread.await( unit.toMillis(timeout), "WorkflowQueue.offer", () -> queue.size() < capacity); - if (timedOut) { + if (!success) { return false; } queue.addLast(e); diff --git a/src/test/java/com/uber/cadence/internal/sync/PromiseTest.java b/src/test/java/com/uber/cadence/internal/sync/PromiseTest.java index a1aba03f4..e955fa522 100644 --- a/src/test/java/com/uber/cadence/internal/sync/PromiseTest.java +++ b/src/test/java/com/uber/cadence/internal/sync/PromiseTest.java @@ -27,6 +27,7 @@ import com.uber.cadence.workflow.CompletablePromise; import com.uber.cadence.workflow.Promise; import com.uber.cadence.workflow.Workflow; +import java.time.Duration; import java.util.ArrayList; import java.util.IllegalFormatCodePointException; import java.util.List; @@ -84,6 +85,122 @@ public void testFailure() throws Throwable { trace.setExpected(expected); } + @Test + public void testTimedFailure() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> { + CompletablePromise f = Workflow.newPromise(); + trace.add("root begin"); + WorkflowInternal.newThread( + false, () -> f.completeExceptionally(new IllegalArgumentException("foo"))) + .start(); + WorkflowInternal.newThread( + false, + () -> { + try { + f.get(10, TimeUnit.DAYS); + trace.add("thread1 get success"); + fail("failure expected"); + } catch (Exception e) { + assertEquals(IllegalArgumentException.class, e.getClass()); + trace.add("thread1 get failure"); + } + }) + .start(); + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = + new String[] { + "root begin", "root done", "thread1 get failure", + }; + trace.setExpected(expected); + } + + @Test + public void testGetFailure() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> { + CompletablePromise f = Workflow.newPromise(); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + trace.add("thread1 begin"); + assertEquals(IllegalArgumentException.class, f.getFailure().getClass()); + trace.add("thread1 done"); + }) + .start(); + WorkflowInternal.newThread( + false, + () -> { + f.completeExceptionally(new IllegalArgumentException("foo")); + trace.add("thread2 done"); + }) + .start(); + + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = + new String[] {"root begin", "root done", "thread1 begin", "thread2 done", "thread1 done"}; + trace.setExpected(expected); + } + + @Test + public void testGetFailureWithTimeout() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> currentTime, + () -> { + CompletablePromise f = Workflow.newPromise(); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + try { + trace.add("thread1 begin"); + f.get(1, TimeUnit.MINUTES); + trace.add("thread1 get success"); + fail("failure expected"); + } catch (Exception e) { + assertEquals(IllegalArgumentException.class, e.getClass()); + trace.add("thread1 get failure"); + } + }) + .start(); + WorkflowInternal.newThread( + false, + () -> { + Workflow.sleep(Duration.ofSeconds(30)); + trace.add("thread2 awake"); + f.completeExceptionally(new IllegalArgumentException("foo")); + trace.add("thread2 done"); + }) + .start(); + + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = new String[] {"root begin", "root done", "thread1 begin"}; + trace.setExpected(expected); + + currentTime += Duration.ofSeconds(31).toMillis(); + r.runUntilAllBlocked(); + expected = + new String[] { + "root begin", + "root done", + "thread1 begin", + "thread2 awake", + "thread2 done", + "thread1 get failure" + }; + trace.setExpected(expected); + } + @Test public void testGetTimeout() throws Throwable { ExecutorService threadPool = @@ -249,6 +366,46 @@ public void testGetDefaultOnFailure() throws Throwable { threadPool.awaitTermination(1, TimeUnit.MINUTES); } + @Test + public void testGetDefault_success() throws Throwable { + ExecutorService threadPool = + new ThreadPoolExecutor(1, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); + + DeterministicRunner r = + DeterministicRunner.newRunner( + threadPool, + null, + () -> currentTime, + () -> { + CompletablePromise f = Workflow.newPromise(); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + trace.add("thread1 begin"); + assertEquals("success", f.get("default")); + trace.add("thread1 get success"); + }) + .start(); + WorkflowInternal.newThread( + false, + () -> { + trace.add("thread2 begin"); + f.complete("success"); + }) + .start(); + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = + new String[] { + "root begin", "root done", "thread1 begin", "thread2 begin", "thread1 get success", + }; + trace.setExpected(expected); + threadPool.shutdown(); + threadPool.awaitTermination(1, TimeUnit.MINUTES); + } + @Test public void testMultiple() throws Throwable { DeterministicRunner r = diff --git a/src/test/java/com/uber/cadence/internal/sync/WorkflowInternalQueueTest.java b/src/test/java/com/uber/cadence/internal/sync/WorkflowInternalQueueTest.java index cd727d921..02e921447 100644 --- a/src/test/java/com/uber/cadence/internal/sync/WorkflowInternalQueueTest.java +++ b/src/test/java/com/uber/cadence/internal/sync/WorkflowInternalQueueTest.java @@ -20,8 +20,11 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.uber.cadence.workflow.QueueConsumer; import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowQueue; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -137,4 +140,171 @@ public void testPutBlocking() throws Throwable { }; trace.setExpected(expected); } + + @Test + public void testPoll() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> currentTime, + () -> { + WorkflowQueue f = WorkflowInternal.newQueue(1); + f.offer("foo"); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + try { + trace.add("thread1 begin"); + Assert.assertEquals("foo", f.poll(1, TimeUnit.SECONDS)); + trace.add("thread1 foo"); + Assert.assertNull(f.poll(1, TimeUnit.SECONDS)); + trace.add("thread1 done"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .start(); + + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"}; + trace.setExpected(expected); + + currentTime += 1000; + r.runUntilAllBlocked(); + expected = + new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"}; + trace.setExpected(expected); + } + + @Test + public void testOffer() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> currentTime, + () -> { + WorkflowQueue f = WorkflowInternal.newQueue(1); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + Assert.assertTrue(f.offer("foo")); + Assert.assertFalse(f.offer("bar")); + trace.add("thread1 done"); + }) + .start(); + + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = new String[] {"root begin", "root done", "thread1 done"}; + trace.setExpected(expected); + } + + @Test + public void testOfferTimed() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> currentTime, + () -> { + WorkflowQueue f = WorkflowInternal.newQueue(1); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + try { + trace.add("thread1 begin"); + Assert.assertTrue(f.offer("foo", 1, TimeUnit.SECONDS)); + trace.add("thread1 foo"); + Assert.assertFalse(f.offer("bar", 1, TimeUnit.SECONDS)); + trace.add("thread1 done"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .start(); + + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"}; + trace.setExpected(expected); + + currentTime += 1000; + r.runUntilAllBlocked(); + expected = + new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"}; + trace.setExpected(expected); + } + + @Test + public void testMappedTake() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> currentTime, + () -> { + WorkflowQueue f = WorkflowInternal.newQueue(1); + f.offer(true); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + try { + QueueConsumer mappedQueue = f.map(x -> x ? "yes" : "no"); + trace.add("thread1 begin"); + Assert.assertEquals("yes", mappedQueue.take()); + trace.add("thread1 done"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .start(); + + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 done"}; + trace.setExpected(expected); + } + + @Test + public void testMappedPoll() throws Throwable { + DeterministicRunner r = + DeterministicRunner.newRunner( + () -> currentTime, + () -> { + WorkflowQueue f = WorkflowInternal.newQueue(1); + f.offer(true); + trace.add("root begin"); + WorkflowInternal.newThread( + false, + () -> { + try { + QueueConsumer mappedQueue = + f.map(x -> x ? "yes" : "no").map(x -> x); + trace.add("thread1 begin"); + Assert.assertEquals("yes", mappedQueue.poll(1, TimeUnit.SECONDS)); + trace.add("thread1 yes"); + Assert.assertNull(mappedQueue.poll(1, TimeUnit.SECONDS)); + + trace.add("thread1 done"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .start(); + + trace.add("root done"); + }); + r.runUntilAllBlocked(); + String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 yes"}; + trace.setExpected(expected); + + currentTime += 1000; + r.runUntilAllBlocked(); + expected = + new String[] {"root begin", "root done", "thread1 begin", "thread1 yes", "thread1 done"}; + trace.setExpected(expected); + } }