Skip to content

Commit

Permalink
provide flight client through stream manager
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Oct 14, 2024
1 parent ba20408 commit 130d554
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
*/
@ExperimentalApi
public abstract class StreamManager implements AutoCloseable {

public abstract void setFlightClient(Object flightClient);

private final ConcurrentHashMap<StreamTicket, ArrowStreamProvider> streams;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ protected void doStart() {
final Location location = Location.forGrpcInsecure(host, port);
server = FlightServer.builder(allocator, location, producer).build();
client = FlightClient.builder(allocator, location).build();
streamManager.setFlightClient(client);
server.start();
logger.info("Arrow Flight server started successfully");
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public class FlightStreamManager extends StreamManager {

private final FlightClient flightClient;
private FlightClient flightClient;

/**
* Constructs a new FlightStreamManager.
Expand All @@ -42,6 +42,12 @@ public FlightStreamManager(FlightClient flightClient) {
* @param ticket The StreamTicket identifying the desired stream.
* @return The VectorSchemaRoot associated with the given ticket.
*/
@Override
public void setFlightClient(Object flightClient) {
assert flightClient instanceof FlightClient;
this.flightClient = (FlightClient) flightClient;
}

@Override
public VectorSchemaRoot getVectorSchemaRoot(StreamTicket ticket) {
// TODO: for remote streams, register streams in cluster state with node details
Expand Down

0 comments on commit 130d554

Please sign in to comment.