Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reuse current eventLoop for Deployment #5353

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, Deploym
obj.setMaxWorkerExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "reuseCurrentEventLoop":
if (member.getValue() instanceof Boolean) {
obj.setReuseCurrentEventLoop((Boolean)member.getValue());
}
break;
}
}
}
Expand All @@ -79,5 +84,6 @@ static void toJson(DeploymentOptions obj, java.util.Map<String, Object> json) {
if (obj.getMaxWorkerExecuteTimeUnit() != null) {
json.put("maxWorkerExecuteTimeUnit", obj.getMaxWorkerExecuteTimeUnit().name());
}
json.put("reuseCurrentEventLoop", obj.getReuseCurrentEventLoop());
}
}
23 changes: 23 additions & 0 deletions vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class DeploymentOptions {
public static final boolean DEFAULT_WORKER = false;
public static final boolean DEFAULT_HA = false;
public static final int DEFAULT_INSTANCES = 1;
public static final boolean DEFAULT_REUSE_CURRENT_EVENT_LOOP = false;

private JsonObject config;
private ThreadingModel threadingModel;
Expand All @@ -42,6 +43,7 @@ public class DeploymentOptions {
private int workerPoolSize;
private long maxWorkerExecuteTime;
private TimeUnit maxWorkerExecuteTimeUnit;
private boolean reuseCurrentEventLoop;

/**
* Default constructor
Expand All @@ -54,6 +56,7 @@ public DeploymentOptions() {
this.workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE;
this.maxWorkerExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME;
this.maxWorkerExecuteTimeUnit = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT;
this.reuseCurrentEventLoop = DEFAULT_REUSE_CURRENT_EVENT_LOOP;
}

/**
Expand Down Expand Up @@ -292,6 +295,26 @@ public DeploymentOptions setClassLoader(ClassLoader classLoader) {
return this;
}

/**
* @return true if the verticle should reuse the current event loop
*/
public boolean getReuseCurrentEventLoop() {
return reuseCurrentEventLoop;
}

/**
* Set whether the verticle should reuse the current event loop
* <p> when reuseCurrentEventLoop is set to true, the verticle will be deployed on the current event loop.
* If no Context is set for the current thread, a Context will be created for the current thread.
*
* @param reuseCurrentEventLoop true if the verticle should reuse the current event loop
* @return a reference to this, so the API can be used fluently
*/
public DeploymentOptions setReuseCurrentEventLoop(boolean reuseCurrentEventLoop) {
this.reuseCurrentEventLoop = reuseCurrentEventLoop;
return this;
}

/**
* Convert this to JSON
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public Future<?> deploy(DeploymentContext deployment) {
}
break;
default:
context = vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl);
context = options.getReuseCurrentEventLoop()
? vertx.createEventLoopContext(vertx.getOrCreateContext().nettyEventLoop(), deployment,closeFuture,workerPool, tccl)
: vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl);
break;
}
Instance instance = new Instance(verticle, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ default ContextInternal createEventLoopContext(DeploymentContext deployment, Clo
return createContext(ThreadingModel.EVENT_LOOP, deployment, closeFuture, workerPool, tccl);
}

/**
* @return event loop context
*/
default ContextInternal createEventLoopContext(EventLoop eventLoop, DeploymentContext deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) {
return createContext(ThreadingModel.EVENT_LOOP, eventLoop, closeFuture, workerPool, deployment, tccl);
}

/**
* @return event loop context
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.vertx.tests.deployment;

import io.netty.channel.EventLoop;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.ThreadingModel;
import io.vertx.core.internal.ContextInternal;
import org.junit.Test;

/**
* @author <a href="https://dreamlike-ocean.github.io/blog/">dremalike</a>
*/
public class CurrentEventLoopDeploymentTest extends AbstractVerticleTest {

@Test
public void testDeploy() throws InterruptedException {
waitFor(1);
ContextInternal currentContext = (ContextInternal) vertx.getOrCreateContext();
EventLoop targetEventLoop = currentContext.nettyEventLoop();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
EventLoop currentEventLoop = ((ContextInternal) context).nettyEventLoop();
assertEquals(targetEventLoop, currentEventLoop);
assertNotSame(currentContext, context);
}

}, new DeploymentOptions().setReuseCurrentEventLoop(true))
.onSuccess(this::assertNotNull)
.onFailure(this::fail)
.onComplete(s -> testComplete());

await();
}

@Test
public void testExecuteBlocking() {
waitFor(1);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Promise<Void> startPromise) throws Exception {
Thread eventLoopThread = Thread.currentThread();
vertx.executeBlocking(() -> {
assertNotSame(eventLoopThread, Thread.currentThread());
startPromise.complete();
return null;
});
}
}, new DeploymentOptions().setReuseCurrentEventLoop(true))
.onSuccess(this::assertNotNull)
.onFailure(this::fail)
.onComplete(s -> testComplete());
await();
}


}