Skip to content

Commit

Permalink
GH 41262:[Java][FlightSQL] Implement stateless prepared statement
Browse files Browse the repository at this point in the history
PR comment updates
  • Loading branch information
stevelorddremio committed Apr 19, 2024
1 parent b8a6e07 commit c3ca784
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,8 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co
FlightStream flightStream, StreamListener<PutResult> ackStream) {

return () -> {
try {
final String query = new String(command.getPreparedStatementHandle().toStringUtf8());
final PreparedStatement preparedStatement = createPreparedStatement(query);

final String query = new String(command.getPreparedStatementHandle().toStringUtf8());
try (PreparedStatement preparedStatement = createPreparedStatement(query)) {
while (flightStream.next()) {
final VectorSchemaRoot root = flightStream.getRoot();
final JdbcParameterBinder binder = JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
Expand Down Expand Up @@ -141,16 +139,16 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co
@Override
public void getStreamPreparedStatement(final CommandPreparedStatementQuery command, final CallContext context,
final ServerStreamListener listener) {
final byte[] handle = command.getPreparedStatementHandle().toByteArray();
try {
// Case where there are parameters
final byte[] handle = command.getPreparedStatementHandle().toByteArray();
try {
final DoPutPreparedStatementResultPOJO doPutPreparedStatementResultPOJO =
deserializePOJO(handle);
final String query = doPutPreparedStatementResultPOJO.getQuery();
final PreparedStatement statement = createPreparedStatement(query);

try (ArrowFileReader reader = new ArrowFileReader(new SeekableReadChannel(
try (PreparedStatement statement = createPreparedStatement(query);
ArrowFileReader reader = new ArrowFileReader(new SeekableReadChannel(
new ByteArrayReadableSeekableByteChannel(
doPutPreparedStatementResultPOJO.getParameters())), rootAllocator)) {

Expand All @@ -168,8 +166,9 @@ public void getStreamPreparedStatement(final CommandPreparedStatementQuery comma
} catch (StreamCorruptedException e) {
// Case where there are no parameters
final String query = new String(command.getPreparedStatementHandle().toStringUtf8());
final PreparedStatement preparedStatement = createPreparedStatement(query);
executeQuery(preparedStatement, listener);
try (PreparedStatement preparedStatement = createPreparedStatement(query)) {
executeQuery(preparedStatement, listener);
}
}
} catch (final SQLException | IOException | ClassNotFoundException e) {
LOGGER.error(format("Failed to getStreamPreparedStatement: <%s>.", e.getMessage()), e);
Expand Down Expand Up @@ -207,24 +206,21 @@ private void executeQuery(PreparedStatement statement,
public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQuery command,
final CallContext context,
final FlightDescriptor descriptor) {
String query;
final byte[] handle = command.getPreparedStatementHandle().toByteArray();
try {
final byte[] handle = command.getPreparedStatementHandle().toByteArray();
final DoPutPreparedStatementResultPOJO doPutPreparedStatementResultPOJO = null;
String query;
try {
query = deserializePOJO(handle).getQuery();
} catch (StreamCorruptedException e) {
query = new String(command.getPreparedStatementHandle().toStringUtf8());
}
final PreparedStatement statement = createPreparedStatement(query);

ResultSetMetaData metaData = statement.getMetaData();
return getFlightInfoForSchema(command, descriptor,
jdbcToArrowSchema(metaData, DEFAULT_CALENDAR));
try (PreparedStatement statement = createPreparedStatement(query)) {
ResultSetMetaData metaData = statement.getMetaData();
return getFlightInfoForSchema(command, descriptor,
jdbcToArrowSchema(metaData, DEFAULT_CALENDAR));
}
} catch (final SQLException | IOException | ClassNotFoundException e) {
LOGGER.error(
format("There was a problem executing the prepared statement: <%s>.", e.getMessage()),
e);
LOGGER.error(format("There was a problem executing the prepared statement: <%s>.", e.getMessage()), e);
throw CallStatus.INTERNAL.withCause(e).toRuntimeException();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,16 @@ public void testSimplePreparedStatementResultsWithParameterBinding() throws Exce
prepare.setParameters(insertRoot);
final FlightInfo flightInfo = prepare.execute();

final FlightStream stream = sqlClient.getStream(flightInfo
.getEndpoints()
.get(0).getTicket());

// TODO: root is null and getSchema hangs when run as complete suite.
// This works when run as an individual test.
Assertions.assertAll(
() -> MatcherAssert.assertThat(stream.getSchema(), is(SCHEMA_INT_TABLE)),
() -> MatcherAssert.assertThat(getResults(stream), is(EXPECTED_RESULTS_FOR_PARAMETER_BINDING))
);
try (FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {
// TODO: root is null and getSchema hangs when run as complete suite.
// This works when run as an individual test.
final VectorSchemaRoot root = stream.getRoot();
final Schema schema = root.getSchema();
Assertions.assertAll(
() -> MatcherAssert.assertThat(schema, is(SCHEMA_INT_TABLE)),
() -> MatcherAssert.assertThat(getResults(stream), is(EXPECTED_RESULTS_FOR_PARAMETER_BINDING))
);
}
}
}
}
Expand Down

0 comments on commit c3ca784

Please sign in to comment.