Skip to content

Commit

Permalink
Fix WorkflowQueue#offer and increase test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
natemort committed Oct 31, 2024
1 parent 2afe97e commit 2673d32
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public void put(E e) throws InterruptedException {

@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
boolean timedOut =
boolean success =
WorkflowThread.await(
unit.toMillis(timeout), "WorkflowQueue.offer", () -> queue.size() < capacity);
if (timedOut) {
if (!success) {
return false;
}
queue.addLast(e);
Expand Down
157 changes: 157 additions & 0 deletions src/test/java/com/uber/cadence/internal/sync/PromiseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.uber.cadence.workflow.CompletablePromise;
import com.uber.cadence.workflow.Promise;
import com.uber.cadence.workflow.Workflow;
import java.time.Duration;
import java.util.ArrayList;
import java.util.IllegalFormatCodePointException;
import java.util.List;
Expand Down Expand Up @@ -84,6 +85,122 @@ public void testFailure() throws Throwable {
trace.setExpected(expected);
}

@Test
public void testTimedFailure() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> {
CompletablePromise<Boolean> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false, () -> f.completeExceptionally(new IllegalArgumentException("foo")))
.start();
WorkflowInternal.newThread(
false,
() -> {
try {
f.get(10, TimeUnit.DAYS);
trace.add("thread1 get success");
fail("failure expected");
} catch (Exception e) {
assertEquals(IllegalArgumentException.class, e.getClass());
trace.add("thread1 get failure");
}
})
.start();
trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected =
new String[] {
"root begin", "root done", "thread1 get failure",
};
trace.setExpected(expected);
}

@Test
public void testGetFailure() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> {
CompletablePromise<Boolean> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
trace.add("thread1 begin");
assertEquals(IllegalArgumentException.class, f.getFailure().getClass());
trace.add("thread1 done");
})
.start();
WorkflowInternal.newThread(
false,
() -> {
f.completeExceptionally(new IllegalArgumentException("foo"));
trace.add("thread2 done");
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected =
new String[] {"root begin", "root done", "thread1 begin", "thread2 done", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testGetFailureWithTimeout() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
CompletablePromise<Boolean> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
trace.add("thread1 begin");
f.get(1, TimeUnit.MINUTES);
trace.add("thread1 get success");
fail("failure expected");
} catch (Exception e) {
assertEquals(IllegalArgumentException.class, e.getClass());
trace.add("thread1 get failure");
}
})
.start();
WorkflowInternal.newThread(
false,
() -> {
Workflow.sleep(Duration.ofSeconds(30));
trace.add("thread2 awake");
f.completeExceptionally(new IllegalArgumentException("foo"));
trace.add("thread2 done");
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin"};
trace.setExpected(expected);

currentTime += Duration.ofSeconds(31).toMillis();
r.runUntilAllBlocked();
expected =
new String[] {
"root begin",
"root done",
"thread1 begin",
"thread2 awake",
"thread2 done",
"thread1 get failure"
};
trace.setExpected(expected);
}

@Test
public void testGetTimeout() throws Throwable {
ExecutorService threadPool =
Expand Down Expand Up @@ -249,6 +366,46 @@ public void testGetDefaultOnFailure() throws Throwable {
threadPool.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testGetDefault_success() throws Throwable {
ExecutorService threadPool =
new ThreadPoolExecutor(1, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>());

DeterministicRunner r =
DeterministicRunner.newRunner(
threadPool,
null,
() -> currentTime,
() -> {
CompletablePromise<String> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
trace.add("thread1 begin");
assertEquals("success", f.get("default"));
trace.add("thread1 get success");
})
.start();
WorkflowInternal.newThread(
false,
() -> {
trace.add("thread2 begin");
f.complete("success");
})
.start();
trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected =
new String[] {
"root begin", "root done", "thread1 begin", "thread2 begin", "thread1 get success",
};
trace.setExpected(expected);
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testMultiple() throws Throwable {
DeterministicRunner r =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.uber.cadence.workflow.QueueConsumer;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -137,4 +140,171 @@ public void testPutBlocking() throws Throwable {
};
trace.setExpected(expected);
}

@Test
public void testPoll() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
f.offer("foo");
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
trace.add("thread1 begin");
Assert.assertEquals("foo", f.poll(1, TimeUnit.SECONDS));
trace.add("thread1 foo");
Assert.assertNull(f.poll(1, TimeUnit.SECONDS));
trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"};
trace.setExpected(expected);

currentTime += 1000;
r.runUntilAllBlocked();
expected =
new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testOffer() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
Assert.assertTrue(f.offer("foo"));
Assert.assertFalse(f.offer("bar"));
trace.add("thread1 done");
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testOfferTimed() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
trace.add("thread1 begin");
Assert.assertTrue(f.offer("foo", 1, TimeUnit.SECONDS));
trace.add("thread1 foo");
Assert.assertFalse(f.offer("bar", 1, TimeUnit.SECONDS));
trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"};
trace.setExpected(expected);

currentTime += 1000;
r.runUntilAllBlocked();
expected =
new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testMappedTake() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<Boolean> f = WorkflowInternal.newQueue(1);
f.offer(true);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
QueueConsumer<String> mappedQueue = f.map(x -> x ? "yes" : "no");
trace.add("thread1 begin");
Assert.assertEquals("yes", mappedQueue.take());
trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testMappedPoll() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<Boolean> f = WorkflowInternal.newQueue(1);
f.offer(true);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
QueueConsumer<String> mappedQueue =
f.map(x -> x ? "yes" : "no").map(x -> x);
trace.add("thread1 begin");
Assert.assertEquals("yes", mappedQueue.poll(1, TimeUnit.SECONDS));
trace.add("thread1 yes");
Assert.assertNull(mappedQueue.poll(1, TimeUnit.SECONDS));

trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 yes"};
trace.setExpected(expected);

currentTime += 1000;
r.runUntilAllBlocked();
expected =
new String[] {"root begin", "root done", "thread1 begin", "thread1 yes", "thread1 done"};
trace.setExpected(expected);
}
}

0 comments on commit 2673d32

Please sign in to comment.