Skip to content

Commit

Permalink
Added tests for stateless server
Browse files Browse the repository at this point in the history
  • Loading branch information
stevelorddremio committed Apr 12, 2024
1 parent cfee739 commit 7fb894e
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight.sql.example;

import java.io.Serializable;

public class DoPutPreparedStatementResultPOJO implements Serializable {
private transient String query;
private transient byte[] parameters;

public DoPutPreparedStatementResultPOJO(String query, byte[] parameters) {
this.query = query;
this.parameters = parameters.clone();
}

public String getQuery() {
return null;
}

public byte[] getParameters() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.Types.MinorType;
Expand Down Expand Up @@ -159,12 +158,12 @@
public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
private static final String DATABASE_URI = "jdbc:derby:target/derbyDB";
private static final Logger LOGGER = getLogger(FlightSqlExample.class);
private static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar();
protected static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar();
// ARROW-15315: Use ExecutorService to simulate an async scenario
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Location location;
private final PoolingDataSource<PoolableConnection> dataSource;
private final BufferAllocator rootAllocator = new RootAllocator();
protected final PoolingDataSource<PoolableConnection> dataSource;
protected final BufferAllocator rootAllocator = new RootAllocator();
private final Cache<ByteString, StatementContext<PreparedStatement>> preparedStatementLoadingCache;
private final Cache<ByteString, StatementContext<Statement>> statementLoadingCache;
private final SqlInfoBuilder sqlInfoBuilder;
Expand Down Expand Up @@ -779,7 +778,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
// Running on another thread
Future<?> unused = executorService.submit(() -> {
try {
final ByteString preparedStatementHandle = copyFrom(randomUUID().toString().getBytes(UTF_8));
final ByteString preparedStatementHandle = copyFrom(request.getQuery().getBytes(UTF_8));
// Ownership of the connection will be passed to the context. Do NOT close!
final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(request.getQuery(),
Expand Down Expand Up @@ -912,43 +911,17 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co
return () -> {
assert statementContext != null;
PreparedStatement preparedStatement = statementContext.getStatement();
JdbcParameterBinder binder = null;

try {
while (flightStream.next()) {
final VectorSchemaRoot root = flightStream.getRoot();
binder = JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
final JdbcParameterBinder binder = JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
while (binder.next()) {
// Do not execute() - will be done in a getStream call
}
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try (
ArrowFileWriter writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
) {
writer.start();
writer.writeBatch();
}
if (out.size() > 0) {
final DoPutPreparedStatementResult doPutPreparedStatementResult =
DoPutPreparedStatementResult.newBuilder()
.setPreparedStatementHandle(ByteString.copyFrom(ByteBuffer.wrap(out.toByteArray())))
.build();

// Update prepared statement cache by storing with new handle and remove old entry.
preparedStatementLoadingCache.put(doPutPreparedStatementResult.getPreparedStatementHandle(),
statementContext);
// TODO: If we invalidate old cached entry here this invalidates the statement, which is not what is needed.
// We need to re-cache the statementContext with a new key.
// preparedStatementLoadingCache.invalidate(command.getPreparedStatementHandle());

try (final ArrowBuf buffer = rootAllocator.buffer(doPutPreparedStatementResult.getSerializedSize())) {
buffer.writeBytes(doPutPreparedStatementResult.toByteArray());
ackStream.onNext(PutResult.metadata(buffer));
}
}
}

} catch (SQLException | IOException e) {
} catch (SQLException e) {
ackStream.onError(CallStatus.INTERNAL
.withDescription("Failed to bind parameters: " + e.getMessage())
.withCause(e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight.sql.example;

import static java.lang.String.format;
import static org.apache.arrow.adapter.jdbc.JdbcToArrow.sqlToArrowVectorIterator;
import static org.apache.arrow.adapter.jdbc.JdbcToArrowUtils.jdbcToArrowSchema;
import static org.apache.arrow.flight.sql.impl.FlightSql.*;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcParameterBinder;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
import org.slf4j.Logger;

import com.google.protobuf.ByteString;

/**
* Example {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL server that generally
* supports all current features of Flight SQL.
*/
public class FlightSqlStatelessExample extends FlightSqlExample {
private static final Logger LOGGER = getLogger(FlightSqlStatelessExample.class);

public static void main(String[] args) throws Exception {
Location location = Location.forGrpcInsecure("localhost", 55555);
final FlightSqlStatelessExample example = new FlightSqlStatelessExample(location);
Location listenLocation = Location.forGrpcInsecure("0.0.0.0", 55555);
try (final BufferAllocator allocator = new RootAllocator();
final FlightServer server = FlightServer.builder(allocator, listenLocation, example).build()) {
server.start();
server.awaitTermination();
}
}

public FlightSqlStatelessExample(final Location location) {
super(location);
}

@Override
public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, CallContext context,
FlightStream flightStream, StreamListener<PutResult> ackStream) {

return () -> {
try {
final String query = new String(command.getPreparedStatementHandle().toString("UTF-8"));
final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(query,
ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);

while (flightStream.next()) {
final VectorSchemaRoot root = flightStream.getRoot();
final JdbcParameterBinder binder = JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
while (binder.next()) {
// Do not execute() - will be done in a getStream call
}

final ByteArrayOutputStream parametersStream = new ByteArrayOutputStream();
try (ArrowFileWriter writer = new ArrowFileWriter(root, null, Channels.newChannel(parametersStream))
) {
writer.start();
writer.writeBatch();
}

if (parametersStream.size() > 0) {
final DoPutPreparedStatementResultPOJO doPutPreparedStatementResultPOJO =
new DoPutPreparedStatementResultPOJO(query, parametersStream.toByteArray());
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject((Object) doPutPreparedStatementResultPOJO);
final byte[] doPutPreparedStatementResultPOJOArr = bos.toByteArray();
final DoPutPreparedStatementResult doPutPreparedStatementResult =
DoPutPreparedStatementResult.newBuilder()
.setPreparedStatementHandle(
ByteString.copyFrom(ByteBuffer.wrap(doPutPreparedStatementResultPOJOArr)))
.build();

try (final ArrowBuf buffer = rootAllocator.buffer(doPutPreparedStatementResult.getSerializedSize())) {
buffer.writeBytes(doPutPreparedStatementResult.toByteArray());
ackStream.onNext(PutResult.metadata(buffer));
}
}
}
}

} catch (SQLException | IOException e) {
ackStream.onError(CallStatus.INTERNAL
.withDescription("Failed to bind parameters: " + e.getMessage())
.withCause(e)
.toRuntimeException());
return;
}

ackStream.onCompleted();
};
}

@Override
public void getStreamPreparedStatement(final CommandPreparedStatementQuery command, final CallContext context,
final ServerStreamListener listener) {
try {
final byte[] handle = command.getPreparedStatementHandle().asReadOnlyByteBuffer().array();
try (ByteArrayInputStream bis = new ByteArrayInputStream(handle);
ObjectInputStream ois = new ObjectInputStream(bis)) {
final DoPutPreparedStatementResultPOJO doPutPreparedStatementResultPOJO =
(DoPutPreparedStatementResultPOJO) ois.readObject();
final String query = doPutPreparedStatementResultPOJO.getQuery();
final Connection connection = dataSource.getConnection();
final PreparedStatement statement = connection.prepareStatement(query,
ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);

try (ArrowFileReader reader = new ArrowFileReader(new SeekableReadChannel(
new ByteArrayReadableSeekableByteChannel(
doPutPreparedStatementResultPOJO.getParameters())), rootAllocator)) {

for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
reader.loadRecordBatch(arrowBlock);
VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
JdbcParameterBinder binder = JdbcParameterBinder.builder(statement, vectorSchemaRootRecover)
.bindAll().build();

while (binder.next()) {
try (final ResultSet resultSet = statement.executeQuery()) {
final Schema schema = jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR);
try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) {
final VectorLoader loader = new VectorLoader(vectorSchemaRoot);
listener.start(vectorSchemaRoot);

final ArrowVectorIterator iterator = sqlToArrowVectorIterator(resultSet, rootAllocator);
while (iterator.hasNext()) {
final VectorSchemaRoot batch = iterator.next();
if (batch.getRowCount() == 0) {
break;
}
final VectorUnloader unloader = new VectorUnloader(batch);
loader.load(unloader.getRecordBatch());
listener.putNext();
vectorSchemaRoot.clear();
}
listener.putNext();
}
}
}
}
}
}
} catch (final SQLException | IOException | ClassNotFoundException e) {
LOGGER.error(format("Failed to getStreamPreparedStatement: <%s>.", e.getMessage()), e);
listener.error(CallStatus.INTERNAL.withDescription("Failed to prepare statement: " + e).toRuntimeException());
} finally {
listener.completed();
}
}
}
Loading

0 comments on commit 7fb894e

Please sign in to comment.