diff --git a/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java b/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java index b498e0b5be9..d0c8306c568 100644 --- a/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java +++ b/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java @@ -54,6 +54,11 @@ static void fromJson(Iterable> json, Deploym obj.setMaxWorkerExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); } break; + case "reuseCurrentEventLoop": + if (member.getValue() instanceof Boolean) { + obj.setReuseCurrentEventLoop((Boolean)member.getValue()); + } + break; } } } @@ -79,5 +84,6 @@ static void toJson(DeploymentOptions obj, java.util.Map json) { if (obj.getMaxWorkerExecuteTimeUnit() != null) { json.put("maxWorkerExecuteTimeUnit", obj.getMaxWorkerExecuteTimeUnit().name()); } + json.put("reuseCurrentEventLoop", obj.getReuseCurrentEventLoop()); } } diff --git a/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java b/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java index 6e6ffb6f6b3..2a03e1e4468 100644 --- a/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java +++ b/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java @@ -32,6 +32,7 @@ public class DeploymentOptions { public static final boolean DEFAULT_WORKER = false; public static final boolean DEFAULT_HA = false; public static final int DEFAULT_INSTANCES = 1; + public static final boolean DEFAULT_REUSE_CURRENT_EVENT_LOOP = false; private JsonObject config; private ThreadingModel threadingModel; @@ -42,6 +43,7 @@ public class DeploymentOptions { private int workerPoolSize; private long maxWorkerExecuteTime; private TimeUnit maxWorkerExecuteTimeUnit; + private boolean reuseCurrentEventLoop; /** * Default constructor @@ -54,6 +56,7 @@ public DeploymentOptions() { this.workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE; this.maxWorkerExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME; this.maxWorkerExecuteTimeUnit = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT; + this.reuseCurrentEventLoop = DEFAULT_REUSE_CURRENT_EVENT_LOOP; } /** @@ -292,6 +295,26 @@ public DeploymentOptions setClassLoader(ClassLoader classLoader) { return this; } + /** + * @return true if the verticle should reuse the current event loop + */ + public boolean getReuseCurrentEventLoop() { + return reuseCurrentEventLoop; + } + + /** + * Set whether the verticle should reuse the current event loop + *

when reuseCurrentEventLoop is set to true, the verticle will be deployed on the current event loop. + * If no Context is set for the current thread, a Context will be created for the current thread. + * + * @param reuseCurrentEventLoop true if the verticle should reuse the current event loop + * @return a reference to this, so the API can be used fluently + */ + public DeploymentOptions setReuseCurrentEventLoop(boolean reuseCurrentEventLoop) { + this.reuseCurrentEventLoop = reuseCurrentEventLoop; + return this; + } + /** * Convert this to JSON * diff --git a/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java index 032d399373e..e8f60f0e808 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java @@ -142,7 +142,9 @@ public Future deploy(DeploymentContext deployment) { } break; default: - context = vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); + context = options.getReuseCurrentEventLoop() + ? vertx.createEventLoopContext(vertx.getOrCreateContext().nettyEventLoop(), deployment,closeFuture,workerPool, tccl) + : vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); break; } Instance instance = new Instance(verticle, context); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java index 9ec4f47c3fe..734d193b06f 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java @@ -166,6 +166,13 @@ default ContextInternal createEventLoopContext(DeploymentContext deployment, Clo return createContext(ThreadingModel.EVENT_LOOP, deployment, closeFuture, workerPool, tccl); } + /** + * @return event loop context + */ + default ContextInternal createEventLoopContext(EventLoop eventLoop, DeploymentContext deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) { + return createContext(ThreadingModel.EVENT_LOOP, eventLoop, closeFuture, workerPool, deployment, tccl); + } + /** * @return event loop context */ diff --git a/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java b/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java new file mode 100644 index 00000000000..61d4fc8eb17 --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java @@ -0,0 +1,58 @@ +package io.vertx.tests.deployment; + +import io.netty.channel.EventLoop; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.ThreadingModel; +import io.vertx.core.internal.ContextInternal; +import org.junit.Test; + +/** + * @author dremalike + */ +public class CurrentEventLoopDeploymentTest extends AbstractVerticleTest { + + @Test + public void testDeploy() throws InterruptedException { + waitFor(1); + ContextInternal currentContext = (ContextInternal) vertx.getOrCreateContext(); + EventLoop targetEventLoop = currentContext.nettyEventLoop(); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start() { + EventLoop currentEventLoop = ((ContextInternal) context).nettyEventLoop(); + assertEquals(targetEventLoop, currentEventLoop); + assertNotSame(currentContext, context); + } + + }, new DeploymentOptions().setReuseCurrentEventLoop(true)) + .onSuccess(this::assertNotNull) + .onFailure(this::fail) + .onComplete(s -> testComplete()); + + await(); + } + + @Test + public void testExecuteBlocking() { + waitFor(1); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start(Promise startPromise) throws Exception { + Thread eventLoopThread = Thread.currentThread(); + vertx.executeBlocking(() -> { + assertNotSame(eventLoopThread, Thread.currentThread()); + startPromise.complete(); + return null; + }); + } + }, new DeploymentOptions().setReuseCurrentEventLoop(true)) + .onSuccess(this::assertNotNull) + .onFailure(this::fail) + .onComplete(s -> testComplete()); + await(); + } + + +}