From 78c6fa71bd52a273f123b9cf6c752a1bfb696817 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 3 Oct 2024 10:46:21 -0700 Subject: [PATCH] more error handling --- .../main/java/org/opensearch/flight/BaseFlightProducer.java | 6 +++--- .../java/org/opensearch/flight/FlightStreamManager.java | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/arrow-flight/src/main/java/org/opensearch/flight/BaseFlightProducer.java b/modules/arrow-flight/src/main/java/org/opensearch/flight/BaseFlightProducer.java index cfc67d2e1325f..fb9320ce832b8 100644 --- a/modules/arrow-flight/src/main/java/org/opensearch/flight/BaseFlightProducer.java +++ b/modules/arrow-flight/src/main/java/org/opensearch/flight/BaseFlightProducer.java @@ -47,10 +47,10 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l if (result.equals(BackpressureStrategy.WaitResult.READY)) { listener.putNext(); } else if (result.equals(BackpressureStrategy.WaitResult.TIMEOUT)) { - listener.error(new RuntimeException("Timeout waiting for listener")); + listener.error(CallStatus.TIMED_OUT.cause()); throw new RuntimeException("Timeout waiting for listener" + result); } else { - listener.error(new RuntimeException("Error while waiting for client: " + result)); + listener.error(CallStatus.INTERNAL.toRuntimeException()); throw new RuntimeException("Error while waiting for client: " + result); } }; @@ -60,7 +60,7 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l root.close(); } } catch (Exception e) { - listener.error(e); + listener.error(CallStatus.INTERNAL.toRuntimeException().initCause(e)); throw e; } finally { listener.completed(); diff --git a/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java b/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java index f7e3c8d570966..4c61e5ad5e8f3 100644 --- a/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java +++ b/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java @@ -28,6 +28,7 @@ public FlightStreamManager(FlightClient flightClient) { @Override public VectorSchemaRoot getVectorSchemaRoot(StreamTicket ticket) { + // TODO: for remote streams FlightStream stream = flightClient.getStream(new Ticket(ticket.getBytes())); return stream.getRoot(); }