From 9d95b02eeff4845ae0a78cb0823631e8f38f0d73 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Wed, 20 Nov 2024 11:12:39 -0800 Subject: [PATCH] add support for onCancel and Cancellable for BatchedJob in lib:arrow module --- .../java/org/opensearch/arrow/StreamProducer.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/libs/arrow/src/main/java/org/opensearch/arrow/StreamProducer.java b/libs/arrow/src/main/java/org/opensearch/arrow/StreamProducer.java index f89963150d225..e6edae323fd68 100644 --- a/libs/arrow/src/main/java/org/opensearch/arrow/StreamProducer.java +++ b/libs/arrow/src/main/java/org/opensearch/arrow/StreamProducer.java @@ -13,6 +13,8 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.tasks.TaskId; +import java.io.Closeable; + /** * Represents a producer of Arrow streams. The producer first needs to define the job by implementing this interface and * then register the job with the {@link StreamManager#registerStream(StreamProducer, TaskId)}, which will return {@link StreamTicket} @@ -75,7 +77,7 @@ * @see StreamIterator */ @ExperimentalApi -public interface StreamProducer { +public interface StreamProducer extends Closeable { /** * Creates a VectorSchemaRoot that defines the schema for this stream. This schema will be used @@ -127,13 +129,21 @@ interface BatchedJob { void run(VectorSchemaRoot root, FlushSignal flushSignal); /** - * Called when the job is canceled. + * Called to signal producer when the job is canceled. * This method is used to clean up resources or cancel ongoing operations. * This maybe called from a different thread than the one used for run(). It might be possible that run() * thread is busy when onCancel() is called and wakes up later. In such cases, ensure that run() terminates early * and should clean up resources. */ void onCancel(); + + /** + * Producers can set isCancelled flag to true to indicate that the job is canceled. + * This will ensure the stream is closed and no more data is produced from next Batch onwards. + * + * @return true if the job is canceled, false otherwise + */ + boolean isCancelled(); } /**