Skip to content

Commit

Permalink
fix allocator npe
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Nov 18, 2024
1 parent f76d9c5 commit 5053c26
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void initialize(ClusterService clusterService, ThreadPool threadPool) {
this.threadPool = threadPool;
this.clusterService.trySet(clusterService);
clusterService.addListener(this);
streamManager = new FlightStreamManager(allocator, this);
streamManager = new FlightStreamManager(() -> allocator, this);
}

public void setSecureTransportSettingsProvider(SecureTransportSettingsProvider secureTransportSettingsProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.core.tasks.TaskId;

import java.util.UUID;
import java.util.function.Supplier;

/**
* FlightStreamManager is a concrete implementation of StreamManager that provides
Expand All @@ -33,7 +34,7 @@
public class FlightStreamManager implements StreamManager {

private final FlightService flightService;
private final BufferAllocator allocator;
private final Supplier<BufferAllocator> allocatorSupplier;
private final Cache<String, StreamProducerHolder> streamProducers;
private static final TimeValue expireAfter = TimeValue.timeValueMinutes(2);
private static final long MAX_PRODUCERS = 10000;
Expand All @@ -42,8 +43,8 @@ public class FlightStreamManager implements StreamManager {
* Constructs a new FlightStreamManager.
* @param flightService The FlightService instance to use for Flight client operations.
*/
public FlightStreamManager(BufferAllocator allocator, FlightService flightService) {
this.allocator = allocator;
public FlightStreamManager(Supplier<BufferAllocator> allocatorSupplier, FlightService flightService) {
this.allocatorSupplier = allocatorSupplier;
this.flightService = flightService;
this.streamProducers = CacheBuilder.<String, StreamProducerHolder>builder()
.setExpireAfterWrite(expireAfter)
Expand All @@ -54,7 +55,7 @@ public FlightStreamManager(BufferAllocator allocator, FlightService flightServic
@Override
public StreamTicket registerStream(StreamProducer provider, TaskId parentTaskId) {
String ticket = generateUniqueTicket();
streamProducers.put(ticket, new StreamProducerHolder(provider, allocator));
streamProducers.put(ticket, new StreamProducerHolder(provider, allocatorSupplier.get()));
return new StreamTicket(ticket, getLocalNodeId());
}

Expand Down

0 comments on commit 5053c26

Please sign in to comment.