From 5053c26073ea20e2c9b353548e7a53cc0d6a3f5f Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Mon, 18 Nov 2024 12:20:04 -0800 Subject: [PATCH] fix allocator npe --- .../main/java/org/opensearch/flight/FlightService.java | 2 +- .../java/org/opensearch/flight/FlightStreamManager.java | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightService.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightService.java index 6767a5179445c..f313b64647f86 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightService.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightService.java @@ -174,7 +174,7 @@ public void initialize(ClusterService clusterService, ThreadPool threadPool) { this.threadPool = threadPool; this.clusterService.trySet(clusterService); clusterService.addListener(this); - streamManager = new FlightStreamManager(allocator, this); + streamManager = new FlightStreamManager(() -> allocator, this); } public void setSecureTransportSettingsProvider(SecureTransportSettingsProvider secureTransportSettingsProvider) { diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightStreamManager.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightStreamManager.java index 7fbb35a3fadfc..048e8461e6d4c 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightStreamManager.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/flight/FlightStreamManager.java @@ -23,6 +23,7 @@ import org.opensearch.core.tasks.TaskId; import java.util.UUID; +import java.util.function.Supplier; /** * FlightStreamManager is a concrete implementation of StreamManager that provides @@ -33,7 +34,7 @@ public class FlightStreamManager implements StreamManager { private final FlightService flightService; - private final BufferAllocator allocator; + private final Supplier allocatorSupplier; private final Cache streamProducers; private static final TimeValue expireAfter = TimeValue.timeValueMinutes(2); private static final long MAX_PRODUCERS = 10000; @@ -42,8 +43,8 @@ public class FlightStreamManager implements StreamManager { * Constructs a new FlightStreamManager. * @param flightService The FlightService instance to use for Flight client operations. */ - public FlightStreamManager(BufferAllocator allocator, FlightService flightService) { - this.allocator = allocator; + public FlightStreamManager(Supplier allocatorSupplier, FlightService flightService) { + this.allocatorSupplier = allocatorSupplier; this.flightService = flightService; this.streamProducers = CacheBuilder.builder() .setExpireAfterWrite(expireAfter) @@ -54,7 +55,7 @@ public FlightStreamManager(BufferAllocator allocator, FlightService flightServic @Override public StreamTicket registerStream(StreamProducer provider, TaskId parentTaskId) { String ticket = generateUniqueTicket(); - streamProducers.put(ticket, new StreamProducerHolder(provider, allocator)); + streamProducers.put(ticket, new StreamProducerHolder(provider, allocatorSupplier.get())); return new StreamTicket(ticket, getLocalNodeId()); }