diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c98cc323c9fe..055475809296 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -45,7 +45,7 @@ flink119 = { strictly = "1.19.0"} google-libraries-bom = "26.43.0" guava = "33.2.1-jre" hadoop2 = "2.7.3" -hadoop3-client = "3.3.6" +hadoop3 = "3.3.6" httpcomponents-httpclient5 = "5.3.1" hive2 = { strictly = "2.3.9"} # see rich version usage explanation above hive3 = "3.1.3" @@ -132,7 +132,8 @@ hadoop2-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "ha hadoop2-hdfs = { module = "org.apache.hadoop:hadoop-hdfs", version.ref = "hadoop2" } hadoop2-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop2" } hadoop2-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop2" } -hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3-client" } +hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3" } +hadoop3-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop3" } hive2-exec = { module = "org.apache.hive:hive-exec", version.ref = "hive2" } hive2-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive2" } hive2-serde = { module = "org.apache.hive:hive-serde", version.ref = "hive2" } diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index 1fdd6bc6ea4c..60fa879d291f 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -17,7 +17,7 @@ * under the License. */ -project(":iceberg-kafka-connect:iceberg-kafka-connect-events") { +project(':iceberg-kafka-connect:iceberg-kafka-connect-events') { dependencies { api project(':iceberg-api') implementation project(':iceberg-core') @@ -28,10 +28,10 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect-events") { test { useJUnitPlatform() - } + } } -project(":iceberg-kafka-connect:iceberg-kafka-connect") { +project(':iceberg-kafka-connect:iceberg-kafka-connect') { dependencies { api project(':iceberg-api') implementation project(':iceberg-core') @@ -57,3 +57,181 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect") { useJUnitPlatform() } } + +project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') { + apply plugin: 'distribution' + + configurations { + hive { + extendsFrom runtimeClasspath + } + all { + exclude group: 'javax.activation', module: 'activation' + // force upgrades for dependencies with known vulnerabilities... 
+ resolutionStrategy { + force 'org.codehaus.jettison:jettison:1.5.4' + force 'org.xerial.snappy:snappy-java:1.1.10.5' + force 'org.apache.commons:commons-compress:1.26.0' + force 'org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.2.0' + } + } + } + + sourceSets { + integration { + java.srcDir "$projectDir/src/integration/java" + resources.srcDir "$projectDir/src/integration/resources" + } + } + + configurations { + integrationImplementation.extendsFrom testImplementation + integrationRuntime.extendsFrom testRuntimeOnly + } + + dependencies { + implementation project(':iceberg-kafka-connect:iceberg-kafka-connect') + implementation(libs.hadoop3.common) { + exclude group: 'log4j' + exclude group: 'org.slf4j' + exclude group: 'ch.qos.reload4j' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'com.fasterxml.woodstox' + exclude group: 'com.google.guava' + exclude group: 'com.google.protobuf' + exclude group: 'org.apache.curator' + exclude group: 'org.apache.zookeeper' + exclude group: 'org.apache.kerby' + exclude group: 'org.apache.hadoop', module: 'hadoop-auth' + exclude group: 'org.apache.commons', module: 'commons-configuration2' + exclude group: 'org.apache.hadoop.thirdparty', module: 'hadoop-shaded-protobuf_3_7' + exclude group: 'org.codehaus.woodstox' + exclude group: 'org.eclipse.jetty' + } + implementation project(':iceberg-orc') + implementation project(':iceberg-parquet') + + implementation project(':iceberg-aws') + implementation platform(libs.awssdk.bom) + implementation 'software.amazon.awssdk:apache-client' + implementation 'software.amazon.awssdk:auth' + implementation 'software.amazon.awssdk:iam' + implementation 'software.amazon.awssdk:sso' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:kms' + implementation 'software.amazon.awssdk:glue' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:dynamodb' + implementation 'software.amazon.awssdk:lakeformation' + + implementation project(':iceberg-gcp') + implementation platform(libs.google.libraries.bom) + implementation 'com.google.cloud:google-cloud-storage' + + implementation project(':iceberg-azure') + implementation platform(libs.azuresdk.bom) + implementation 'com.azure:azure-storage-file-datalake' + implementation 'com.azure:azure-identity' + + hive project(':iceberg-hive-metastore') + hive(libs.hive2.metastore) { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.apache.hbase' + exclude group: 'org.apache.logging.log4j' + exclude group: 'co.cask.tephra' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all' + exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet' + exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle' + exclude group: 'com.tdunning', module: 'json' + exclude group: 'javax.transaction', module: 'transaction-api' + exclude group: 'com.zaxxer', module: 'HikariCP' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-common' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-applicationhistoryservice' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-resourcemanager' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-web-proxy' + exclude group: 'org.apache.hive', module: 'hive-service-rpc' + exclude group: 'com.github.joshelser', module: 
'dropwizard-metrics-hadoop-metrics2-reporter' + } + hive(libs.hadoop3.client) { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + + integrationImplementation project(':iceberg-api') + integrationImplementation project(':iceberg-common') + integrationImplementation project(':iceberg-core') + integrationImplementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + integrationImplementation platform(libs.jackson.bom) + integrationImplementation libs.jackson.core + integrationImplementation libs.jackson.databind + integrationImplementation libs.jackson.databind + integrationImplementation libs.kafka.clients + integrationImplementation libs.kafka.connect.api + integrationImplementation libs.kafka.connect.json + integrationImplementation libs.testcontainers + integrationImplementation libs.httpcomponents.httpclient5 + integrationImplementation libs.awaitility + } + + task integrationTest(type: Test) { + useJUnitPlatform() + testClassesDirs = sourceSets.integration.output.classesDirs + classpath = sourceSets.integration.runtimeClasspath + jvmArgs += project.property('extraJvmArgs') + } + + processResources { + filter { + it.replace('__VERSION__', project.version.toString()) + } + } + + distributions { + main { + contents { + from "${processResources.destinationDir}/manifest.json" + into('lib/') { + from configurations.runtimeClasspath + } + into('doc/') { + from "$rootDir/LICENSE" + } + into('assets/') { + from "${processResources.destinationDir}/iceberg.png" + } + } + } + hive { + contents { + from "${processResources.destinationDir}/manifest.json" + into('lib/') { + from configurations.hive + } + into('doc/') { + from "$rootDir/LICENSE" + } + into('assets/') { + from "${processResources.destinationDir}/iceberg.png" + } + } + } + } + + tasks.jar.enabled = false + + tasks.distTar.enabled = false + distZip.dependsOn processResources + installDist.dependsOn processResources + + tasks.hiveDistTar.enabled = false + hiveDistZip.dependsOn processResources + installHiveDist.dependsOn processResources + + integrationTest.dependsOn installDist + check.dependsOn integrationTest + + assemble.dependsOn distZip, hiveDistZip +} diff --git a/kafka-connect/kafka-connect-runtime/docker/docker-compose.yml b/kafka-connect/kafka-connect-runtime/docker/docker-compose.yml new file mode 100644 index 000000000000..202180289d96 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/docker/docker-compose.yml @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +volumes: + data: {} + +services: + minio: + image: minio/minio + hostname: minio + environment: + - MINIO_ROOT_USER=minioadmin + - MINIO_ROOT_PASSWORD=minioadmin + ports: + - 9000:9000 + - 9001:9001 + volumes: + - data:/data + command: server /data --console-address ":9001" + + create-bucket: + image: minio/mc + depends_on: + - minio + volumes: + - data:/data + entrypoint: mc mb /data/bucket + + iceberg: + image: tabulario/iceberg-rest + depends_on: + - create-bucket + hostname: iceberg + ports: + - 8181:8181 + environment: + - AWS_REGION=us-east-1 + - CATALOG_WAREHOUSE=s3://bucket/warehouse/ + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + - CATALOG_S3_PATH__STYLE__ACCESS=true + - CATALOG_S3_ACCESS__KEY__ID=minioadmin + - CATALOG_S3_SECRET__ACCESS__KEY=minioadmin + + kafka: + image: confluentinc/cp-kafka + hostname: kafka + ports: + - 29092:29092 + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENERS: BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:29092 + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + CLUSTER_ID: E-JXLvCaTiaUYDb1LwZ1JQ + + connect: + image: confluentinc/cp-kafka-connect + depends_on: + - kafka + hostname: connect + ports: + - 8083:8083 + volumes: + - ../build/install:/test/kafka-connect + environment: + CONNECT_REST_ADVERTISED_HOST_NAME: localhost + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: kc + CONNECT_CONFIG_STORAGE_TOPIC: kc-config + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_TOPIC: kc-offsets + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: kc-storage + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false + CONNECT_PLUGIN_PATH: /test/kafka-connect + CONNECT_BOOTSTRAP_SERVERS: kafka:9092 + CONNECT_OFFSET_FLUSH_INTERVAL_MS: 500 diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java new file mode 100644 index 000000000000..5c458ad3fa78 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class IntegrationDynamicTableTest extends IntegrationTestBase { + + private static final String TEST_DB = "test"; + private static final String TEST_TABLE1 = "tbl1"; + private static final String TEST_TABLE2 = "tbl2"; + private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1); + private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2); + + @BeforeEach + public void before() { + createTopic(testTopic(), TEST_TOPIC_PARTITIONS); + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); + } + + @AfterEach + public void after() { + context().stopConnector(connectorName()); + deleteTopic(testTopic()); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); + ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSink(String branch) { + // partitioned table + catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC); + // unpartitioned table + catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema); + + List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER1, branch); + + files = dataFiles(TABLE_IDENTIFIER2, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER2, branch); + } + + private void runTest(String branch, boolean useSchema) { + // set offset reset to earliest so we don't miss any test messages + KafkaConnectUtils.Config connectorConfig = + new KafkaConnectUtils.Config(connectorName()) + .config("topics", testTopic()) + .config("connector.class", IcebergSinkConnector.class.getName()) + .config("tasks.max", 2) + .config("consumer.override.auto.offset.reset", "earliest") + .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("key.converter.schemas.enable", false) + .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("value.converter.schemas.enable", useSchema) +
.config("iceberg.tables.dynamic-enabled", true) + .config("iceberg.tables.route-field", "payload") + .config("iceberg.control.commit.interval-ms", 1000) + .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) + .config("iceberg.kafka.auto.offset.reset", "earliest"); + + context().connectorCatalogProperties().forEach(connectorConfig::config); + + if (branch != null) { + connectorConfig.config("iceberg.tables.default-commit-branch", branch); + } + + if (!useSchema) { + connectorConfig.config("value.converter.schemas.enable", false); + } + + context().startConnector(connectorConfig); + + TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1); + TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2); + TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3"); + + send(testTopic(), event1, useSchema); + send(testTopic(), event2, useSchema); + send(testTopic(), event3, useSchema); + flush(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(this::assertSnapshotAdded); + } + + private void assertSnapshotAdded() { + Table table = catalog().loadTable(TABLE_IDENTIFIER1); + assertThat(table.snapshots()).hasSize(1); + table = catalog().loadTable(TABLE_IDENTIFIER2); + assertThat(table.snapshots()).hasSize(1); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java new file mode 100644 index 000000000000..7cffbd8838b2 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class IntegrationMultiTableTest extends IntegrationTestBase { + + private static final String TEST_DB = "test"; + private static final String TEST_TABLE1 = "foobar1"; + private static final String TEST_TABLE2 = "foobar2"; + private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1); + private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2); + + @BeforeEach + public void before() { + createTopic(testTopic(), TEST_TOPIC_PARTITIONS); + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); + } + + @AfterEach + public void after() { + context().stopConnector(connectorName()); + deleteTopic(testTopic()); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); + ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSink(String branch) { + // partitioned table + catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC); + // unpartitioned table + catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema); + + List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER1, branch); + + files = dataFiles(TABLE_IDENTIFIER2, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER2, branch); + } + + private void runTest(String branch, boolean useSchema) { + // set offset reset to earliest so we don't miss any test messages + KafkaConnectUtils.Config connectorConfig = + new KafkaConnectUtils.Config(connectorName()) + .config("topics", testTopic()) + .config("connector.class", IcebergSinkConnector.class.getName()) + .config("tasks.max", 2) + .config("consumer.override.auto.offset.reset", "earliest") + .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("key.converter.schemas.enable", false) + .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("value.converter.schemas.enable", useSchema) + .config( + "iceberg.tables", + String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2)) + .config("iceberg.tables.route-field", "type") + .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1") + .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2") + .config("iceberg.control.commit.interval-ms", 1000) + .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) +
.config("iceberg.kafka.auto.offset.reset", "earliest"); + + context().connectorCatalogProperties().forEach(connectorConfig::config); + + if (branch != null) { + connectorConfig.config("iceberg.tables.default-commit-branch", branch); + } + + // use a schema for one of the cases + if (!useSchema) { + connectorConfig.config("value.converter.schemas.enable", false); + } + + context().startConnector(connectorConfig); + + TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!"); + TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?"); + TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me"); + + send(testTopic(), event1, useSchema); + send(testTopic(), event2, useSchema); + send(testTopic(), event3, useSchema); + flush(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(this::assertSnapshotAdded); + } + + private void assertSnapshotAdded() { + Table table = catalog().loadTable(TABLE_IDENTIFIER1); + assertThat(table.snapshots()).hasSize(1); + table = catalog().loadTable(TABLE_IDENTIFIER2); + assertThat(table.snapshots()).hasSize(1); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java new file mode 100644 index 000000000000..80a74539311c --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.TimestampType; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class IntegrationTest extends IntegrationTestBase { + + private static final String TEST_DB = "test"; + private static final String TEST_TABLE = "foobar"; + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TEST_DB, TEST_TABLE); + + @BeforeEach + public void before() { + createTopic(testTopic(), TEST_TOPIC_PARTITIONS); + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); + } + + @AfterEach + public void after() { + context().stopConnector(connectorName()); + deleteTopic(testTopic()); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE)); + ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkPartitionedTable(String branch) { + catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema, ImmutableMap.of()); + + List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch); + // partition may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertThat(files.get(1).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkUnpartitionedTable(String branch) { + catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema, ImmutableMap.of()); + + List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch); + // may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkSchemaEvolution(String branch) { + Schema initialSchema = + new Schema( + ImmutableList.of( + Types.NestedField.required(1, "id",
Types.IntegerType.get()), + Types.NestedField.required(2, "type", Types.StringType.get()))); + catalog().createTable(TABLE_IDENTIFIER, initialSchema); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema, ImmutableMap.of("iceberg.tables.evolve-schema-enabled", "true")); + + List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch); + // may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + + // when not using a value schema, the ID data type will not be updated + Class<? extends Type> expectedIdType = + useSchema ? Types.LongType.class : Types.IntegerType.class; + + assertGeneratedSchema(useSchema, expectedIdType); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkAutoCreate(String branch) { + boolean useSchema = branch == null; // use a schema for one of the tests + + Map<String, String> extraConfig = Maps.newHashMap(); + extraConfig.put("iceberg.tables.auto-create-enabled", "true"); + if (useSchema) { + // partition the table for one of the tests + extraConfig.put("iceberg.tables.default-partition-by", "hour(ts)"); + } + + runTest(branch, useSchema, extraConfig); + + List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch); + // may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + + assertGeneratedSchema(useSchema, LongType.class); + + PartitionSpec spec = catalog().loadTable(TABLE_IDENTIFIER).spec(); + assertThat(spec.isPartitioned()).isEqualTo(useSchema); + } + + private void assertGeneratedSchema(boolean useSchema, Class<? extends Type> expectedIdType) { + Schema tableSchema = catalog().loadTable(TABLE_IDENTIFIER).schema(); + assertThat(tableSchema.findField("id").type()).isInstanceOf(expectedIdType); + assertThat(tableSchema.findField("type").type()).isInstanceOf(StringType.class); + assertThat(tableSchema.findField("payload").type()).isInstanceOf(StringType.class); + + if (!useSchema) { + // without a schema we can only map the primitive type + assertThat(tableSchema.findField("ts").type()).isInstanceOf(LongType.class); + // null values should be ignored when not using a value schema + assertThat(tableSchema.findField("op")).isNull(); + } else { + assertThat(tableSchema.findField("ts").type()).isInstanceOf(TimestampType.class); + assertThat(tableSchema.findField("op").type()).isInstanceOf(StringType.class); + } + } + + private void runTest(String branch, boolean useSchema, Map<String, String> extraConfig) { + // set offset reset to earliest so we don't miss any test messages + KafkaConnectUtils.Config connectorConfig = + new KafkaConnectUtils.Config(connectorName()) + .config("topics", testTopic()) + .config("connector.class", IcebergSinkConnector.class.getName()) + .config("tasks.max", 2) + .config("consumer.override.auto.offset.reset", "earliest") + .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("key.converter.schemas.enable", false) + .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("value.converter.schemas.enable", useSchema) + .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE)) + .config("iceberg.control.commit.interval-ms", 1000) + .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) + .config("iceberg.kafka.auto.offset.reset", "earliest"); +
context().connectorCatalogProperties().forEach(connectorConfig::config); + + if (branch != null) { + connectorConfig.config("iceberg.tables.default-commit-branch", branch); + } + + extraConfig.forEach(connectorConfig::config); + + context().startConnector(connectorConfig); + + TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!"); + + Instant threeDaysAgo = Instant.now().minus(Duration.ofDays(3)); + TestEvent event2 = new TestEvent(2, "type2", threeDaysAgo, "having fun?"); + + send(testTopic(), event1, useSchema); + send(testTopic(), event2, useSchema); + flush(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(this::assertSnapshotAdded); + } + + private void assertSnapshotAdded() { + try { + Table table = catalog().loadTable(TABLE_IDENTIFIER); + assertThat(table.snapshots()).hasSize(1); + } catch (NoSuchTableException e) { + fail("Table should exist"); + } + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java new file mode 100644 index 000000000000..f90d4da0379e --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.Condition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +public class IntegrationTestBase { + + private final TestContext context = TestContext.INSTANCE; + private Catalog catalog; + private Admin admin; + private String connectorName; + private String testTopic; + + private KafkaProducer<String, String> producer; + + protected static final int TEST_TOPIC_PARTITIONS = 2; + + protected TestContext context() { + return context; + } + + protected Catalog catalog() { + return catalog; + } + + protected String connectorName() { + return connectorName; + } + + protected String testTopic() { + return testTopic; + } + + @BeforeEach + public void baseBefore() { + catalog = context.initLocalCatalog(); + producer = context.initLocalProducer(); + admin = context.initLocalAdmin(); + + this.connectorName = "test_connector-" + UUID.randomUUID(); + this.testTopic = "test-topic-" + UUID.randomUUID(); + } + + @AfterEach + public void baseAfter() { + try { + if (catalog instanceof AutoCloseable) { + ((AutoCloseable) catalog).close(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + producer.close(); + admin.close(); + } + + protected void assertSnapshotProps(TableIdentifier tableIdentifier, String branch) { + Table table = catalog.loadTable(tableIdentifier); + Map<String, String> props = latestSnapshot(table, branch).summary(); + assertThat(props) + .hasKeySatisfying( + new Condition<String>() { + @Override + public boolean matches(String str) { + return str.startsWith("kafka.connect.offsets."); + } + }); + assertThat(props).containsKey("kafka.connect.commit-id"); + } + + protected List<DataFile> dataFiles(TableIdentifier tableIdentifier, String branch) { + Table table = catalog.loadTable(tableIdentifier); + return Lists.newArrayList(latestSnapshot(table, branch).addedDataFiles(table.io())); + } + + protected List<DeleteFile> deleteFiles(TableIdentifier tableIdentifier, String branch) { + Table table = catalog.loadTable(tableIdentifier); + return Lists.newArrayList(latestSnapshot(table, branch).addedDeleteFiles(table.io())); + } + + private Snapshot latestSnapshot(Table table, String branch) { + return branch == null ?
table.currentSnapshot() : table.snapshot(branch); + } + + protected void createTopic(String topicName, int partitions) { + try { + admin + .createTopics(ImmutableList.of(new NewTopic(topicName, partitions, (short) 1))) + .all() + .get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + protected void deleteTopic(String topicName) { + try { + admin.deleteTopics(ImmutableList.of(topicName)).all().get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + protected void send(String topicName, TestEvent event, boolean useSchema) { + String eventStr = event.serialize(useSchema); + try { + producer.send(new ProducerRecord<>(topicName, Long.toString(event.id()), eventStr)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + protected void flush() { + producer.flush(); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/KafkaConnectUtils.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/KafkaConnectUtils.java new file mode 100644 index 000000000000..098ab2395b34 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/KafkaConnectUtils.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.client5.http.classic.methods.HttpDelete; +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.awaitility.Awaitility; + +public class KafkaConnectUtils { + + private static final HttpClient HTTP = HttpClients.createDefault(); + + // JavaBean-style for serialization + public static class Config { + + private final String name; + private final Map<String, Object> config = Maps.newHashMap(); + + public Config(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public Map<String, Object> getConfig() { + return config; + } + + public Config config(String key, Object value) { + config.put(key, value); + return this; + } + } + + public static void startConnector(Config config) { + try { + HttpPost request = + new HttpPost(String.format("http://localhost:%d/connectors", TestContext.CONNECT_PORT)); + String body = TestContext.MAPPER.writeValueAsString(config); + request.setHeader("Content-Type", "application/json"); + request.setEntity(new StringEntity(body)); + HTTP.execute(request, response -> null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void ensureConnectorRunning(String name) { + HttpGet request = + new HttpGet( + String.format( + "http://localhost:%d/connectors/%s/status", TestContext.CONNECT_PORT, name)); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .until( + () -> + HTTP.execute( + request, + response -> { + if (response.getCode() == HttpStatus.SC_OK) { + JsonNode root = + TestContext.MAPPER.readTree(response.getEntity().getContent()); + String connectorState = root.get("connector").get("state").asText(); + ArrayNode taskNodes = (ArrayNode) root.get("tasks"); + List<String> taskStates = Lists.newArrayList(); + taskNodes.forEach(node -> taskStates.add(node.get("state").asText())); + return "RUNNING".equals(connectorState) + && taskStates.stream().allMatch("RUNNING"::equals); + } + return false; + })); + } + + public static void stopConnector(String name) { + try { + HttpDelete request = + new HttpDelete( + String.format("http://localhost:%d/connectors/%s", TestContext.CONNECT_PORT, name)); + HTTP.execute(request, response -> null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private KafkaConnectUtils() {} +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java new file mode 100644 index 000000000000..729d4bb264e5 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.testcontainers.containers.ComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class TestContext { + + public static final TestContext INSTANCE = new TestContext(); + public static final ObjectMapper MAPPER = new ObjectMapper(); + public static final int CONNECT_PORT = 8083; + + private static final int MINIO_PORT = 9000; + private static final int CATALOG_PORT = 8181; + private static final String BOOTSTRAP_SERVERS = "localhost:29092"; + private static final String AWS_ACCESS_KEY = "minioadmin"; + private static final String AWS_SECRET_KEY = "minioadmin"; + private static final String AWS_REGION = "us-east-1"; + + private TestContext() { + ComposeContainer container = + new ComposeContainer(new File("./docker/docker-compose.yml")) + .waitingFor("connect", Wait.forHttp("/connectors")); + container.start(); + } + + public void startConnector(KafkaConnectUtils.Config config) { + KafkaConnectUtils.startConnector(config); + KafkaConnectUtils.ensureConnectorRunning(config.getName()); + } + + public void stopConnector(String name) { + KafkaConnectUtils.stopConnector(name); + } + + public Catalog initLocalCatalog() { + String localCatalogUri = "http://localhost:" + CATALOG_PORT; + RESTCatalog result = new RESTCatalog(); + result.initialize( + "local", + ImmutableMap.<String, String>builder() + .put(CatalogProperties.URI, localCatalogUri) + .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO") + .put("s3.endpoint", "http://localhost:" + MINIO_PORT) + .put("s3.access-key-id", AWS_ACCESS_KEY) + .put("s3.secret-access-key", AWS_SECRET_KEY) + .put("s3.path-style-access", "true") + .put("client.region", AWS_REGION) + .build()); + return result; + } + + public Map<String, Object> connectorCatalogProperties() { + return ImmutableMap.<String, Object>builder() + .put( + "iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE, + CatalogUtil.ICEBERG_CATALOG_TYPE_REST) + .put("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:" + CATALOG_PORT) + .put( + "iceberg.catalog." 
+ CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.aws.s3.S3FileIO") + .put("iceberg.catalog.s3.endpoint", "http://minio:" + MINIO_PORT) + .put("iceberg.catalog.s3.access-key-id", AWS_ACCESS_KEY) + .put("iceberg.catalog.s3.secret-access-key", AWS_SECRET_KEY) + .put("iceberg.catalog.s3.path-style-access", true) + .put("iceberg.catalog.client.region", AWS_REGION) + .build(); + } + + public KafkaProducer<String, String> initLocalProducer() { + return new KafkaProducer<>( + ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + BOOTSTRAP_SERVERS, + ProducerConfig.CLIENT_ID_CONFIG, + UUID.randomUUID().toString()), + new StringSerializer(), + new StringSerializer()); + } + + public Admin initLocalAdmin() { + return Admin.create( + ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestEvent.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestEvent.java new file mode 100644 index 000000000000..27de3885a4f9 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestEvent.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Instant; +import java.util.Date; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Types; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; + +public class TestEvent { + + public static final Schema TEST_SCHEMA = + new Schema( + ImmutableList.of( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "type", Types.StringType.get()), + Types.NestedField.required(3, "ts", Types.TimestampType.withZone()), + Types.NestedField.required(4, "payload", Types.StringType.get())), + ImmutableSet.of(1)); + + public static final org.apache.kafka.connect.data.Schema TEST_CONNECT_SCHEMA = + SchemaBuilder.struct() + .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) + .field("type", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .field("ts", Timestamp.SCHEMA) + .field("payload", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .field("op", org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA); + + public static final PartitionSpec TEST_SPEC = + PartitionSpec.builderFor(TEST_SCHEMA).day("ts").build(); + + private static final JsonConverter JSON_CONVERTER = new JsonConverter(); + + static { + JSON_CONVERTER.configure( + ImmutableMap.of(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); + } + + private final long id; + private final String type; + private final Instant ts; + private final String payload; + private final String op; + + public TestEvent(long id, String type, Instant ts, String payload) { + this(id, type, ts, payload, null); + } + + public TestEvent(long id, String type, Instant ts, String payload, String op) { + this.id = id; + this.type = type; + this.ts = ts; + this.payload = payload; + this.op = op; + } + + public long id() { + return id; + } + + protected String serialize(boolean useSchema) { + try { + Struct value = + new Struct(TEST_CONNECT_SCHEMA) + .put("id", id) + .put("type", type) + .put("ts", Date.from(ts)) + .put("payload", payload) + .put("op", op); + + String convertMethod = + useSchema ? 
"convertToJsonWithEnvelope" : "convertToJsonWithoutEnvelope"; + JsonNode json = + DynMethods.builder(convertMethod) + .hiddenImpl( + JsonConverter.class, org.apache.kafka.connect.data.Schema.class, Object.class) + .build(JSON_CONVERTER) + .invoke(TestEvent.TEST_CONNECT_SCHEMA, value); + return TestContext.MAPPER.writeValueAsString(json); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/main/resources/iceberg.png b/kafka-connect/kafka-connect-runtime/src/main/resources/iceberg.png new file mode 100644 index 000000000000..e4a99c3951e6 Binary files /dev/null and b/kafka-connect/kafka-connect-runtime/src/main/resources/iceberg.png differ diff --git a/kafka-connect/kafka-connect-runtime/src/main/resources/manifest.json b/kafka-connect/kafka-connect-runtime/src/main/resources/manifest.json new file mode 100644 index 000000000000..5b51e5dea875 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/main/resources/manifest.json @@ -0,0 +1,47 @@ +{ + "title": "Apache Iceberg Sink Connector", + "name": "iceberg-kafka-connect", + "logo": "assets/iceberg.png", + "version": "__VERSION__", + "description": "The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.", + + "component_types": [ + "sink" + ], + + "requirements": [], + + "features": { + "confluent_control_center_integration": true, + "delivery_guarantee": [ + "exactly_once" + ], + "kafka_connect_api": true, + "single_message_transforms": true, + "supported_encodings": [ + "any" + ] + }, + + "license": [ + { + "name": "Apache License, Version 2.0", + "url": "https://www.apache.org/licenses/LICENSE-2.0" + } + ], + + "owner": { + "name": "Apache Software Foundation", + "logo": "assets/iceberg.png", + "type": "organization", + "url": "https://iceberg.apache.org", + "username": "iceberg" + }, + + "support": { + "provider_name": "Iceberg OSS Community", + "logo": "assets/iceberg.png", + "summary": "Support for this connector is provided by the Iceberg open source community. 
You can open an issue in the Iceberg GitHub repo or post a question in the Iceberg Slack workspace in the #kafka-connect channel.", + "url": "https://github.com/apache/iceberg/issues" + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index aed11ab0b169..cf34b0bcd4c8 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -62,7 +62,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String KAFKA_PROP_PREFIX = "iceberg.kafka."; private static final String TABLE_PROP_PREFIX = "iceberg.table."; private static final String AUTO_CREATE_PROP_PREFIX = "iceberg.tables.auto-create-props."; - private static final String WRITE_PROP_PREFIX = "iceberg.table.write-props."; + private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props."; private static final String CATALOG_NAME_PROP = "iceberg.catalog"; private static final String TABLES_PROP = "iceberg.tables"; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java index 460b18fd7fc2..bb9370d3d563 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java @@ -83,14 +83,16 @@ private void close() { @Override public void put(Collection<SinkRecord> sinkRecords) { - Preconditions.checkNotNull(committer, "Committer wasn't initialized"); - committer.save(sinkRecords); + if (committer != null) { + committer.save(sinkRecords); + } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { - Preconditions.checkNotNull(committer, "Committer wasn't initialized"); - committer.save(null); + if (committer != null) { + committer.save(null); + } } @Override diff --git a/settings.gradle b/settings.gradle index 46c85fb65546..cdc69b0e2071 100644 --- a/settings.gradle +++ b/settings.gradle @@ -198,3 +198,7 @@ project(":iceberg-kafka-connect:kafka-connect-events").name = "iceberg-kafka-con include ":iceberg-kafka-connect:kafka-connect" project(":iceberg-kafka-connect:kafka-connect").projectDir = file('kafka-connect/kafka-connect') project(":iceberg-kafka-connect:kafka-connect").name = "iceberg-kafka-connect" + +include ":iceberg-kafka-connect:kafka-connect-runtime" +project(":iceberg-kafka-connect:kafka-connect-runtime").projectDir = file('kafka-connect/kafka-connect-runtime') +project(":iceberg-kafka-connect:kafka-connect-runtime").name = "iceberg-kafka-connect-runtime"
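The integration tests in this patch drive the sink through the Kafka Connect REST API: KafkaConnectUtils.startConnector POSTs a JSON config to the Connect worker defined in docker/docker-compose.yml, and TestContext.connectorCatalogProperties() supplies the iceberg.catalog.* settings. For reference, a standalone connector submission assembled from those same properties might look like the sketch below; this is illustrative only, the connector name, topic, and target table are placeholders, and the endpoints and credentials are the ones the compose file wires up (iceberg.catalog.type, iceberg.catalog.uri, and iceberg.catalog.io-impl are the resolved forms of the CatalogUtil/CatalogProperties constants used in the tests).

```json
{
  "name": "iceberg-sink-example",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "iceberg.tables": "test.events",
    "iceberg.control.commit.interval-ms": "1000",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://iceberg:8181",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.s3.endpoint": "http://minio:9000",
    "iceberg.catalog.s3.access-key-id": "minioadmin",
    "iceberg.catalog.s3.secret-access-key": "minioadmin",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.catalog.client.region": "us-east-1"
  }
}
```

Posting a document like this to the Connect REST endpoint that the compose file publishes (http://localhost:8083/connectors) is what KafkaConnectUtils.startConnector does; KafkaConnectUtils.ensureConnectorRunning then polls the /connectors/&lt;name&gt;/status resource until the connector and all of its tasks report RUNNING.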