Add REST Catalog tests to Spark 3.5 integration test (#11093)
* Add REST Catalog tests to Spark 3.5 integration test

Add REST Catalog tests to Spark 3.4 integration test

tmp save

Fix integ tests

Revert "Add REST Catalog tests to Spark 3.4 integration test"

This reverts commit d052416.

unneeded changes

fix test

retrigger checks

Fix integ test

Fix port already in use

Fix unmatched validation catalog

spotless

Fix sqlite related test failures

* Rebase & spotless

* code format

* unneeded change

* unneeded change

* Revert "unneeded change"

This reverts commit ae29c41.

* code format

* Use in-mem config to configure RCK

* Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCatalogServer.java

* Use RESTServerExtension

* check style and test failure

* test failure

* fix test

* fix test

* spotless

* Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCatalogServer.java

Co-authored-by: Eduard Tudenhoefner <[email protected]>

* Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCatalogServer.java

Co-authored-by: Eduard Tudenhoefner <[email protected]>

* Update spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java

Co-authored-by: Eduard Tudenhoefner <[email protected]>

* Spotless and fix test

* Apply suggestions from code review

* Apply suggestions from code review

* Apply suggestions from code review

* Update spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java

* Package protected RCKUtils

* spotless

* unintentional change

* remove warehouse specification from rest

* spotless

* move find free port to rest server extension

* fix typo

* checkstyle

* fix unit test

---------

Co-authored-by: Haizhou Zhao <[email protected]>
Co-authored-by: Eduard Tudenhoefner <[email protected]>
3 people authored Nov 21, 2024
1 parent 12845d4 commit a52afdc
Showing 13 changed files with 195 additions and 21 deletions.
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.rest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -77,14 +80,21 @@ static Map<String, String> environmentCatalogConfig() {
}

static RESTCatalog initCatalogClient() {
return initCatalogClient(Maps.newHashMap());
}

static RESTCatalog initCatalogClient(Map<String, String> properties) {
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.putAll(RCKUtils.environmentCatalogConfig());
catalogProperties.putAll(Maps.fromProperties(System.getProperties()));
catalogProperties.putAll(properties);

// Set defaults
String port =
catalogProperties.getOrDefault(
RESTCatalogServer.REST_PORT, String.valueOf(RESTCatalogServer.REST_PORT_DEFAULT));
catalogProperties.putIfAbsent(
CatalogProperties.URI,
String.format("http://localhost:%s/", RESTCatalogServer.REST_PORT_DEFAULT));
CatalogProperties.URI, String.format("http://localhost:%s/", port));
catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, "rck_warehouse");

RESTCatalog catalog = new RESTCatalog();
@@ -107,4 +117,12 @@ static void purgeCatalogTestEntries(RESTCatalog catalog) {
catalog.dropNamespace(namespace);
});
}

static int findFreePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
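
For illustration only (not part of the commit): taken together, findFreePort and the new initCatalogClient(Map) overload let a test pin the server and the client to the same OS-assigned port. A minimal sketch, assuming the package-private APIs shown in this diff (RCKUtils above, the RESTCatalogServer constructor below) and that the server honors rest.port from its constructor config:

package org.apache.iceberg.rest;  // RCKUtils and the RESTCatalogServer constructors are package-private

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class RestFixtureSketch {
  static RESTCatalog startServerAndClient() throws Exception {
    String port = String.valueOf(RCKUtils.findFreePort());

    // Server side: the constructor config is applied on top of the CATALOG_*-derived
    // environment properties (see the RESTCatalogServer changes below).
    RESTCatalogServer server = new RESTCatalogServer(Map.of(RESTCatalogServer.REST_PORT, port));
    server.start(false);

    // Client side: explicit properties win over system properties and environment config;
    // the URI defaults to http://localhost:<rest.port>/ when not set explicitly.
    Map<String, String> clientProperties = Maps.newHashMap();
    clientProperties.put(RESTCatalogServer.REST_PORT, port);
    return RCKUtils.initCatalogClient(clientProperties);
  }
}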
@@ -26,6 +26,7 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
@@ -37,12 +38,19 @@
public class RESTCatalogServer {
private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogServer.class);

static final String REST_PORT = "rest.port";
public static final String REST_PORT = "rest.port";
static final int REST_PORT_DEFAULT = 8181;

private Server httpServer;
private final Map<String, String> config;

RESTCatalogServer() {}
RESTCatalogServer() {
this.config = Maps.newHashMap();
}

RESTCatalogServer(Map<String, String> config) {
this.config = config;
}

static class CatalogContext {
private final Catalog catalog;
@@ -64,7 +72,8 @@ public Map<String, String> configuration() {

private CatalogContext initializeBackendCatalog() throws IOException {
// Translate environment variables to catalog properties
Map<String, String> catalogProperties = RCKUtils.environmentCatalogConfig();
Map<String, String> catalogProperties = Maps.newHashMap(RCKUtils.environmentCatalogConfig());
catalogProperties.putAll(config);

// Fallback to a JDBCCatalog impl if one is not set
catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());
@@ -18,19 +18,49 @@
*/
package org.apache.iceberg.rest;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public class RESTServerExtension implements BeforeAllCallback, AfterAllCallback {
// if the caller explicitly wants the server to start on port 0, it means the caller wants to
// launch on a free port
public static final String FREE_PORT = "0";

private RESTCatalogServer localServer;
private RESTCatalog client;
private final Map<String, String> config;

public RESTServerExtension() {
config = Maps.newHashMap();
}

public RESTServerExtension(Map<String, String> config) {
Map<String, String> conf = Maps.newHashMap(config);
if (conf.containsKey(RESTCatalogServer.REST_PORT)
&& conf.get(RESTCatalogServer.REST_PORT).equals(FREE_PORT)) {
conf.put(RESTCatalogServer.REST_PORT, String.valueOf(RCKUtils.findFreePort()));
}
this.config = conf;
}

public Map<String, String> config() {
return config;
}

public RESTCatalog client() {
return client;
}

@Override
public void beforeAll(ExtensionContext extensionContext) throws Exception {
if (Boolean.parseBoolean(
extensionContext.getConfigurationParameter(RCKUtils.RCK_LOCAL).orElse("true"))) {
this.localServer = new RESTCatalogServer();
this.localServer = new RESTCatalogServer(config);
this.localServer.start(false);
this.client = RCKUtils.initCatalogClient(config);
}
}

@@ -39,5 +69,8 @@ public void afterAll(ExtensionContext extensionContext) throws Exception {
if (localServer != null) {
localServer.stop();
}
if (client != null) {
client.close();
}
}
}
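
For illustration only (not part of the commit): a hypothetical JUnit 5 test that registers the new extension on a free port and talks to the embedded server through the client() accessor. The class and package names are made up, and it assumes the open-api test fixtures and Jetty servlet dependency are on the test classpath, as in the build.gradle changes below:

package org.example.tests;  // hypothetical package and class, for illustration only

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class RestServerExtensionUsageTest {
  // FREE_PORT ("0") makes the extension resolve an unused port up front, which is what
  // addresses the "port already in use" failures mentioned in the commit message.
  @RegisterExtension
  static final RESTServerExtension REST_SERVER =
      new RESTServerExtension(Map.of(RESTCatalogServer.REST_PORT, RESTServerExtension.FREE_PORT));

  @Test
  void createsNamespaceThroughRestClient() {
    RESTCatalog catalog = REST_SERVER.client();
    catalog.createNamespace(Namespace.of("smoke_test"));
    assertThat(catalog.namespaceExists(Namespace.of("smoke_test"))).isTrue();
  }
}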
22 changes: 22 additions & 0 deletions spark/v3.5/build.gradle
@@ -107,8 +107,13 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
testImplementation libs.sqlite.jdbc
testImplementation libs.awaitility
// runtime dependencies for running REST Catalog based integration test
testRuntimeOnly libs.jetty.servlet
}

test {
@@ -172,6 +177,12 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
// runtime dependencies for running REST Catalog based integration test
testRuntimeOnly libs.jetty.servlet
testRuntimeOnly libs.sqlite.jdbc

testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
@@ -255,6 +266,17 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')

// runtime dependencies for running Hive Catalog based integration test
integrationRuntimeOnly project(':iceberg-hive-metastore')
// runtime dependencies for running REST Catalog based integration test
integrationRuntimeOnly project(path: ':iceberg-core', configuration: 'testArtifacts')
integrationRuntimeOnly (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
integrationRuntimeOnly libs.jetty.servlet
integrationRuntimeOnly libs.sqlite.jdbc

// Not allowed on our classpath, only the runtime jar is allowed
integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}")
integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
@@ -18,8 +18,11 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.util.Comparator;
@@ -521,7 +524,7 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
optional(3, "category", Types.StringType.get())));

spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();

table.refresh();
Long currentSnapshotId = table.currentSnapshot().snapshotId();

Dataset<Row> actualFilesDs =
@@ -740,6 +743,11 @@ private boolean partitionMatch(Record file, String partValue) {

@TestTemplate
public void metadataLogEntriesAfterReplacingTable() throws Exception {
assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE))
.as(
"need to fix https://github.com/apache/iceberg/issues/11109 before enabling this for the REST catalog")
.isNotEqualTo(ICEBERG_CATALOG_TYPE_REST);

sql(
"CREATE TABLE %s (id bigint, data string) "
+ "USING iceberg "
@@ -450,12 +450,14 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);

String statsFileName = "stats-file-" + UUID.randomUUID();
String location = table.location();
// not every catalog returns a file: scheme for local directory locations,
// e.g. the Hadoop and Hive catalogs do, while the JDBC and REST catalogs do not
if (!location.startsWith("file:")) {
location = "file:" + location;
}
File statsLocation =
new File(new URI(table.location()))
.toPath()
.resolve("data")
.resolve(statsFileName)
.toFile();
new File(new URI(location)).toPath().resolve("data").resolve(statsFileName).toFile();
StatisticsFile statisticsFile;
try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
long snapshotId = table.currentSnapshot().snapshotId();
@@ -18,8 +18,10 @@
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
@@ -43,6 +45,14 @@ protected static Object[][] parameters() {
SparkCatalogConfig.SPARK.catalogName(),
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties()
},
{
SparkCatalogConfig.REST.catalogName(),
SparkCatalogConfig.REST.implementation(),
ImmutableMap.builder()
.putAll(SparkCatalogConfig.REST.properties())
.put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
.build()
}
};
}
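
For illustration only (not part of the commit): combined with the REST entry added to SparkCatalogConfig in the next file, the new parameter block above roughly expands to the following Spark SQL catalog configuration. The catalog name "testrest" and the port are illustrative, and the URI is taken from the embedded REST server started by RESTServerExtension:

spark.sql.catalog.testrest=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.testrest.type=rest
spark.sql.catalog.testrest.cache-enabled=false
spark.sql.catalog.testrest.uri=http://localhost:<free-port>/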
@@ -34,6 +34,10 @@ public enum SparkCatalogConfig {
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
REST(
"testrest",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "rest", "cache-enabled", "false")),
SPARK(
"spark_catalog",
SparkSessionCatalog.class.getName(),
@@ -18,6 +18,11 @@
*/
package org.apache.iceberg.spark;

import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
@@ -36,17 +41,38 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerExtension;
import org.apache.iceberg.util.PropertyUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestBaseWithCatalog extends TestBase {
protected static File warehouse = null;

@RegisterExtension
private static final RESTServerExtension REST_SERVER_EXTENSION =
new RESTServerExtension(
Map.of(
RESTCatalogServer.REST_PORT,
RESTServerExtension.FREE_PORT,
// In-memory sqlite database by default is private to the connection that created it.
// If more than 1 jdbc connection backed by in-memory sqlite is created behind one
// JdbcCatalog, then different jdbc connections could provide different views of table
// status even belonging to the same catalog. Reference:
// https://www.sqlite.org/inmemorydb.html
CatalogProperties.CLIENT_POOL_SIZE,
"1"));

protected static RESTCatalog restCatalog;

@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
protected static Object[][] parameters() {
return new Object[][] {
@@ -59,13 +85,14 @@ protected static Object[][] parameters() {
}

@BeforeAll
public static void createWarehouse() throws IOException {
public static void setUpAll() throws IOException {
TestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null);
assertThat(warehouse.delete()).isTrue();
restCatalog = REST_SERVER_EXTENSION.client();
}

@AfterAll
public static void dropWarehouse() throws IOException {
public static void tearDownAll() throws IOException {
if (warehouse != null && warehouse.exists()) {
Path warehousePath = new Path(warehouse.getAbsolutePath());
FileSystem fs = warehousePath.getFileSystem(hiveConf);
@@ -89,13 +116,37 @@ public static void dropWarehouse() throws IOException {
protected TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
protected String tableName;

private void configureValidationCatalog() {
if (catalogConfig.containsKey(ICEBERG_CATALOG_TYPE)) {
switch (catalogConfig.get(ICEBERG_CATALOG_TYPE)) {
case ICEBERG_CATALOG_TYPE_HADOOP:
this.validationCatalog =
new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse);
break;
case ICEBERG_CATALOG_TYPE_REST:
this.validationCatalog = restCatalog;
break;
case ICEBERG_CATALOG_TYPE_HIVE:
this.validationCatalog = catalog;
break;
default:
throw new IllegalArgumentException("Unknown catalog type");
}
} else if (catalogConfig.containsKey(CATALOG_IMPL)) {
switch (catalogConfig.get(CATALOG_IMPL)) {
case "org.apache.iceberg.inmemory.InMemoryCatalog":
this.validationCatalog = new InMemoryCatalog();
break;
default:
throw new IllegalArgumentException("Unknown catalog impl");
}
}
this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
}

@BeforeEach
public void before() {
this.validationCatalog =
catalogName.equals("testhadoop")
? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse)
: catalog;
this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
configureValidationCatalog();

spark.conf().set("spark.sql.catalog." + catalogName, implementation);
catalogConfig.forEach(
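
For illustration only (not part of the commit; the remaining hunks of this file are not shown): a hedged sketch of a test running against the new REST parameterization. The sql() helper and the tableName, tableIdent, and validationCatalog members are the existing TestBase/TestBaseWithCatalog helpers shown in this diff; the test class and body are hypothetical:

package org.apache.iceberg.spark;  // hypothetical subclass, for illustration only

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.TestTemplate;

class RestCatalogSmokeTest extends TestBaseWithCatalog {

  @TestTemplate
  void createTableIsVisibleThroughValidationCatalog() {
    // Runs once per catalog in the parameter matrix; if the REST parameters added in this
    // commit are included, validationCatalog resolves to the embedded REST catalog client
    // selected by configureValidationCatalog().
    sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg", tableName);
    assertThat(validationCatalog.tableExists(tableIdent)).isTrue();
  }
}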