Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH 37720:[Java][FlightSQL] Implement stateless prepared statement #1

Closed
wants to merge 7 commits into from
21 changes: 21 additions & 0 deletions format/FlightSql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,27 @@ message DoPutUpdateResult {
int64 record_count = 1;
}

/* An *optional* response returned when `DoPut` is called with `CommandPreparedStatementQuery`.
*
* *Note on legacy behavior*: previous versions of the protocol did not return any result for
* this command, and that behavior should still be supported by clients. See documentation
* of individual fields for more details on expected client behavior in this case.
*/
message DoPutPreparedStatementResult {
option (experimental) = true;

// Represents a (potentially updated) opaque handle for the prepared statement on the server.
// Because the handle could potentially be updated, any previous handles for this prepared
// statement should be considered invalid, and all subsequent requests for this prepared
// statement must use this new handle, if specified.
// The updated handle allows implementing query parameters with stateless services
// as described in https://github.com/apache/arrow/issues/37720.
//
// When an updated handle is not provided by the server, clients should contiue
// using the previous handle provided by `ActionCreatePreparedStatementResonse`.
optional bytes prepared_statement_handle = 1;
}

/*
* Request message for the "CancelQuery" action.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.arrow.flight.SetSessionOptionsResult;
import org.apache.arrow.flight.SyncPutListener;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery;
import org.apache.arrow.flight.sql.util.TableRef;
Expand Down Expand Up @@ -1048,14 +1049,36 @@ private Schema deserializeSchema(final ByteString bytes) {
public FlightInfo execute(final CallOption... options) {
checkOpen();

final FlightDescriptor descriptor = FlightDescriptor
FlightDescriptor descriptor = FlightDescriptor
.command(Any.pack(CommandPreparedStatementQuery.newBuilder()
.setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle())
.build())
.toByteArray());

if (parameterBindingRoot != null && parameterBindingRoot.getRowCount() > 0) {
putParameters(descriptor, options);
if (getParameterSchema().getFields().size() > 0 &&
parameterBindingRoot != null &&
parameterBindingRoot.getRowCount() > 0) {
SyncPutListener putListener = putParameters(descriptor, options);

try {
final PutResult read = putListener.read();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to do this even in the case where no parameters were returned?

Copy link
Owner Author

@stevelorddremio stevelorddremio Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On re-reading this, I'm not sure I understand your comment.
If no parameters are returned but parameters were set in parameterBindingRoot then it would be an error condition. The only way I can see we know if any parameters are returned from putParameters is if we read putListener. Did I miss something?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking we shouldn't do parameter binding (line 1059) if we already got the parameter schema and it was empty.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I can check for this.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (read != null) {
try (final ArrowBuf metadata = read.getApplicationMetadata()) {
final FlightSql.DoPutPreparedStatementResult doPutPreparedStatementResult =
FlightSql.DoPutPreparedStatementResult.parseFrom(metadata.nioBuffer());
descriptor = FlightDescriptor
.command(Any.pack(CommandPreparedStatementQuery.newBuilder()
.setPreparedStatementHandle(
doPutPreparedStatementResult.getPreparedStatementHandle())
.build())
.toByteArray());
stevelorddremio marked this conversation as resolved.
Show resolved Hide resolved
}
}
} catch ( final InterruptedException | ExecutionException e) {
throw CallStatus.CANCELLED.withCause(e).toRuntimeException();
} catch ( final InvalidProtocolBufferException e) {
throw CallStatus.INVALID_ARGUMENT.withCause(e).toRuntimeException();
}
}

return client.getInfo(descriptor, options);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight.sql.example;

import java.io.Serializable;

public class DoPutPreparedStatementResultPOJO implements Serializable {
private String query;
private byte[] parameters;

public DoPutPreparedStatementResultPOJO(String query, byte[] parameters) {
this.query = query;
this.parameters = parameters.clone();
}

public String getQuery() {
return query;
}

public byte[] getParameters() {
return parameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@
public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
private static final String DATABASE_URI = "jdbc:derby:target/derbyDB";
private static final Logger LOGGER = getLogger(FlightSqlExample.class);
private static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar();
protected static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar();
// ARROW-15315: Use ExecutorService to simulate an async scenario
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Location location;
private final PoolingDataSource<PoolableConnection> dataSource;
private final BufferAllocator rootAllocator = new RootAllocator();
protected final PoolingDataSource<PoolableConnection> dataSource;
protected final BufferAllocator rootAllocator = new RootAllocator();
private final Cache<ByteString, StatementContext<PreparedStatement>> preparedStatementLoadingCache;
private final Cache<ByteString, StatementContext<Statement>> statementLoadingCache;
private final SqlInfoBuilder sqlInfoBuilder;
Expand Down Expand Up @@ -778,7 +778,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
// Running on another thread
Future<?> unused = executorService.submit(() -> {
try {
final ByteString preparedStatementHandle = copyFrom(randomUUID().toString().getBytes(UTF_8));
final ByteString preparedStatementHandle = copyFrom(request.getQuery().getBytes(UTF_8));
// Ownership of the connection will be passed to the context. Do NOT close!
final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(request.getQuery(),
Expand Down Expand Up @@ -882,7 +882,7 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate
while (binder.next()) {
preparedStatement.addBatch();
}
int[] recordCounts = preparedStatement.executeBatch();
final int[] recordCounts = preparedStatement.executeBatch();
recordCount = Arrays.stream(recordCounts).sum();
}

Expand Down Expand Up @@ -928,6 +928,7 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co
.toRuntimeException());
return;
}

ackStream.onCompleted();
};
}
Expand Down Expand Up @@ -1280,7 +1281,7 @@ public void getStreamStatement(final TicketStatementQuery ticketStatementQuery,
}
}

private <T extends Message> FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor,
protected <T extends Message> FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor,
final Schema schema) {
final Ticket ticket = new Ticket(pack(request).toByteArray());
// TODO Support multiple endpoints.
Expand Down
Loading
Loading