-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce VirtualThreadExecutorService for Virtual Threads Support #2185
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package org.glassfish.grizzly.threadpool; | ||
|
||
import org.glassfish.grizzly.Grizzly; | ||
import org.glassfish.grizzly.localization.LogMessages; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.AbstractExecutorService; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
/** | ||
* @author mazhen | ||
*/ | ||
public class VirtualThreadExecutorService extends AbstractExecutorService implements Thread.UncaughtExceptionHandler { | ||
|
||
private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class); | ||
|
||
private final ExecutorService internalExecutorService; | ||
private Semaphore poolSemaphore; | ||
|
||
public static VirtualThreadExecutorService createInstance() { | ||
return createInstance(ThreadPoolConfig.defaultConfig().setMaxPoolSize(-1).setPoolName("Grizzly-virt-")); | ||
} | ||
|
||
public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) { | ||
Objects.requireNonNull(cfg); | ||
return new VirtualThreadExecutorService(cfg); | ||
} | ||
|
||
protected VirtualThreadExecutorService(ThreadPoolConfig cfg) { | ||
internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg)); | ||
if (cfg.getMaxPoolSize() > 0) { | ||
poolSemaphore = new Semaphore(cfg.getMaxPoolSize()); | ||
} else { | ||
poolSemaphore = new Semaphore(Integer.MAX_VALUE); | ||
} | ||
} | ||
|
||
private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) { | ||
|
||
var prefix = threadPoolConfig.getPoolName() + "-"; | ||
|
||
// virtual threads factory | ||
final ThreadFactory factory = Thread.ofVirtual() | ||
.name(prefix, 0L) | ||
.uncaughtExceptionHandler(this) | ||
.factory(); | ||
|
||
return r -> { | ||
Thread thread = factory.newThread(r); | ||
final ClassLoader initial = threadPoolConfig.getInitialClassLoader(); | ||
if (initial != null) { | ||
thread.setContextClassLoader(initial); | ||
} | ||
return thread; | ||
}; | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
internalExecutorService.shutdown(); | ||
} | ||
|
||
@Override | ||
public List<Runnable> shutdownNow() { | ||
return internalExecutorService.shutdownNow(); | ||
} | ||
|
||
@Override | ||
public boolean isShutdown() { | ||
return internalExecutorService.isShutdown(); | ||
} | ||
|
||
@Override | ||
public boolean isTerminated() { | ||
return internalExecutorService.isTerminated(); | ||
} | ||
|
||
@Override | ||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { | ||
return internalExecutorService.awaitTermination(timeout, unit); | ||
} | ||
|
||
@Override | ||
public void execute(Runnable command) { | ||
if (poolSemaphore.tryAcquire()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would also introduce a queue for the tasks that can't be immediately executed. It could be implemented by another semaphore wrapping the poolSemaphore, with the
The queue semaphore should be acquired on the kernel thread but the pool semaphore would be acquired on the virtual thread. Virtual threads might be blocked on acquiring the pool semaphore but kernel thread should continue working. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following your latest advice, I've added a second semaphore ( Your guidance has been instrumental in evolving the functionality and robustness of this service. I'm looking forward to any additional thoughts or feedback you might have! |
||
internalExecutorService.execute(() -> { | ||
try { | ||
command.run(); | ||
} finally { | ||
poolSemaphore.release(); | ||
} | ||
}); | ||
} else { | ||
throw new RejectedExecutionException("Too Many Concurrent Requests"); | ||
} | ||
} | ||
|
||
@Override | ||
public void uncaughtException(Thread thread, Throwable throwable) { | ||
logger.log(Level.WARNING, LogMessages.WARNING_GRIZZLY_THREADPOOL_UNCAUGHT_EXCEPTION(thread), throwable); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package org.glassfish.grizzly; | ||
|
||
import org.glassfish.grizzly.threadpool.ThreadPoolConfig; | ||
import org.glassfish.grizzly.threadpool.VirtualThreadExecutorService; | ||
import org.junit.Assert; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.CyclicBarrier; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
public class VirtualThreadExecutorServiceTest extends GrizzlyTestCase { | ||
|
||
public void testCreateInstance() throws Exception { | ||
|
||
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(); | ||
final int tasks = 2000000; | ||
doTest(r, tasks); | ||
} | ||
|
||
public void testAwaitTermination() throws Exception { | ||
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(); | ||
final int tasks = 2000; | ||
doTest(r, tasks); | ||
r.shutdown(); | ||
assertTrue(r.awaitTermination(10, TimeUnit.SECONDS)); | ||
assertTrue(r.isTerminated()); | ||
} | ||
|
||
public void testQueueLimit() throws Exception { | ||
int poolSize = 10; | ||
ThreadPoolConfig config = ThreadPoolConfig.defaultConfig().setMaxPoolSize(poolSize); | ||
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(config); | ||
|
||
CyclicBarrier start = new CyclicBarrier(poolSize + 1); | ||
CyclicBarrier hold = new CyclicBarrier(poolSize + 1); | ||
AtomicInteger result = new AtomicInteger(); | ||
for (int i = 0; i < poolSize; i++) { | ||
int taskId = i; | ||
r.execute(() -> { | ||
try { | ||
System.out.println("task " + taskId + " is running"); | ||
start.await(); | ||
hold.await(); | ||
result.getAndIncrement(); | ||
} catch (Exception e) { | ||
} | ||
}); | ||
} | ||
start.await(); | ||
// Too Many Concurrent Requests | ||
Assert.assertThrows(RejectedExecutionException.class, () -> r.execute(() -> System.out.println("cannot be executed"))); | ||
hold.await(); | ||
while (true) { | ||
if (result.intValue() == poolSize) { | ||
System.out.println("All tasks have been completed."); | ||
break; | ||
} | ||
} | ||
// The executor can accept new tasks | ||
doTest(r, poolSize); | ||
} | ||
|
||
private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception { | ||
final CountDownLatch cl = new CountDownLatch(tasks); | ||
while (tasks-- > 0) { | ||
r.execute(() -> cl.countDown()); | ||
} | ||
assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Semaphore should be constructed with the
true
value for the second parameter to enable fairness - guarantee the order of tasks.