Skip to content

Commit

Permalink
Redefine what a context closed means.
Browse files Browse the repository at this point in the history
Motivation:

Context#close implementation is currently too drastic and will refuse to execute any task in addition of interrupting threads. After a context is closed there is often the need to let task execution happen in order to cleanup state, e.g. pending HTTP tasks due to back-pressure catch up or timers.

Changes:

Context#close now allows to execute tasks.
  • Loading branch information
vietj committed Nov 6, 2024
1 parent a87afd2 commit fcfc661
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 63 deletions.
40 changes: 10 additions & 30 deletions src/main/java/io/vertx/core/impl/TaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ private void run() {
for (; ; ) {
final ExecuteTask execute;
synchronized (tasks) {
if (closed) {
return;
}
Task task = tasks.poll();
if (task == null) {
currentExecutor = null;
Expand Down Expand Up @@ -125,9 +122,6 @@ private ContinuationTask continuationTask() {
*/
public void execute(Runnable task, Executor executor) throws RejectedExecutionException {
synchronized (tasks) {
if (closed) {
throw new RejectedExecutionException("Closed");
}
if (currentExecutor == null) {
currentExecutor = executor;
try {
Expand Down Expand Up @@ -161,18 +155,15 @@ public final static class CloseResult {
private final Runnable activeTask;
private final List<Runnable> suspendedTasks;
private final List<Thread> suspendedThreads;
private final List<Runnable> pendingTasks;

private CloseResult(Thread activeThread,
Runnable activeTask,
List<Thread> suspendedThreads,
List<Runnable> suspendedTasks,
List<Runnable> pendingTasks) {
List<Runnable> suspendedTasks) {
this.activeThread = activeThread;
this.activeTask = activeTask;
this.suspendedThreads = suspendedThreads;
this.suspendedTasks = suspendedTasks;
this.pendingTasks = pendingTasks;
}

/**
Expand All @@ -186,13 +177,6 @@ public Runnable activeTask() {
return activeTask;
}

/**
* @return the list of pending tasks
*/
public List<Runnable> pendingTasks() {
return pendingTasks;
}

/**
* @return the list of suspended threads
*/
Expand All @@ -214,7 +198,6 @@ public List<Runnable> suspendedTasks() {
* @return a structure of suspended threads and pending tasks
*/
public CloseResult close() {
List<Runnable> pendingTasks = Collections.emptyList();
List<Thread> suspendedThreads;
List<Runnable> suspendedTasks;
Thread activeThread;
Expand All @@ -225,19 +208,16 @@ public CloseResult close() {
}
suspendedThreads = new ArrayList<>(continuations.size());
suspendedTasks = new ArrayList<>(continuations.size());
for (Task t : tasks) {
if (t instanceof ExecuteTask) {
if (pendingTasks.isEmpty()) {
pendingTasks = new LinkedList<>();
}
pendingTasks.add(((ExecuteTask)t).runnable);
} else if (t instanceof ContinuationTask) {
ContinuationTask rt = (ContinuationTask) t;
suspendedThreads.add(rt.thread);
suspendedTasks.add(rt.task.runnable);
Iterator<Task> it = tasks.iterator();
while (it.hasNext()) {
Task task = it.next();
if (task instanceof ContinuationTask) {
ContinuationTask continuationTask = (ContinuationTask) task;
suspendedThreads.add(continuationTask.thread);
suspendedTasks.add(continuationTask.task.runnable);
it.remove();
}
}
tasks.clear();
for (ContinuationTask cont : continuations) {
suspendedThreads.add(cont.thread);
suspendedTasks.add(cont.task.runnable);
Expand All @@ -248,7 +228,7 @@ public CloseResult close() {
currentExecutor = null;
closed = true;
}
return new CloseResult(activeThread, activeTask, suspendedThreads, suspendedTasks, pendingTasks);
return new CloseResult(activeThread, activeTask, suspendedThreads, suspendedTasks);
}

private class ContinuationTask extends CountDownLatch implements WorkerExecutor.Continuation, Task {
Expand Down
7 changes: 0 additions & 7 deletions src/main/java/io/vertx/core/impl/WorkerTaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ class WorkerTaskQueue extends TaskQueue {
void shutdown(EventLoop executor, Promise<Void> completion) {
TaskQueue.CloseResult closeResult = close();

// Reject all pending tasks
List<Runnable> pendingTasks = closeResult.pendingTasks();
for (Runnable pendingTask : pendingTasks) {
WorkerTask pendingWorkerTask = (WorkerTask) pendingTask;
pendingWorkerTask.reject();
}

// Maintain context invariant: serialize task execution while interrupting tasks
class InterruptSequence {

Expand Down
4 changes: 0 additions & 4 deletions src/test/java/io/vertx/core/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,11 @@ public void testExecuteBlockingClose() {
latch.await();
return "";
});
Future<String> fut2 = ctx.executeBlocking(() -> "");
assertWaitUntil(() -> thread.get() != null && thread.get().getState() == Thread.State.WAITING);
ctx.close();
assertWaitUntil(fut1::isComplete);
assertTrue(fut1.failed());
assertTrue(fut1.cause() instanceof InterruptedException);
assertWaitUntil(fut2::isComplete);
assertTrue(fut2.failed());
assertTrue(fut2.cause() instanceof RejectedExecutionException);
}

@Test
Expand Down
24 changes: 3 additions & 21 deletions src/test/java/io/vertx/core/TaskQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,20 +202,6 @@ public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() {
Assertions.assertThat(taskQueue.isEmpty()).isTrue();
}

@Test
public void testClosePendingTasks() {
TaskQueue taskQueue = new TaskQueue();
Deque<Runnable> pending = new ConcurrentLinkedDeque<>();
Executor executor = pending::add;
Runnable task = () -> {
};
taskQueue.execute(task, executor);
assertEquals(1, pending.size());
TaskQueue.CloseResult result = taskQueue.close();
assertEquals(1, result.pendingTasks().size());
assertSame(task, result.pendingTasks().get(0));
}

@Test
public void testCloseSuspendedTasks() {
TaskQueue taskQueue = new TaskQueue();
Expand Down Expand Up @@ -325,14 +311,10 @@ public void testSubmitAfterClose() {
taskQueue.close();
Deque<Runnable> pending = new ConcurrentLinkedDeque<>();
Executor exec = pending::add;
try {
taskQueue.execute(() -> {
taskQueue.execute(() -> {

}, exec);
fail();
} catch (RejectedExecutionException expected) {
}
assertEquals(0, pending.size());
}, exec);
assertEquals(1, pending.size());
}

@Test
Expand Down
13 changes: 12 additions & 1 deletion src/test/java/io/vertx/core/VirtualThreadContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ public void testAwaitWhenClosed() throws Exception {
assertFalse(Thread.currentThread().isInterrupted());
}
try {
Promise.promise().future().await();
Future.await(Promise.promise().future());
fail();
} catch (Exception e) {
assertEquals(InterruptedException.class, e.getClass());
Expand All @@ -376,4 +376,15 @@ public void testAwaitWhenClosed() throws Exception {
ctx.close();
await();
}

@Test
public void testSubmitAfterClose() {
Assume.assumeTrue(isVirtualThreadAvailable());
ContextInternal ctx = vertx.createVirtualThreadContext();
ctx.close();
ctx.runOnContext(v -> {
testComplete();
});
await();
}
}

0 comments on commit fcfc661

Please sign in to comment.