From 641a519322b2b1240f42ed7a06604a2d0e97b7f0 Mon Sep 17 00:00:00 2001 From: Steve Lord Date: Wed, 15 May 2024 21:51:57 -0700 Subject: [PATCH] GH 41262:[Java][FlightSQL] Implement stateless prepared statement Create separate database per test class --- .../flight/sql/example/FlightSqlExample.java | 85 ++++++++----------- .../example/FlightSqlStatelessExample.java | 6 +- .../arrow/flight/sql/test/TestFlightSql.java | 5 +- .../sql/test/TestFlightSqlStateless.java | 5 +- 4 files changed, 46 insertions(+), 55 deletions(-) diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java index f24e448d9720b..36362fd8681d3 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java @@ -156,9 +156,10 @@ * supports all current features of Flight SQL. */ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable { - private static final String DATABASE_URI = "jdbc:derby:target/derbyDB"; private static final Logger LOGGER = getLogger(FlightSqlExample.class); protected static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar(); + public static final String DB_NAME = "derbyDB"; + private final String databaseUri; // ARROW-15315: Use ExecutorService to simulate an async scenario private final ExecutorService executorService = Executors.newFixedThreadPool(10); private final Location location; @@ -170,7 +171,7 @@ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable { public static void main(String[] args) throws Exception { Location location = Location.forGrpcInsecure("localhost", 55555); - final FlightSqlExample example = new FlightSqlExample(location); + final FlightSqlExample example = new FlightSqlExample(location, DB_NAME); Location listenLocation = Location.forGrpcInsecure("0.0.0.0", 55555); try (final BufferAllocator allocator = new RootAllocator(); final FlightServer server = FlightServer.builder(allocator, listenLocation, example).build()) { @@ -179,13 +180,14 @@ public static void main(String[] args) throws Exception { } } - public FlightSqlExample(final Location location) { + public FlightSqlExample(final Location location, final String dbName) { // TODO Constructor should not be doing work. checkState( - removeDerbyDatabaseIfExists() && populateDerbyDatabase(), + removeDerbyDatabaseIfExists(dbName) && populateDerbyDatabase(dbName), "Failed to reset Derby database!"); + databaseUri = "jdbc:derby:target/" + dbName; final ConnectionFactory connectionFactory = - new DriverManagerConnectionFactory(DATABASE_URI, new Properties()); + new DriverManagerConnectionFactory(databaseUri, new Properties()); final PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(connectionFactory, null); final ObjectPool connectionPool = new GenericObjectPool<>(poolableConnectionFactory); @@ -248,50 +250,37 @@ public FlightSqlExample(final Location location) { } - public static boolean removeDerbyDatabaseIfExists() { + public static boolean removeDerbyDatabaseIfExists(final String dbName) { boolean wasSuccess; - final Path path = Paths.get("target" + File.separator + "derbyDB"); - - if (Files.exists(path)) { - try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true"); - Statement statement = connection.createStatement()) { - dropTable(statement, "intTable"); - dropTable(statement, "foreignTable"); - } catch (final SQLException e) { - LOGGER.error(format("Failed attempt to drop tables in DerbyDB: <%s>", e.getMessage()), e); - return false; + final Path path = Paths.get("target" + File.separator + dbName); + + try (final Stream walk = Files.walk(path)) { + /* + * Iterate over all paths to delete, mapping each path to the outcome of its own + * deletion as a boolean representing whether or not each individual operation was + * successful; then reduce all booleans into a single answer, and store that into + * `wasSuccess`, which will later be returned by this method. + * If for whatever reason the resulting `Stream` is empty, throw an `IOException`; + * this not expected. + */ + wasSuccess = walk.sorted(Comparator.reverseOrder()).map(Path::toFile).map(File::delete) + .reduce(Boolean::logicalAnd).orElseThrow(IOException::new); + } catch (IOException e) { + /* + * The only acceptable scenario for an `IOException` to be thrown here is if + * an attempt to delete an non-existing file takes place -- which should be + * alright, since they would be deleted anyway. + */ + if (!(wasSuccess = e instanceof NoSuchFileException)) { + LOGGER.error(format("Failed attempt to clear DerbyDB: <%s>", e.getMessage()), e); } - - try (final Stream walk = Files.walk(path)) { - /* - * Iterate over all paths to delete, mapping each path to the outcome of its own - * deletion as a boolean representing whether or not each individual operation was - * successful; then reduce all booleans into a single answer, and store that into - * `wasSuccess`, which will later be returned by this method. - * If for whatever reason the resulting `Stream` is empty, throw an `IOException`; - * this not expected. - */ - wasSuccess = walk.sorted(Comparator.reverseOrder()).map(Path::toFile).map(File::delete) - .reduce(Boolean::logicalAnd).orElseThrow(IOException::new); - } catch (IOException e) { - /* - * The only acceptable scenario for an `IOException` to be thrown here is if - * an attempt to delete an non-existing file takes place -- which should be - * alright, since they would be deleted anyway. - */ - if (!(wasSuccess = e instanceof NoSuchFileException)) { - LOGGER.error(format("Failed attempt to clear DerbyDB: <%s>", e.getMessage()), e); - } - } - } else { - return true; } return wasSuccess; } - private static boolean populateDerbyDatabase() { - try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true"); + private static boolean populateDerbyDatabase(final String dbName) { + try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/" + dbName + ";create=true"); Statement statement = connection.createStatement()) { dropTable(statement, "intTable"); @@ -791,8 +780,6 @@ public void close() throws Exception { LOGGER.error(format("Failed to close resources: <%s>", t.getMessage()), t); } - // removeDerbyDatabaseIfExists(); - AutoCloseables.close(dataSource, rootAllocator); } @@ -1066,7 +1053,7 @@ public void getStreamTables(final CommandGetTables command, final CallContext co final String[] tableTypes = protocolSize == 0 ? null : protocolStringList.toArray(new String[protocolSize]); - try (final Connection connection = DriverManager.getConnection(DATABASE_URI); + try (final Connection connection = DriverManager.getConnection(databaseUri); final VectorSchemaRoot vectorSchemaRoot = getTablesRoot( connection.getMetaData(), rootAllocator, @@ -1117,7 +1104,7 @@ public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final Call final String schema = command.hasDbSchema() ? command.getDbSchema() : null; final String table = command.getTable(); - try (Connection connection = DriverManager.getConnection(DATABASE_URI)) { + try (Connection connection = DriverManager.getConnection(databaseUri)) { final ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(catalog, schema, table); final VarCharVector catalogNameVector = new VarCharVector("catalog_name", rootAllocator); @@ -1171,7 +1158,7 @@ public void getStreamExportedKeys(final CommandGetExportedKeys command, final Ca String schema = command.hasDbSchema() ? command.getDbSchema() : null; String table = command.getTable(); - try (Connection connection = DriverManager.getConnection(DATABASE_URI); + try (Connection connection = DriverManager.getConnection(databaseUri); ResultSet keys = connection.getMetaData().getExportedKeys(catalog, schema, table); VectorSchemaRoot vectorSchemaRoot = createVectors(keys)) { listener.start(vectorSchemaRoot); @@ -1196,7 +1183,7 @@ public void getStreamImportedKeys(final CommandGetImportedKeys command, final Ca String schema = command.hasDbSchema() ? command.getDbSchema() : null; String table = command.getTable(); - try (Connection connection = DriverManager.getConnection(DATABASE_URI); + try (Connection connection = DriverManager.getConnection(databaseUri); ResultSet keys = connection.getMetaData().getImportedKeys(catalog, schema, table); VectorSchemaRoot vectorSchemaRoot = createVectors(keys)) { listener.start(vectorSchemaRoot); @@ -1224,7 +1211,7 @@ public void getStreamCrossReference(CommandGetCrossReference command, CallContex final String pkTable = command.getPkTable(); final String fkTable = command.getFkTable(); - try (Connection connection = DriverManager.getConnection(DATABASE_URI); + try (Connection connection = DriverManager.getConnection(databaseUri); ResultSet keys = connection.getMetaData() .getCrossReference(pkCatalog, pkSchema, pkTable, fkCatalog, fkSchema, fkTable); VectorSchemaRoot vectorSchemaRoot = createVectors(keys)) { diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlStatelessExample.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlStatelessExample.java index 8d1d901eb94eb..c79c09c0967dc 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlStatelessExample.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlStatelessExample.java @@ -66,9 +66,11 @@ */ public class FlightSqlStatelessExample extends FlightSqlExample { private static final Logger LOGGER = getLogger(FlightSqlStatelessExample.class); + public static final String DB_NAME = "derbyStatelessDB"; - public FlightSqlStatelessExample(final Location location) { - super(location); + + public FlightSqlStatelessExample(final Location location, final String dbName) { + super(location, dbName); } @Override diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSql.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSql.java index f400051ae53e3..ffffdd62ac950 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSql.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSql.java @@ -105,7 +105,8 @@ private static void setUpClientServer() throws Exception { allocator = new RootAllocator(Integer.MAX_VALUE); final Location serverLocation = Location.forGrpcInsecure(LOCALHOST, 0); - server = FlightServer.builder(allocator, serverLocation, new FlightSqlExample(serverLocation)) + server = FlightServer.builder(allocator, serverLocation, + new FlightSqlExample(serverLocation, FlightSqlExample.DB_NAME)) .build() .start(); @@ -151,7 +152,7 @@ protected static void setUpExpectedResultsMap() { @AfterAll public static void tearDown() throws Exception { close(sqlClient, server, allocator); - FlightSqlExample.removeDerbyDatabaseIfExists(); + FlightSqlExample.removeDerbyDatabaseIfExists(FlightSqlExample.DB_NAME); } private static List> getNonConformingResultsForGetSqlInfo(final List> results) { diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSqlStateless.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSqlStateless.java index 42e7482919706..09c7b2ef87f45 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSqlStateless.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/test/TestFlightSqlStateless.java @@ -54,14 +54,15 @@ public static void setUp() throws Exception { @AfterAll public static void tearDown() throws Exception { close(sqlClient, server, allocator); - FlightSqlStatelessExample.removeDerbyDatabaseIfExists(); + FlightSqlStatelessExample.removeDerbyDatabaseIfExists(FlightSqlStatelessExample.DB_NAME); } private static void setUpClientServer() throws Exception { allocator = new RootAllocator(Integer.MAX_VALUE); final Location serverLocation = Location.forGrpcInsecure(LOCALHOST, 0); - server = FlightServer.builder(allocator, serverLocation, new FlightSqlStatelessExample(serverLocation)) + server = FlightServer.builder(allocator, serverLocation, + new FlightSqlStatelessExample(serverLocation, FlightSqlStatelessExample.DB_NAME)) .build() .start();