Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce VirtualThreadExecutorService for Virtual Threads Support #2185

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ name: Eclipse Grizzly NIO CI

on:
pull_request:
push:

jobs:
build:
Expand All @@ -22,13 +23,13 @@ jobs:

strategy:
matrix:
java_version: [ 11 ]
java_version: [ 21 ]

steps:
- name: Checkout for build
uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: ${{ matrix.java_version }}
Expand All @@ -47,13 +48,13 @@ jobs:

strategy:
matrix:
java_version: [ 11 ]
java_version: [ 21 ]

steps:
- name: Checkout for build
uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: ${{ matrix.java_version }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.glassfish.grizzly.threadpool;

import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.localization.LogMessages;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* @author mazhen
*/
public class VirtualThreadExecutorService extends AbstractExecutorService implements Thread.UncaughtExceptionHandler {

private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class);

private final ExecutorService internalExecutorService;
private Semaphore poolSemaphore;

public static VirtualThreadExecutorService createInstance() {
return createInstance(ThreadPoolConfig.defaultConfig().setMaxPoolSize(-1).setPoolName("Grizzly-virt-"));
}

public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) {
Objects.requireNonNull(cfg);
return new VirtualThreadExecutorService(cfg);
}

protected VirtualThreadExecutorService(ThreadPoolConfig cfg) {
internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg));
if (cfg.getMaxPoolSize() > 0) {
poolSemaphore = new Semaphore(cfg.getMaxPoolSize());
} else {
poolSemaphore = new Semaphore(Integer.MAX_VALUE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Semaphore should be constructed with the true value for the second parameter to enable fairness - guarantee the order of tasks.

}
}

private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) {

var prefix = threadPoolConfig.getPoolName() + "-";

// virtual threads factory
final ThreadFactory factory = Thread.ofVirtual()
.name(prefix, 0L)
.uncaughtExceptionHandler(this)
.factory();

return r -> {
Thread thread = factory.newThread(r);
final ClassLoader initial = threadPoolConfig.getInitialClassLoader();
if (initial != null) {
thread.setContextClassLoader(initial);
}
return thread;
};
}

@Override
public void shutdown() {
internalExecutorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return internalExecutorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return internalExecutorService.isShutdown();
}

@Override
public boolean isTerminated() {
return internalExecutorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return internalExecutorService.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
if (poolSemaphore.tryAcquire()) {
Copy link

@OndroMih OndroMih Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also introduce a queue for the tasks that can't be immediately executed. It could be implemented by another semaphore wrapping the poolSemaphore, with the queueSize + maxPoolSize permits, like this:

Semaphore poolSemaphore = new Semaphore(maxPoolSize, true);
Semaphore queueSemaphore = new Semaphore(queueSize + maxPoolSize, true);

if (queueSemaphore.tryAcquire() {
   internalExecutorService.execute(() -> {
                try {
                    poolSemaphore.acquire();
                    command.run();
                } finally {
                    poolSemaphore.release();
                    queueSemaphore.release();
                }
            });
} else {
  throw new RejectedExecutionException("Too Many Concurrent Requests");
}

The queue semaphore should be acquired on the kernel thread but the pool semaphore would be acquired on the virtual thread. Virtual threads might be blocked on acquiring the pool semaphore but kernel thread should continue working.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following your latest advice, I've added a second semaphore (queueSemaphore) to further refine the task management in VirtualThreadExecutorService.

Your guidance has been instrumental in evolving the functionality and robustness of this service. I'm looking forward to any additional thoughts or feedback you might have!

internalExecutorService.execute(() -> {
try {
command.run();
} finally {
poolSemaphore.release();
}
});
} else {
throw new RejectedExecutionException("Too Many Concurrent Requests");
}
}

@Override
public void uncaughtException(Thread thread, Throwable throwable) {
logger.log(Level.WARNING, LogMessages.WARNING_GRIZZLY_THREADPOOL_UNCAUGHT_EXCEPTION(thread), throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.glassfish.grizzly;

import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import org.glassfish.grizzly.threadpool.VirtualThreadExecutorService;
import org.junit.Assert;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadExecutorServiceTest extends GrizzlyTestCase {

public void testCreateInstance() throws Exception {

VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance();
final int tasks = 2000000;
doTest(r, tasks);
}

public void testAwaitTermination() throws Exception {
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance();
final int tasks = 2000;
doTest(r, tasks);
r.shutdown();
assertTrue(r.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(r.isTerminated());
}

public void testQueueLimit() throws Exception {
int poolSize = 10;
ThreadPoolConfig config = ThreadPoolConfig.defaultConfig().setMaxPoolSize(poolSize);
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(config);

CyclicBarrier start = new CyclicBarrier(poolSize + 1);
CyclicBarrier hold = new CyclicBarrier(poolSize + 1);
AtomicInteger result = new AtomicInteger();
for (int i = 0; i < poolSize; i++) {
int taskId = i;
r.execute(() -> {
try {
System.out.println("task " + taskId + " is running");
start.await();
hold.await();
result.getAndIncrement();
} catch (Exception e) {
}
});
}
start.await();
// Too Many Concurrent Requests
Assert.assertThrows(RejectedExecutionException.class, () -> r.execute(() -> System.out.println("cannot be executed")));
hold.await();
while (true) {
if (result.intValue() == poolSize) {
System.out.println("All tasks have been completed.");
break;
}
}
// The executor can accept new tasks
doTest(r, poolSize);
}

private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception {
final CountDownLatch cl = new CountDownLatch(tasks);
while (tasks-- > 0) {
r.execute(() -> cl.countDown());
}
assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS));
}
}
8 changes: 5 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
</issueManagement>

<properties>
<java.version>21</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<servlet-version>6.0.0</servlet-version>
<maven-plugin.version>1.0.0</maven-plugin.version>
Expand Down Expand Up @@ -196,8 +197,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.9.0</version>
<configuration>
<source>11</source>
<target>11</target>
<source>${java.version}</source>
<target>${java.version}</target>
<release>${java.version}</release>
<compilerArgument>-Xlint:unchecked,deprecation,fallthrough,finally,cast,dep-ann,empty,overrides</compilerArgument>
</configuration>
</plugin>
Expand All @@ -224,7 +226,7 @@
<configuration>
<rules>
<requireJavaVersion>
<version>[11,)</version>
<version>[21,)</version>
</requireJavaVersion>
<requireMavenVersion>
<version>3.6.3</version>
Expand Down