From ea8aa6a18b2d91a8706b929c889dd02e89c17e81 Mon Sep 17 00:00:00 2001 From: mazhen Date: Tue, 8 Aug 2023 16:55:53 +0800 Subject: [PATCH 1/3] Add VirtualThreadExecutorService for virtual threads --- .github/workflows/maven.yml | 13 +-- .../VirtualThreadExecutorService.java | 95 +++++++++++++++++++ .../VirtualThreadExecutorServiceTest.java | 52 ++++++++++ pom.xml | 8 +- 4 files changed, 159 insertions(+), 9 deletions(-) create mode 100644 modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java create mode 100644 modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f6e984d06..7e1265efb 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -14,6 +14,7 @@ name: Eclipse Grizzly NIO CI on: pull_request: + push: jobs: build: @@ -22,15 +23,15 @@ jobs: strategy: matrix: - java_version: [ 11 ] + java_version: [ 21-ea ] steps: - name: Checkout for build uses: actions/checkout@v3 - name: Set up JDK - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: - distribution: 'temurin' + distribution: 'zulu' java-version: ${{ matrix.java_version }} - name: Maven Build run: | @@ -47,15 +48,15 @@ jobs: strategy: matrix: - java_version: [ 11 ] + java_version: [ 21-ea ] steps: - name: Checkout for build uses: actions/checkout@v3 - name: Set up JDK - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: - distribution: 'temurin' + distribution: 'zulu' java-version: ${{ matrix.java_version }} - name: Maven Build run: | diff --git a/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java new file mode 100644 index 000000000..c908fa54f --- /dev/null +++ b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java @@ -0,0 +1,95 @@ +package org.glassfish.grizzly.threadpool; + +import org.glassfish.grizzly.Grizzly; +import org.glassfish.grizzly.localization.LogMessages; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author mazhen + */ +public class VirtualThreadExecutorService extends AbstractExecutorService implements Thread.UncaughtExceptionHandler { + + private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class); + + private final ExecutorService internalExecutorService; + + public static VirtualThreadExecutorService createInstance() { + return createInstance(ThreadPoolConfig.defaultConfig().setPoolName("Grizzly-virt-")); + } + + public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) { + Objects.requireNonNull(cfg); + return new VirtualThreadExecutorService(cfg); + } + + protected VirtualThreadExecutorService(ThreadPoolConfig cfg) { + internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg)); + } + + private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) { + + var prefix = threadPoolConfig.getPoolName() + "-"; + + // virtual threads factory + final ThreadFactory factory = Thread.ofVirtual() + .name(prefix, 0L) + .uncaughtExceptionHandler(this) + .factory(); + + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = factory.newThread(r); + final ClassLoader initial = threadPoolConfig.getInitialClassLoader(); + if (initial != null) { + thread.setContextClassLoader(initial); + } + return thread; + } + }; + } + + @Override + public void shutdown() { + internalExecutorService.shutdown(); + } + + @Override + public List shutdownNow() { + return internalExecutorService.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return internalExecutorService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return internalExecutorService.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return internalExecutorService.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { + internalExecutorService.execute(command); + } + + @Override + public void uncaughtException(Thread thread, Throwable throwable) { + logger.log(Level.WARNING, LogMessages.WARNING_GRIZZLY_THREADPOOL_UNCAUGHT_EXCEPTION(thread), throwable); + } +} diff --git a/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java new file mode 100644 index 000000000..d56832717 --- /dev/null +++ b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java @@ -0,0 +1,52 @@ +package org.glassfish.grizzly; + +import org.glassfish.grizzly.threadpool.VirtualThreadExecutorService; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class VirtualThreadExecutorServiceTest extends GrizzlyTestCase { + + public void testCreateInstance() throws Exception { + + VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(); + final int tasks = 2000000; + doTest(r, tasks); + } + + public void testAwaitTermination() throws Exception { + VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(); + final int tasks = 2000; + runTasks(r, tasks); + r.shutdown(); + assertTrue(r.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(r.isTerminated()); + } + + private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception { + final CountDownLatch cl = new CountDownLatch(tasks); + while (tasks-- > 0) { + r.execute(new Runnable() { + @Override + public void run() { + cl.countDown(); + } + }); + } + assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS)); + } + + private void runTasks(VirtualThreadExecutorService r, int tasks) throws Exception { + while (tasks-- > 0) { + r.execute(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(50); + } catch (Exception ignore) { + } + } + }); + } + } +} diff --git a/pom.xml b/pom.xml index 979dfd721..aebef09df 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,7 @@ + 21 UTF-8 6.0.0 1.0.0 @@ -196,8 +197,9 @@ maven-compiler-plugin 3.9.0 - 11 - 11 + ${java.version} + ${java.version} + ${java.version} -Xlint:unchecked,deprecation,fallthrough,finally,cast,dep-ann,empty,overrides @@ -224,7 +226,7 @@ - [11,) + [21,) 3.6.3 From 06a32b9db402c9a202acfee7bd4d3ec5f42129f8 Mon Sep 17 00:00:00 2001 From: mazhen Date: Tue, 31 Oct 2023 16:14:26 +0800 Subject: [PATCH 2/3] Added Semaphore to limit concurrent task execution --- .github/workflows/maven.yml | 8 +-- .../VirtualThreadExecutorService.java | 39 +++++++++---- .../VirtualThreadExecutorServiceTest.java | 58 +++++++++++++------ 3 files changed, 70 insertions(+), 35 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 7e1265efb..f326e0229 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - java_version: [ 21-ea ] + java_version: [ 21 ] steps: - name: Checkout for build @@ -31,7 +31,7 @@ jobs: - name: Set up JDK uses: actions/setup-java@v3 with: - distribution: 'zulu' + distribution: 'temurin' java-version: ${{ matrix.java_version }} - name: Maven Build run: | @@ -48,7 +48,7 @@ jobs: strategy: matrix: - java_version: [ 21-ea ] + java_version: [ 21 ] steps: - name: Checkout for build @@ -56,7 +56,7 @@ jobs: - name: Set up JDK uses: actions/setup-java@v3 with: - distribution: 'zulu' + distribution: 'temurin' java-version: ${{ matrix.java_version }} - name: Maven Build run: | diff --git a/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java index c908fa54f..a751531cd 100644 --- a/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java +++ b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java @@ -8,6 +8,8 @@ import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -21,9 +23,10 @@ public class VirtualThreadExecutorService extends AbstractExecutorService implem private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class); private final ExecutorService internalExecutorService; + private Semaphore poolSemaphore; public static VirtualThreadExecutorService createInstance() { - return createInstance(ThreadPoolConfig.defaultConfig().setPoolName("Grizzly-virt-")); + return createInstance(ThreadPoolConfig.defaultConfig().setMaxPoolSize(-1).setPoolName("Grizzly-virt-")); } public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) { @@ -33,28 +36,30 @@ public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) protected VirtualThreadExecutorService(ThreadPoolConfig cfg) { internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg)); + if (cfg.getMaxPoolSize() > 0) { + poolSemaphore = new Semaphore(cfg.getMaxPoolSize()); + } else { + poolSemaphore = new Semaphore(Integer.MAX_VALUE); + } } private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) { var prefix = threadPoolConfig.getPoolName() + "-"; - + // virtual threads factory final ThreadFactory factory = Thread.ofVirtual() .name(prefix, 0L) .uncaughtExceptionHandler(this) .factory(); - return new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = factory.newThread(r); - final ClassLoader initial = threadPoolConfig.getInitialClassLoader(); - if (initial != null) { - thread.setContextClassLoader(initial); - } - return thread; + return r -> { + Thread thread = factory.newThread(r); + final ClassLoader initial = threadPoolConfig.getInitialClassLoader(); + if (initial != null) { + thread.setContextClassLoader(initial); } + return thread; }; } @@ -85,7 +90,17 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE @Override public void execute(Runnable command) { - internalExecutorService.execute(command); + if (poolSemaphore.tryAcquire()) { + internalExecutorService.execute(() -> { + try { + command.run(); + } finally { + poolSemaphore.release(); + } + }); + } else { + throw new RejectedExecutionException("Too Many Concurrent Requests"); + } } @Override diff --git a/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java index d56832717..f707a0924 100644 --- a/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java +++ b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java @@ -1,9 +1,14 @@ package org.glassfish.grizzly; +import org.glassfish.grizzly.threadpool.ThreadPoolConfig; import org.glassfish.grizzly.threadpool.VirtualThreadExecutorService; +import org.junit.Assert; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class VirtualThreadExecutorServiceTest extends GrizzlyTestCase { @@ -17,36 +22,51 @@ public void testCreateInstance() throws Exception { public void testAwaitTermination() throws Exception { VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(); final int tasks = 2000; - runTasks(r, tasks); + doTest(r, tasks); r.shutdown(); assertTrue(r.awaitTermination(10, TimeUnit.SECONDS)); assertTrue(r.isTerminated()); } - private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception { - final CountDownLatch cl = new CountDownLatch(tasks); - while (tasks-- > 0) { - r.execute(new Runnable() { - @Override - public void run() { - cl.countDown(); + public void testQueueLimit() throws Exception { + int poolSize = 10; + ThreadPoolConfig config = ThreadPoolConfig.defaultConfig().setMaxPoolSize(poolSize); + VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(config); + + CyclicBarrier start = new CyclicBarrier(poolSize + 1); + CyclicBarrier hold = new CyclicBarrier(poolSize + 1); + AtomicInteger result = new AtomicInteger(); + for (int i = 0; i < poolSize; i++) { + int taskId = i; + r.execute(() -> { + try { + System.out.println("task " + taskId + " is running"); + start.await(); + hold.await(); + result.getAndIncrement(); + } catch (Exception e) { } }); } - assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS)); + start.await(); + // Too Many Concurrent Requests + Assert.assertThrows(RejectedExecutionException.class, () -> r.execute(() -> System.out.println("cannot be executed"))); + hold.await(); + while (true) { + if (result.intValue() == poolSize) { + System.out.println("All tasks have been completed."); + break; + } + } + // The executor can accept new tasks + doTest(r, poolSize); } - private void runTasks(VirtualThreadExecutorService r, int tasks) throws Exception { + private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception { + final CountDownLatch cl = new CountDownLatch(tasks); while (tasks-- > 0) { - r.execute(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(50); - } catch (Exception ignore) { - } - } - }); + r.execute(() -> cl.countDown()); } + assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS)); } } From 03d81122b855fba8dd4658fab35259926feaa828 Mon Sep 17 00:00:00 2001 From: mazhen Date: Fri, 22 Dec 2023 11:12:25 +0800 Subject: [PATCH 3/3] Improve Thread Pool Management in VirtualThreadExecutorService --- .../VirtualThreadExecutorService.java | 35 +++++++++++++------ .../VirtualThreadExecutorServiceTest.java | 29 +++++++++++---- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java index a751531cd..f98b91703 100644 --- a/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java +++ b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java @@ -23,7 +23,8 @@ public class VirtualThreadExecutorService extends AbstractExecutorService implem private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class); private final ExecutorService internalExecutorService; - private Semaphore poolSemaphore; + private final Semaphore poolSemaphore; + private final Semaphore queueSemaphore; public static VirtualThreadExecutorService createInstance() { return createInstance(ThreadPoolConfig.defaultConfig().setMaxPoolSize(-1).setPoolName("Grizzly-virt-")); @@ -36,11 +37,18 @@ public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) protected VirtualThreadExecutorService(ThreadPoolConfig cfg) { internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg)); - if (cfg.getMaxPoolSize() > 0) { - poolSemaphore = new Semaphore(cfg.getMaxPoolSize()); + + int poolSizeLimit = cfg.getMaxPoolSize() > 0 ? cfg.getMaxPoolSize() : Integer.MAX_VALUE; + int queueLimit = cfg.getQueueLimit() >= 0 ? cfg.getQueueLimit() : Integer.MAX_VALUE; + // Check for integer overflow + long totalLimit = (long) poolSizeLimit + (long) queueLimit; + if (totalLimit > Integer.MAX_VALUE) { + // Handle the overflow case + queueSemaphore = new Semaphore(Integer.MAX_VALUE, true); } else { - poolSemaphore = new Semaphore(Integer.MAX_VALUE); + queueSemaphore = new Semaphore((int) totalLimit, true); } + poolSemaphore = new Semaphore(poolSizeLimit, true); } private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) { @@ -90,17 +98,24 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE @Override public void execute(Runnable command) { - if (poolSemaphore.tryAcquire()) { - internalExecutorService.execute(() -> { + if (!queueSemaphore.tryAcquire()) { + throw new RejectedExecutionException("Too Many Concurrent Requests"); + } + + internalExecutorService.execute(() -> { + try { + poolSemaphore.acquire(); try { command.run(); } finally { poolSemaphore.release(); } - }); - } else { - throw new RejectedExecutionException("Too Many Concurrent Requests"); - } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + queueSemaphore.release(); + } + }); } @Override diff --git a/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java index f707a0924..e45fa9c8c 100644 --- a/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java +++ b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java @@ -29,14 +29,18 @@ public void testAwaitTermination() throws Exception { } public void testQueueLimit() throws Exception { - int poolSize = 10; - ThreadPoolConfig config = ThreadPoolConfig.defaultConfig().setMaxPoolSize(poolSize); + int maxPoolSize = 20; + int queueLimit = 10; + int queue = maxPoolSize + queueLimit; + ThreadPoolConfig config = ThreadPoolConfig.defaultConfig() + .setMaxPoolSize(maxPoolSize) + .setQueueLimit(queueLimit); VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(config); - CyclicBarrier start = new CyclicBarrier(poolSize + 1); - CyclicBarrier hold = new CyclicBarrier(poolSize + 1); + CyclicBarrier start = new CyclicBarrier(maxPoolSize + 1); + CyclicBarrier hold = new CyclicBarrier(maxPoolSize + 1); AtomicInteger result = new AtomicInteger(); - for (int i = 0; i < poolSize; i++) { + for (int i = 0; i < maxPoolSize; i++) { int taskId = i; r.execute(() -> { try { @@ -44,22 +48,33 @@ public void testQueueLimit() throws Exception { start.await(); hold.await(); result.getAndIncrement(); + System.out.println("task " + taskId + " is completed"); } catch (Exception e) { } }); } start.await(); + for (int i = maxPoolSize; i < queue; i++) { + int taskId = i; + r.execute(() -> { + try { + result.getAndIncrement(); + System.out.println("task " + taskId + " is completed"); + } catch (Exception e) { + } + }); + } // Too Many Concurrent Requests Assert.assertThrows(RejectedExecutionException.class, () -> r.execute(() -> System.out.println("cannot be executed"))); hold.await(); while (true) { - if (result.intValue() == poolSize) { + if (result.intValue() == queue) { System.out.println("All tasks have been completed."); break; } } // The executor can accept new tasks - doTest(r, poolSize); + doTest(r, queue); } private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception {