Skip to content

Commit

Permalink
The recent changes in worker pool affected worker verticles with a re…
Browse files Browse the repository at this point in the history
…gression: the named verticle worker pool is closed before undeploying the verticle which leads to have the worker verticle undeployment fail when the worker pool is used only by this verticle.

Close the worker pool after the verticle has been deployed instead of before.
  • Loading branch information
vietj committed Sep 22, 2023
1 parent fc15435 commit 9bfa6d4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/impl/DeploymentManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,6 @@ public synchronized Future<Void> doUndeploy(ContextInternal undeployingContext)
status = ST_UNDEPLOYING;
return doUndeployChildren(undeployingContext).compose(v -> doUndeploy(undeployingContext));
} else {
if (workerPool != null) {
workerPool.close();
}
status = ST_UNDEPLOYED;
List<Future<?>> undeployFutures = new ArrayList<>();
if (parent != null) {
Expand Down Expand Up @@ -340,6 +337,9 @@ public synchronized Future<Void> doUndeploy(ContextInternal undeployingContext)
Promise<Void> resolvingPromise = undeployingContext.promise();
Future.all(undeployFutures).<Void>mapEmpty().onComplete(resolvingPromise);
Future<Void> fut = resolvingPromise.future();
if (workerPool != null) {
fut = fut.andThen(ar -> workerPool.close());
}
Handler<Void> handler = undeployHandler;
if (handler != null) {
undeployHandler = null;
Expand Down
24 changes: 23 additions & 1 deletion src/test/java/io/vertx/core/NamedWorkerPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.*;
Expand Down Expand Up @@ -268,7 +269,7 @@ public void start() throws Exception {
}

@Test
public void testDeployUsingNamedPool() throws Exception {
public void testDeployUsingNamedPool() {
AtomicReference<Thread> thread = new AtomicReference<>();
String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
Promise<Void> undeployed = Promise.promise();
Expand All @@ -292,6 +293,27 @@ public void start() {
assertWaitUntil(() -> thread.get() != null && thread.get().getState() == Thread.State.TERMINATED);
}

@Test
public void testNamedWorkerPoolShouldBeClosedAfterVerticleIsUndeployed() {
AtomicReference<String> threadName = new AtomicReference<>();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
}
@Override
public void stop() {
threadName.set(Thread.currentThread().getName());
}
}, new DeploymentOptions().setWorker(true).setWorkerPoolName("test-worker")).onComplete(onSuccess(id -> {
vertx.undeploy(id).onComplete(onSuccess(v -> {
assertNotNull(threadName.get());
assertTrue(threadName.get().startsWith("test-worker"));
testComplete();
}));
}));
await();
}

@Test
public void testDeployUsingNamedWorkerDoesNotCreateExtraEventLoop() {
int instances = getOptions().getEventLoopPoolSize();
Expand Down

0 comments on commit 9bfa6d4

Please sign in to comment.