Skip to content

Commit

Permalink
Added more tests and code refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Oct 11, 2024
1 parent a50cfdd commit 1f711f5
Show file tree
Hide file tree
Showing 14 changed files with 798 additions and 27 deletions.
4 changes: 0 additions & 4 deletions libs/arrow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ dependencies {
implementation "org.yaml:snakeyaml:${versions.snakeyaml}"
implementation "io.projectreactor.tools:blockhound:1.0.9.RELEASE"

// implementation 'net.sf.jopt-simple:jopt-simple:5.0.4'
// implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}"
// implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"

testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testImplementation "junit:junit:${versions.junit}"
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ interface Task {
*/
void run(VectorSchemaRoot root, FlushSignal flushSignal);

/**
* Called when the task is canceled.
* This method is used to clean up resources or cancel ongoing operations.
* This maybe called from a different thread than the one used for run(). It might be possible that run()
* thread is busy when onCancel() is called and wakes up later. In such cases, ensure that run() terminates early
* and should clean up resources.
*/
void onCancel();
}

/**
Expand All @@ -54,7 +62,10 @@ interface Task {
interface FlushSignal {
/**
* Waits for the consumption of the current data to complete.
* This method blocks until the consumption is complete or a timeout occurs.
*
* @param timeout The maximum time to wait for consumption (in milliseconds).
*/
void awaitConsumption();
void awaitConsumption(int timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.flight;

import org.apache.arrow.flight.BackpressureStrategy;

public class BaseBackpressureStrategy extends BackpressureStrategy.CallbackBackpressureStrategy {
private final Runnable readyCallback;
private final Runnable cancelCallback;

BaseBackpressureStrategy(Runnable readyCallback, Runnable cancelCallback) {
this.readyCallback = readyCallback;
this.cancelCallback = cancelCallback;
}

/** Callback to execute when the listener is ready. */
protected void readyCallback() {
if (readyCallback != null) {
readyCallback.run();
}
}

/** Callback to execute when the listener is cancelled. */
protected void cancelCallback() {
if (cancelCallback != null) {
cancelCallback.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,39 @@
import org.opensearch.arrow.StreamManager;
import org.opensearch.arrow.StreamTicket;

import java.util.function.Supplier;

/**
* BaseFlightProducer extends NoOpFlightProducer to provide stream management functionality
* for Arrow Flight in OpenSearch. This class handles the retrieval and streaming of data
* based on provided tickets, managing backpressure, and coordinating between the stream
* provider and the server stream listener.
*/
public class BaseFlightProducer extends NoOpFlightProducer {
private final StreamManager streamManager;
private final BufferAllocator allocator;

/**
* Constructs a new BaseFlightProducer.
*
* @param streamManager The StreamManager to handle stream operations, including
* retrieving and removing streams based on tickets.
* @param allocator The BufferAllocator for memory management in Arrow operations.
*/
public BaseFlightProducer(StreamManager streamManager, BufferAllocator allocator) {
this.streamManager = streamManager;
this.allocator = allocator;
}

/**
* Handles the retrieval and streaming of data based on the provided ticket.
* This method orchestrates the entire process of setting up the stream,
* managing backpressure, and handling data flow to the client.
*
* @param context The call context (unused in this implementation)
* @param ticket The ticket containing stream information
* @param listener The server stream listener to handle the data flow
*/
@Override
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
StreamTicket streamTicket = new StreamTicket(ticket.getBytes()) {};
Expand All @@ -37,30 +60,37 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
listener.error(CallStatus.NOT_FOUND.withDescription("Stream not found").toRuntimeException());
return;
}
BackpressureStrategy backpressureStrategy = new BackpressureStrategy.CallbackBackpressureStrategy();
backpressureStrategy.register(listener);
ArrowStreamProvider.Task task = provider.create(allocator);
VectorSchemaRoot root = task.init(allocator);
listener.start(root);
ArrowStreamProvider.FlushSignal flushSignal = () -> {
BackpressureStrategy.WaitResult result = backpressureStrategy.waitForListener(1000);
if (context.isCancelled()) {
task.onCancel();
listener.error(CallStatus.CANCELLED.cause());
return;
}
listener.setOnCancelHandler(task::onCancel);
BackpressureStrategy backpressureStrategy = new BaseBackpressureStrategy(null, task::onCancel);
backpressureStrategy.register(listener);
ArrowStreamProvider.FlushSignal flushSignal = (timeout) -> {
BackpressureStrategy.WaitResult result = backpressureStrategy.waitForListener(timeout);
if (result.equals(BackpressureStrategy.WaitResult.READY)) {
listener.putNext();
} else if (result.equals(BackpressureStrategy.WaitResult.TIMEOUT)) {
listener.error(CallStatus.TIMED_OUT.cause());
throw new RuntimeException("Timeout waiting for listener" + result);
throw new RuntimeException("Stream deadline exceeded for consumption");
} else if (result.equals(BackpressureStrategy.WaitResult.CANCELLED)) {
task.onCancel();
listener.error(CallStatus.CANCELLED.cause());
throw new RuntimeException("Stream cancelled by client");
} else {
listener.error(CallStatus.INTERNAL.toRuntimeException());
throw new RuntimeException("Error while waiting for client: " + result);
}
};
try {
try(VectorSchemaRoot root = task.init(allocator)) {
listener.start(root);
task.run(root, flushSignal);
} finally {
root.close();
}
} catch (Exception e) {
listener.error(CallStatus.INTERNAL.toRuntimeException().initCause(e));
listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).withCause(e).cause());
throw e;
} finally {
listener.completed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
import org.opensearch.common.settings.Settings;

import java.io.IOException;

/**
* FlightService manages the Arrow Flight server and client for OpenSearch.
* It handles the initialization, startup, and shutdown of the Flight server and client,
* as well as managing the stream operations through a FlightStreamManager.
*/
@ExperimentalApi
public class FlightService extends AbstractLifecycleComponent {

Expand Down Expand Up @@ -75,7 +79,7 @@ public class FlightService extends AbstractLifecycleComponent {

public static final Setting<Integer> NETTY_ALLOCATOR_NUM_DIRECT_ARENAS = Setting.intSetting(
"io.netty.allocator.numDirectArenas",
1,
1, // TODO - 2 * the number of available processors
1,
Property.NodeScope
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,35 @@

import java.util.UUID;

/**
* FlightStreamManager is a concrete implementation of StreamManager that provides
* an abstraction layer for managing Arrow Flight streams in OpenSearch.
* It encapsulates the details of Flight client operations, allowing consumers to
* work with streams without direct exposure to Flight internals.
*/
public class FlightStreamManager extends StreamManager {

private final FlightClient flightClient;

/**
* Constructs a new FlightStreamManager.
*
* @param flightClient The FlightClient instance used for stream operations.
*/
public FlightStreamManager(FlightClient flightClient) {
super();
this.flightClient = flightClient;
}

/**
* Retrieves a VectorSchemaRoot for a given stream ticket.
* @param ticket The StreamTicket identifying the desired stream.
* @return The VectorSchemaRoot associated with the given ticket.
*/
@Override
public VectorSchemaRoot getVectorSchemaRoot(StreamTicket ticket) {
// TODO: for remote streams
// TODO: for remote streams, register streams in cluster state with node details
// maintain flightClient for all nodes in the cluster to serve the stream
FlightStream stream = flightClient.getStream(new Ticket(ticket.getBytes()));
return stream.getRoot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.flight.FlightService.*;
import static org.opensearch.flight.FlightService.ARROW_ALLOCATION_MANAGER_TYPE;
import static org.opensearch.flight.FlightService.ARROW_ENABLE_NULL_CHECK_FOR_GET;
import static org.opensearch.flight.FlightService.ARROW_ENABLE_UNSAFE_MEMORY_ACCESS;
import static org.opensearch.flight.FlightService.NETTY_ALLOCATOR_NUM_DIRECT_ARENAS;
import static org.opensearch.flight.FlightService.NETTY_NO_UNSAFE;
import static org.opensearch.flight.FlightService.NETTY_TRY_REFLECTION_SET_ACCESSIBLE;
import static org.opensearch.flight.FlightService.NETTY_TRY_UNSAFE;


public class FlightStreamPlugin extends Plugin implements StreamManagerPlugin {

private FlightService flightService;
private final FlightService flightService;

public FlightStreamPlugin(Settings settings) {
this.flightService = new FlightService(settings);
Expand Down Expand Up @@ -68,10 +75,10 @@ public List<Setting<?>> getSettings() {
ARROW_ALLOCATION_MANAGER_TYPE,
ARROW_ENABLE_NULL_CHECK_FOR_GET,
NETTY_TRY_REFLECTION_SET_ACCESSIBLE,
FlightService.ARROW_ENABLE_UNSAFE_MEMORY_ACCESS,
FlightService.NETTY_ALLOCATOR_NUM_DIRECT_ARENAS,
FlightService.NETTY_NO_UNSAFE,
FlightService.NETTY_TRY_UNSAFE
ARROW_ENABLE_UNSAFE_MEMORY_ACCESS,
NETTY_ALLOCATOR_NUM_DIRECT_ARENAS,
NETTY_NO_UNSAFE,
NETTY_TRY_UNSAFE
);
}
}
Expand Down
Loading

0 comments on commit 1f711f5

Please sign in to comment.