Skip to content

Commit

Permalink
Fiber refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
bgprudhomme committed Oct 11, 2024
1 parent 54e722c commit 039af11
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void shutdown() {
@MethodSource("getComparatorName")
public void testParallelIntSearch(ComparatorType comparatorType, int threadCount)
throws ExecutionException, InterruptedException {
threadPool = Executors.newFixedThreadPool(threadCount);
threadPool = Executors.newVirtualThreadPerTaskExecutor();
try (IntVector targetVector = new IntVector("targetVector", allocator);
IntVector keyVector = new IntVector("keyVector", allocator)) {
targetVector.allocateNew(VECTOR_LENGTH);
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testParallelIntSearch(ComparatorType comparatorType, int threadCount
@MethodSource("getComparatorName")
public void testParallelStringSearch(ComparatorType comparatorType, int threadCount)
throws ExecutionException, InterruptedException {
threadPool = Executors.newFixedThreadPool(threadCount);
threadPool = Executors.newVirtualThreadPerTaskExecutor();
try (VarCharVector targetVector = new VarCharVector("targetVector", allocator);
VarCharVector keyVector = new VarCharVector("keyVector", allocator)) {
targetVector.allocateNew(VECTOR_LENGTH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.arrow.flight;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.netty.GrpcSslContexts;
Expand Down Expand Up @@ -312,11 +311,8 @@ public FlightServer build() {
grpcExecutor = null;
} else {
exec =
Executors.newCachedThreadPool(
// Name threads for better debuggability
new ThreadFactoryBuilder()
.setNameFormat("flight-server-default-executor-%d")
.build());
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("flight-server-default-executor-", 0).factory());
grpcExecutor = exec;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void setup() throws IOException {
allocator = new RootAllocator(Integer.MAX_VALUE);
final NoOpFlightProducer producer = new NoOpFlightProducer();
final ServerAuthHandler authHandler = ServerAuthHandler.NO_OP;
final ExecutorService exec = Executors.newCachedThreadPool();
final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor();
final BindableService flightBindingService =
FlightGrpcUtils.createFlightService(allocator, producer, authHandler, exec);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static void main(String[] args) throws Exception {
public void throughput() throws Exception {
final int numRuns = 10;
ListeningExecutorService pool =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
MoreExecutors.listeningDecorator(Executors.newVirtualThreadPerTaskExecutor());
double[] throughPuts = new double[numRuns];

for (int i = 0; i < numRuns; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.arrow.flight.integration.tests;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -103,10 +102,8 @@ void testScenario(String scenarioName) throws Exception {
TestBufferAllocationListener listener = new TestBufferAllocationListener();
try (final BufferAllocator allocator = new RootAllocator(listener, Long.MAX_VALUE)) {
final ExecutorService exec =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("integration-test-flight-server-executor-%d")
.build());
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("integration-test-flight-server-executor-", 0).factory());
final FlightServer.Builder builder =
FlightServer.builder()
.executor(exec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ ArrowFlightSqlClientHandler getClientHandler() {
synchronized ExecutorService getExecutorService() {
return executorService =
executorService == null
// Refactoring this would require defining a new custom thread factory class
? Executors.newFixedThreadPool(
config.threadPoolSize(), new DefaultThreadFactory(getClass().getSimpleName()))
: executorService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
public static final String DB_NAME = "derbyDB";
private final String databaseUri;
// ARROW-15315: Use ExecutorService to simulate an async scenario
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
private final Location location;
protected final PoolingDataSource<PoolableConnection> dataSource;
protected final BufferAllocator rootAllocator = new RootAllocator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void testMakeProjectorParallel(ConfigurationBuilder.ConfigOptions config
// build projectors in parallel choosing schema at random
// this should hit the same cache entry thus exposing
// any threading issues.
ExecutorService executors = Executors.newFixedThreadPool(16);
ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor();

IntStream.range(0, 1000)
.forEach(
Expand Down Expand Up @@ -348,7 +348,7 @@ public void testDivZeroParallel() throws GandivaException, InterruptedException
ExpressionTree expr = TreeBuilder.makeExpression("divide", args, c);
List<ExpressionTree> exprs = Lists.newArrayList(expr);

ExecutorService executors = Executors.newFixedThreadPool(16);
ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor();

AtomicInteger errorCount = new AtomicInteger(0);
AtomicInteger errorCountExp = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,18 @@ public void multiThread() throws InterruptedException {

for (int i = 0; i < numberOfThreads; i++) {
Thread t =
new Thread() {

@Override
public void run() {
try {
for (int i = 0; i < loops; i++) {
ensureAccurateReservations(parent);
}
} catch (Exception ex) {
ex.printStackTrace();
fail(ex.getMessage());
}
}
};
Thread.ofVirtual()
.unstarted(
() -> {
try {
for (int j = 0; j < loops; j++) {
ensureAccurateReservations(parent);
}
} catch (Exception ex) {
ex.printStackTrace();
fail(ex.getMessage());
}
});
threads[i] = t;
t.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void prepare() {
targetVector.allocateNew(VECTOR_LENGTH);
keyVector = new IntVector("key vector", allocator);
keyVector.allocateNew(1);
threadPool = Executors.newFixedThreadPool(numThreads);
threadPool = Executors.newVirtualThreadPerTaskExecutor();

for (int i = 0; i < VECTOR_LENGTH; i++) {
targetVector.set(i, i);
Expand Down

0 comments on commit 039af11

Please sign in to comment.