Skip to content

Commit

Permalink
GH 41262:[Java][FlightSQL] Implement stateless prepared statement
Browse files Browse the repository at this point in the history
Create separate database per test class
  • Loading branch information
stevelorddremio committed May 16, 2024
1 parent 87d2fa4 commit 54a8770
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@
* supports all current features of Flight SQL.
*/
public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
private static final String DATABASE_URI = "jdbc:derby:target/derbyDB";
private static final Logger LOGGER = getLogger(FlightSqlExample.class);
protected static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar();
public static final String DB_NAME = "derbyDB";
private final String databaseUri;
// ARROW-15315: Use ExecutorService to simulate an async scenario
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Location location;
Expand All @@ -170,7 +171,7 @@ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {

public static void main(String[] args) throws Exception {
Location location = Location.forGrpcInsecure("localhost", 55555);
final FlightSqlExample example = new FlightSqlExample(location);
final FlightSqlExample example = new FlightSqlExample(location, DB_NAME);
Location listenLocation = Location.forGrpcInsecure("0.0.0.0", 55555);
try (final BufferAllocator allocator = new RootAllocator();
final FlightServer server = FlightServer.builder(allocator, listenLocation, example).build()) {
Expand All @@ -179,13 +180,14 @@ public static void main(String[] args) throws Exception {
}
}

public FlightSqlExample(final Location location) {
public FlightSqlExample(final Location location, final String dbName) {
// TODO Constructor should not be doing work.
checkState(
removeDerbyDatabaseIfExists() && populateDerbyDatabase(),
removeDerbyDatabaseIfExists(dbName) && populateDerbyDatabase(dbName),
"Failed to reset Derby database!");
databaseUri = "jdbc:derby:target/" + dbName;
final ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(DATABASE_URI, new Properties());
new DriverManagerConnectionFactory(databaseUri, new Properties());
final PoolableConnectionFactory poolableConnectionFactory =
new PoolableConnectionFactory(connectionFactory, null);
final ObjectPool<PoolableConnection> connectionPool = new GenericObjectPool<>(poolableConnectionFactory);
Expand Down Expand Up @@ -248,50 +250,37 @@ public FlightSqlExample(final Location location) {

}

public static boolean removeDerbyDatabaseIfExists() {
public static boolean removeDerbyDatabaseIfExists(final String dbName) {
boolean wasSuccess;
final Path path = Paths.get("target" + File.separator + "derbyDB");

if (Files.exists(path)) {
try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true");
Statement statement = connection.createStatement()) {
dropTable(statement, "intTable");
dropTable(statement, "foreignTable");
} catch (final SQLException e) {
LOGGER.error(format("Failed attempt to drop tables in DerbyDB: <%s>", e.getMessage()), e);
return false;
final Path path = Paths.get("target" + File.separator + dbName);

try (final Stream<Path> walk = Files.walk(path)) {
/*
* Iterate over all paths to delete, mapping each path to the outcome of its own
* deletion as a boolean representing whether or not each individual operation was
* successful; then reduce all booleans into a single answer, and store that into
* `wasSuccess`, which will later be returned by this method.
* If for whatever reason the resulting `Stream<Boolean>` is empty, throw an `IOException`;
* this not expected.
*/
wasSuccess = walk.sorted(Comparator.reverseOrder()).map(Path::toFile).map(File::delete)
.reduce(Boolean::logicalAnd).orElseThrow(IOException::new);
} catch (IOException e) {
/*
* The only acceptable scenario for an `IOException` to be thrown here is if
* an attempt to delete an non-existing file takes place -- which should be
* alright, since they would be deleted anyway.
*/
if (!(wasSuccess = e instanceof NoSuchFileException)) {
LOGGER.error(format("Failed attempt to clear DerbyDB: <%s>", e.getMessage()), e);
}

try (final Stream<Path> walk = Files.walk(path)) {
/*
* Iterate over all paths to delete, mapping each path to the outcome of its own
* deletion as a boolean representing whether or not each individual operation was
* successful; then reduce all booleans into a single answer, and store that into
* `wasSuccess`, which will later be returned by this method.
* If for whatever reason the resulting `Stream<Boolean>` is empty, throw an `IOException`;
* this not expected.
*/
wasSuccess = walk.sorted(Comparator.reverseOrder()).map(Path::toFile).map(File::delete)
.reduce(Boolean::logicalAnd).orElseThrow(IOException::new);
} catch (IOException e) {
/*
* The only acceptable scenario for an `IOException` to be thrown here is if
* an attempt to delete an non-existing file takes place -- which should be
* alright, since they would be deleted anyway.
*/
if (!(wasSuccess = e instanceof NoSuchFileException)) {
LOGGER.error(format("Failed attempt to clear DerbyDB: <%s>", e.getMessage()), e);
}
}
} else {
return true;
}

return wasSuccess;
}

private static boolean populateDerbyDatabase() {
try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true");
private static boolean populateDerbyDatabase(final String dbName) {
try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/" + dbName + ";create=true");
Statement statement = connection.createStatement()) {

dropTable(statement, "intTable");
Expand Down Expand Up @@ -791,8 +780,6 @@ public void close() throws Exception {
LOGGER.error(format("Failed to close resources: <%s>", t.getMessage()), t);
}

// removeDerbyDatabaseIfExists();

AutoCloseables.close(dataSource, rootAllocator);
}

Expand Down Expand Up @@ -1066,7 +1053,7 @@ public void getStreamTables(final CommandGetTables command, final CallContext co
final String[] tableTypes =
protocolSize == 0 ? null : protocolStringList.toArray(new String[protocolSize]);

try (final Connection connection = DriverManager.getConnection(DATABASE_URI);
try (final Connection connection = DriverManager.getConnection(databaseUri);
final VectorSchemaRoot vectorSchemaRoot = getTablesRoot(
connection.getMetaData(),
rootAllocator,
Expand Down Expand Up @@ -1117,7 +1104,7 @@ public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final Call
final String schema = command.hasDbSchema() ? command.getDbSchema() : null;
final String table = command.getTable();

try (Connection connection = DriverManager.getConnection(DATABASE_URI)) {
try (Connection connection = DriverManager.getConnection(databaseUri)) {
final ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(catalog, schema, table);

final VarCharVector catalogNameVector = new VarCharVector("catalog_name", rootAllocator);
Expand Down Expand Up @@ -1171,7 +1158,7 @@ public void getStreamExportedKeys(final CommandGetExportedKeys command, final Ca
String schema = command.hasDbSchema() ? command.getDbSchema() : null;
String table = command.getTable();

try (Connection connection = DriverManager.getConnection(DATABASE_URI);
try (Connection connection = DriverManager.getConnection(databaseUri);
ResultSet keys = connection.getMetaData().getExportedKeys(catalog, schema, table);
VectorSchemaRoot vectorSchemaRoot = createVectors(keys)) {
listener.start(vectorSchemaRoot);
Expand All @@ -1196,7 +1183,7 @@ public void getStreamImportedKeys(final CommandGetImportedKeys command, final Ca
String schema = command.hasDbSchema() ? command.getDbSchema() : null;
String table = command.getTable();

try (Connection connection = DriverManager.getConnection(DATABASE_URI);
try (Connection connection = DriverManager.getConnection(databaseUri);
ResultSet keys = connection.getMetaData().getImportedKeys(catalog, schema, table);
VectorSchemaRoot vectorSchemaRoot = createVectors(keys)) {
listener.start(vectorSchemaRoot);
Expand Down Expand Up @@ -1224,7 +1211,7 @@ public void getStreamCrossReference(CommandGetCrossReference command, CallContex
final String pkTable = command.getPkTable();
final String fkTable = command.getFkTable();

try (Connection connection = DriverManager.getConnection(DATABASE_URI);
try (Connection connection = DriverManager.getConnection(databaseUri);
ResultSet keys = connection.getMetaData()
.getCrossReference(pkCatalog, pkSchema, pkTable, fkCatalog, fkSchema, fkTable);
VectorSchemaRoot vectorSchemaRoot = createVectors(keys)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@
*/
public class FlightSqlStatelessExample extends FlightSqlExample {
private static final Logger LOGGER = getLogger(FlightSqlStatelessExample.class);
public static final String DB_NAME = "derbyStatelessDB";

public FlightSqlStatelessExample(final Location location) {
super(location);

public FlightSqlStatelessExample(final Location location, final String dbName) {
super(location, dbName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ private static void setUpClientServer() throws Exception {
allocator = new RootAllocator(Integer.MAX_VALUE);

final Location serverLocation = Location.forGrpcInsecure(LOCALHOST, 0);
server = FlightServer.builder(allocator, serverLocation, new FlightSqlExample(serverLocation))
server = FlightServer.builder(allocator, serverLocation,
new FlightSqlExample(serverLocation, FlightSqlExample.DB_NAME))
.build()
.start();

Expand Down Expand Up @@ -151,7 +152,7 @@ protected static void setUpExpectedResultsMap() {
@AfterAll
public static void tearDown() throws Exception {
close(sqlClient, server, allocator);
FlightSqlExample.removeDerbyDatabaseIfExists();
FlightSqlExample.removeDerbyDatabaseIfExists(FlightSqlExample.DB_NAME);
}

private static List<List<String>> getNonConformingResultsForGetSqlInfo(final List<? extends List<String>> results) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ public static void setUp() throws Exception {
@AfterAll
public static void tearDown() throws Exception {
close(sqlClient, server, allocator);
FlightSqlStatelessExample.removeDerbyDatabaseIfExists();
FlightSqlStatelessExample.removeDerbyDatabaseIfExists(FlightSqlStatelessExample.DB_NAME);
}

private static void setUpClientServer() throws Exception {
allocator = new RootAllocator(Integer.MAX_VALUE);

final Location serverLocation = Location.forGrpcInsecure(LOCALHOST, 0);
server = FlightServer.builder(allocator, serverLocation, new FlightSqlStatelessExample(serverLocation))
server = FlightServer.builder(allocator, serverLocation,
new FlightSqlStatelessExample(serverLocation, FlightSqlStatelessExample.DB_NAME))
.build()
.start();

Expand Down

0 comments on commit 54a8770

Please sign in to comment.