From ec641237d7dae816bb8af559a40c753be26c3cff Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 5 Nov 2024 09:20:22 +0100 Subject: [PATCH 1/2] Future.await should interrupt the current thread when the worker executor is closed. Motivation: Future.await incorrectly performs a no-op when the worker executor is closed (returns a null latch), which reports a failure that might not exist. Changes: When the worker executor returns null, throw an interrupted exception. --- .../src/main/java/io/vertx/core/Future.java | 5 +++- .../VirtualThreadContextTest.java | 27 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/vertx-core/src/main/java/io/vertx/core/Future.java b/vertx-core/src/main/java/io/vertx/core/Future.java index 211bb54f0e0..bb2f168f63d 100644 --- a/vertx-core/src/main/java/io/vertx/core/Future.java +++ b/vertx-core/src/main/java/io/vertx/core/Future.java @@ -738,9 +738,12 @@ default T await(long timeout, TimeUnit unit) throws TimeoutException { } if (succeeded()) { return result(); - } else { + } else if (failed()) { Utils.throwAsUnchecked(cause()); return null; + } else { + Utils.throwAsUnchecked(new InterruptedException("Context closed")); + return null; } } diff --git a/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java b/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java index 6e13216e918..66d16e543f5 100644 --- a/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java @@ -353,4 +353,31 @@ public void testContextCloseContextSerialization() throws Exception { } f.await(); } + + @Test + public void testAwaitWhenClosed() throws Exception { + Assume.assumeTrue(isVirtualThreadAvailable()); + ContextInternal ctx = vertx.createVirtualThreadContext(); + CountDownLatch latch = new CountDownLatch(1); + ctx.runOnContext(v -> { + latch.countDown(); + try { + new CountDownLatch(1).await(); + fail(); + } catch (InterruptedException expected) { + assertFalse(Thread.currentThread().isInterrupted()); + } + try { + Promise.promise().future().await(); + fail(); + } catch (Exception e) { + assertEquals(InterruptedException.class, e.getClass()); + testComplete(); + } + }); + awaitLatch(latch); + // Interrupts virtual thread + ctx.close(); + await(); + } } From ee8fe7942366921db5f52b58594cbba712e4b907 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 5 Nov 2024 09:37:20 +0100 Subject: [PATCH 2/2] Redefine what a context closed means. Motivation: Context#close implementation is currently too drastic and will refuse to execute any task in addition of interrupting threads. After a context is closed there is often the need to let task execution happen in order to cleanup state, e.g. pending HTTP tasks due to back-pressure catch up or timers. Changes: Context#close now allows to execute tasks. --- .../java/io/vertx/core/impl/TaskQueue.java | 40 +++++-------------- .../io/vertx/core/impl/WorkerTaskQueue.java | 7 ---- .../io/vertx/tests/context/ContextTest.java | 4 -- .../io/vertx/tests/context/TaskQueueTest.java | 24 ++--------- .../VirtualThreadContextTest.java | 11 +++++ 5 files changed, 24 insertions(+), 62 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/impl/TaskQueue.java b/vertx-core/src/main/java/io/vertx/core/impl/TaskQueue.java index 56dfacf97f4..8a850d04a02 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/TaskQueue.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/TaskQueue.java @@ -54,9 +54,6 @@ private void run() { for (; ; ) { final ExecuteTask execute; synchronized (tasks) { - if (closed) { - return; - } Task task = tasks.poll(); if (task == null) { currentExecutor = null; @@ -125,9 +122,6 @@ private ContinuationTask continuationTask() { */ public void execute(Runnable task, Executor executor) throws RejectedExecutionException { synchronized (tasks) { - if (closed) { - throw new RejectedExecutionException("Closed"); - } if (currentExecutor == null) { currentExecutor = executor; try { @@ -161,18 +155,15 @@ public final static class CloseResult { private final Runnable activeTask; private final List suspendedTasks; private final List suspendedThreads; - private final List pendingTasks; private CloseResult(Thread activeThread, Runnable activeTask, List suspendedThreads, - List suspendedTasks, - List pendingTasks) { + List suspendedTasks) { this.activeThread = activeThread; this.activeTask = activeTask; this.suspendedThreads = suspendedThreads; this.suspendedTasks = suspendedTasks; - this.pendingTasks = pendingTasks; } /** @@ -186,13 +177,6 @@ public Runnable activeTask() { return activeTask; } - /** - * @return the list of pending tasks - */ - public List pendingTasks() { - return pendingTasks; - } - /** * @return the list of suspended threads */ @@ -214,7 +198,6 @@ public List suspendedTasks() { * @return a structure of suspended threads and pending tasks */ public CloseResult close() { - List pendingTasks = Collections.emptyList(); List suspendedThreads; List suspendedTasks; Thread activeThread; @@ -225,19 +208,16 @@ public CloseResult close() { } suspendedThreads = new ArrayList<>(continuations.size()); suspendedTasks = new ArrayList<>(continuations.size()); - for (Task t : tasks) { - if (t instanceof ExecuteTask) { - if (pendingTasks.isEmpty()) { - pendingTasks = new LinkedList<>(); - } - pendingTasks.add(((ExecuteTask)t).runnable); - } else if (t instanceof ContinuationTask) { - ContinuationTask rt = (ContinuationTask) t; - suspendedThreads.add(rt.thread); - suspendedTasks.add(rt.task.runnable); + Iterator it = tasks.iterator(); + while (it.hasNext()) { + Task task = it.next(); + if (task instanceof ContinuationTask) { + ContinuationTask continuationTask = (ContinuationTask) task; + suspendedThreads.add(continuationTask.thread); + suspendedTasks.add(continuationTask.task.runnable); + it.remove(); } } - tasks.clear(); for (ContinuationTask cont : continuations) { suspendedThreads.add(cont.thread); suspendedTasks.add(cont.task.runnable); @@ -248,7 +228,7 @@ public CloseResult close() { currentExecutor = null; closed = true; } - return new CloseResult(activeThread, activeTask, suspendedThreads, suspendedTasks, pendingTasks); + return new CloseResult(activeThread, activeTask, suspendedThreads, suspendedTasks); } private class ContinuationTask extends CountDownLatch implements WorkerExecutor.Continuation, Task { diff --git a/vertx-core/src/main/java/io/vertx/core/impl/WorkerTaskQueue.java b/vertx-core/src/main/java/io/vertx/core/impl/WorkerTaskQueue.java index 9fa3bab69f8..e3bd213e8a7 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/WorkerTaskQueue.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/WorkerTaskQueue.java @@ -34,13 +34,6 @@ public WorkerTaskQueue() { void shutdown(EventLoop executor, Promise completion) { TaskQueue.CloseResult closeResult = close(); - // Reject all pending tasks - List pendingTasks = closeResult.pendingTasks(); - for (Runnable pendingTask : pendingTasks) { - WorkerTask pendingWorkerTask = (WorkerTask) pendingTask; - pendingWorkerTask.reject(); - } - // Maintain context invariant: serialize task execution while interrupting tasks class InterruptSequence { diff --git a/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java b/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java index d9151c95502..3fdbd93edd0 100644 --- a/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java @@ -250,15 +250,11 @@ public void testExecuteBlockingClose() { latch.await(); return ""; }); - Future fut2 = ctx.executeBlocking(() -> ""); assertWaitUntil(() -> thread.get() != null && thread.get().getState() == Thread.State.WAITING); ctx.close(); assertWaitUntil(fut1::isComplete); assertTrue(fut1.failed()); assertTrue(fut1.cause() instanceof InterruptedException); - assertWaitUntil(fut2::isComplete); - assertTrue(fut2.failed()); - assertTrue(fut2.cause() instanceof RejectedExecutionException); } @Test diff --git a/vertx-core/src/test/java/io/vertx/tests/context/TaskQueueTest.java b/vertx-core/src/test/java/io/vertx/tests/context/TaskQueueTest.java index 50341d36006..577bad5b404 100644 --- a/vertx-core/src/test/java/io/vertx/tests/context/TaskQueueTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/context/TaskQueueTest.java @@ -202,20 +202,6 @@ public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() { Assertions.assertThat(taskQueue.isEmpty()).isTrue(); } - @Test - public void testClosePendingTasks() { - TaskQueue taskQueue = new TaskQueue(); - Deque pending = new ConcurrentLinkedDeque<>(); - Executor executor = pending::add; - Runnable task = () -> { - }; - taskQueue.execute(task, executor); - assertEquals(1, pending.size()); - TaskQueue.CloseResult result = taskQueue.close(); - assertEquals(1, result.pendingTasks().size()); - assertSame(task, result.pendingTasks().get(0)); - } - @Test public void testCloseSuspendedTasks() { TaskQueue taskQueue = new TaskQueue(); @@ -325,14 +311,10 @@ public void testSubmitAfterClose() { taskQueue.close(); Deque pending = new ConcurrentLinkedDeque<>(); Executor exec = pending::add; - try { - taskQueue.execute(() -> { + taskQueue.execute(() -> { - }, exec); - fail(); - } catch (RejectedExecutionException expected) { - } - assertEquals(0, pending.size()); + }, exec); + assertEquals(1, pending.size()); } @Test diff --git a/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java b/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java index 66d16e543f5..239032a8a2d 100644 --- a/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/virtualthread/VirtualThreadContextTest.java @@ -380,4 +380,15 @@ public void testAwaitWhenClosed() throws Exception { ctx.close(); await(); } + + @Test + public void testSubmitAfterClose() { + Assume.assumeTrue(isVirtualThreadAvailable()); + ContextInternal ctx = vertx.createVirtualThreadContext(); + ctx.close(); + ctx.runOnContext(v -> { + testComplete(); + }); + await(); + } }