From 30e761ebe420cf56f4d985664e6fa6d2a76de171 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Mon, 29 Jul 2024 07:55:45 -0700 Subject: [PATCH] Kafka Connect: Runtime distribution with integration tests (#10739) --- gradle/libs.versions.toml | 5 +- kafka-connect/build.gradle | 184 +++++++++++++- .../docker/docker-compose.yml | 107 +++++++++ .../connect/IntegrationDynamicTableTest.java | 135 +++++++++++ .../connect/IntegrationMultiTableTest.java | 140 +++++++++++ .../iceberg/connect/IntegrationTest.java | 225 ++++++++++++++++++ .../iceberg/connect/IntegrationTestBase.java | 155 ++++++++++++ .../iceberg/connect/KafkaConnectUtils.java | 117 +++++++++ .../apache/iceberg/connect/TestContext.java | 116 +++++++++ .../org/apache/iceberg/connect/TestEvent.java | 113 +++++++++ .../src/main/resources/iceberg.png | Bin 0 -> 17608 bytes .../src/main/resources/manifest.json | 47 ++++ .../iceberg/connect/IcebergSinkConfig.java | 2 +- .../iceberg/connect/IcebergSinkTask.java | 10 +- settings.gradle | 4 + 15 files changed, 1350 insertions(+), 10 deletions(-) create mode 100644 kafka-connect/kafka-connect-runtime/docker/docker-compose.yml create mode 100644 kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java create mode 100644 kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java create mode 100644 kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java create mode 100644 kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java create mode 100644 kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/KafkaConnectUtils.java create mode 100644 kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java create mode 100644 kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestEvent.java create mode 100644 kafka-connect/kafka-connect-runtime/src/main/resources/iceberg.png create mode 100644 kafka-connect/kafka-connect-runtime/src/main/resources/manifest.json diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c98cc323c9fe..055475809296 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -45,7 +45,7 @@ flink119 = { strictly = "1.19.0"} google-libraries-bom = "26.43.0" guava = "33.2.1-jre" hadoop2 = "2.7.3" -hadoop3-client = "3.3.6" +hadoop3 = "3.3.6" httpcomponents-httpclient5 = "5.3.1" hive2 = { strictly = "2.3.9"} # see rich version usage explanation above hive3 = "3.1.3" @@ -132,7 +132,8 @@ hadoop2-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "ha hadoop2-hdfs = { module = "org.apache.hadoop:hadoop-hdfs", version.ref = "hadoop2" } hadoop2-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop2" } hadoop2-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop2" } -hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3-client" } +hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3" } +hadoop3-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop3" } hive2-exec = { module = "org.apache.hive:hive-exec", version.ref = "hive2" } hive2-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive2" } hive2-serde = { module = "org.apache.hive:hive-serde", version.ref = "hive2" } diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index 1fdd6bc6ea4c..60fa879d291f 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -17,7 +17,7 @@ * under the License. */ -project(":iceberg-kafka-connect:iceberg-kafka-connect-events") { +project(':iceberg-kafka-connect:iceberg-kafka-connect-events') { dependencies { api project(':iceberg-api') implementation project(':iceberg-core') @@ -28,10 +28,10 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect-events") { test { useJUnitPlatform() - } + } } -project(":iceberg-kafka-connect:iceberg-kafka-connect") { +project(':iceberg-kafka-connect:iceberg-kafka-connect') { dependencies { api project(':iceberg-api') implementation project(':iceberg-core') @@ -57,3 +57,181 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect") { useJUnitPlatform() } } + +project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') { + apply plugin: 'distribution' + + configurations { + hive { + extendsFrom runtimeClasspath + } + all { + exclude group: 'javax.activation', module: 'activation' + // force upgrades for dependencies with known vulnerabilities... + resolutionStrategy { + force 'org.codehaus.jettison:jettison:1.5.4' + force 'org.xerial.snappy:snappy-java:1.1.10.5' + force 'org.apache.commons:commons-compress:1.26.0' + force 'org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.2.0' + } + } + } + + sourceSets { + integration { + java.srcDir "$projectDir/src/integration/java" + resources.srcDir "$projectDir/src/integration/resources" + } + } + + configurations { + integrationImplementation.extendsFrom testImplementation + integrationRuntime.extendsFrom testRuntimeOnly + } + + dependencies { + implementation project(':iceberg-kafka-connect:iceberg-kafka-connect') + implementation(libs.hadoop3.common) { + exclude group: 'log4j' + exclude group: 'org.slf4j' + exclude group: 'ch.qos.reload4j' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'com.fasterxml.woodstox' + exclude group: 'com.google.guava' + exclude group: 'com.google.protobuf' + exclude group: 'org.apache.curator' + exclude group: 'org.apache.zookeeper' + exclude group: 'org.apache.kerby' + exclude group: 'org.apache.hadoop', module: 'hadoop-auth' + exclude group: 'org.apache.commons', module: 'commons-configuration2' + exclude group: 'org.apache.hadoop.thirdparty', module: 'hadoop-shaded-protobuf_3_7' + exclude group: 'org.codehaus.woodstox' + exclude group: 'org.eclipse.jetty' + } + implementation project(':iceberg-orc') + implementation project(':iceberg-parquet') + + implementation project(':iceberg-aws') + implementation platform(libs.awssdk.bom) + implementation 'software.amazon.awssdk:apache-client' + implementation 'software.amazon.awssdk:auth' + implementation 'software.amazon.awssdk:iam' + implementation 'software.amazon.awssdk:sso' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:kms' + implementation 'software.amazon.awssdk:glue' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:dynamodb' + implementation 'software.amazon.awssdk:lakeformation' + + implementation project(':iceberg-gcp') + implementation platform(libs.google.libraries.bom) + implementation 'com.google.cloud:google-cloud-storage' + + implementation project(':iceberg-azure') + implementation platform(libs.azuresdk.bom) + implementation 'com.azure:azure-storage-file-datalake' + implementation 'com.azure:azure-identity' + + hive project(':iceberg-hive-metastore') + hive(libs.hive2.metastore) { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.apache.hbase' + exclude group: 'org.apache.logging.log4j' + exclude group: 'co.cask.tephra' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all' + exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet' + exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle' + exclude group: 'com.tdunning', module: 'json' + exclude group: 'javax.transaction', module: 'transaction-api' + exclude group: 'com.zaxxer', module: 'HikariCP' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-common' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-applicationhistoryservice' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-resourcemanager' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-web-proxy' + exclude group: 'org.apache.hive', module: 'hive-service-rpc' + exclude group: 'com.github.joshelser', module: 'dropwizard-metrics-hadoop-metrics2-reporter' + } + hive(libs.hadoop3.client) { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + + integrationImplementation project(':iceberg-api') + integrationImplementation project(':iceberg-common') + integrationImplementation project(':iceberg-core') + integrationImplementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + integrationImplementation platform(libs.jackson.bom) + integrationImplementation libs.jackson.core + integrationImplementation libs.jackson.databind + integrationImplementation libs.jackson.databind + integrationImplementation libs.kafka.clients + integrationImplementation libs.kafka.connect.api + integrationImplementation libs.kafka.connect.json + integrationImplementation libs.testcontainers + integrationImplementation libs.httpcomponents.httpclient5 + integrationImplementation libs.awaitility + } + + task integrationTest(type: Test) { + useJUnitPlatform() + testClassesDirs = sourceSets.integration.output.classesDirs + classpath = sourceSets.integration.runtimeClasspath + jvmArgs += project.property('extraJvmArgs') + } + + processResources { + filter { + it.replace('__VERSION__', project.version.toString()) + } + } + + distributions { + main { + contents { + from "${processResources.destinationDir}/manifest.json" + into('lib/') { + from configurations.runtimeClasspath + } + into('doc/') { + from "$rootDir/LICENSE" + } + into('assets/') { + from "${processResources.destinationDir}/iceberg.png" + } + } + } + hive { + contents { + from "${processResources.destinationDir}/manifest.json" + into('lib/') { + from configurations.hive + } + into('doc/') { + from "$rootDir/LICENSE" + } + into('assets/') { + from "${processResources.destinationDir}/iceberg.png" + } + } + } + } + + tasks.jar.enabled = false + + tasks.distTar.enabled = false + distZip.dependsOn processResources + installDist.dependsOn processResources + + tasks.hiveDistTar.enabled = false + hiveDistZip.dependsOn processResources + installHiveDist.dependsOn processResources + + integrationTest.dependsOn installDist + check.dependsOn integrationTest + + assemble.dependsOn distZip, hiveDistZip +} diff --git a/kafka-connect/kafka-connect-runtime/docker/docker-compose.yml b/kafka-connect/kafka-connect-runtime/docker/docker-compose.yml new file mode 100644 index 000000000000..202180289d96 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/docker/docker-compose.yml @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +volumes: + data: {} + +services: + minio: + image: minio/minio + hostname: minio + environment: + - MINIO_ROOT_USER=minioadmin + - MINIO_ROOT_PASSWORD=minioadmin + ports: + - 9000:9000 + - 9001:9001 + volumes: + - data:/data + command: server /data --console-address ":9001" + + create-bucket: + image: minio/mc + depends_on: + - minio + volumes: + - data:/data + entrypoint: mc mb /data/bucket + + iceberg: + image: tabulario/iceberg-rest + depends_on: + - create-bucket + hostname: iceberg + ports: + - 8181:8181 + environment: + - AWS_REGION=us-east-1 + - CATALOG_WAREHOUSE=s3://bucket/warehouse/ + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + - CATALOG_S3_PATH__STYLE__ACCESS=true + - CATALOG_S3_ACCESS__KEY__ID=minioadmin + - CATALOG_S3_SECRET__ACCESS__KEY=minioadmin + + kafka: + image: confluentinc/cp-kafka + hostname: kafka + ports: + - 29092:29092 + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENERS: BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:29092 + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + CLUSTER_ID: E-JXLvCaTiaUYDb1LwZ1JQ + + connect: + image: confluentinc/cp-kafka-connect + depends_on: + - kafka + hostname: connect + ports: + - 8083:8083 + volumes: + - ../build/install:/test/kafka-connect + environment: + CONNECT_REST_ADVERTISED_HOST_NAME: localhost + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: kc + CONNECT_CONFIG_STORAGE_TOPIC: kc-config + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_TOPIC: kc-offsets + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: kc-storage + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false + CONNECT_PLUGIN_PATH: /test/kafka-connect + CONNECT_BOOTSTRAP_SERVERS: kafka:9092 + CONNECT_OFFSET_FLUSH_INTERVAL_MS: 500 diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java new file mode 100644 index 000000000000..5c458ad3fa78 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class IntegrationDynamicTableTest extends IntegrationTestBase { + + private static final String TEST_DB = "test"; + private static final String TEST_TABLE1 = "tbl1"; + private static final String TEST_TABLE2 = "tbl2"; + private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1); + private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2); + + @BeforeEach + public void before() { + createTopic(testTopic(), TEST_TOPIC_PARTITIONS); + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); + } + + @AfterEach + public void after() { + context().stopConnector(connectorName()); + deleteTopic(testTopic()); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); + ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSink(String branch) { + // partitioned table + catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC); + // unpartitioned table + catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema); + + List files = dataFiles(TABLE_IDENTIFIER1, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER1, branch); + + files = dataFiles(TABLE_IDENTIFIER2, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER2, branch); + } + + private void runTest(String branch, boolean useSchema) { + // set offset reset to earliest so we don't miss any test messages + KafkaConnectUtils.Config connectorConfig = + new KafkaConnectUtils.Config(connectorName()) + .config("topics", testTopic()) + .config("connector.class", IcebergSinkConnector.class.getName()) + .config("tasks.max", 2) + .config("consumer.override.auto.offset.reset", "earliest") + .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("key.converter.schemas.enable", false) + .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("value.converter.schemas.enable", useSchema) + .config("iceberg.tables.dynamic-enabled", true) + .config("iceberg.tables.route-field", "payload") + .config("iceberg.control.commit.interval-ms", 1000) + .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) + .config("iceberg.kafka.auto.offset.reset", "earliest"); + + context().connectorCatalogProperties().forEach(connectorConfig::config); + + if (branch != null) { + connectorConfig.config("iceberg.tables.default-commit-branch", branch); + } + + if (!useSchema) { + connectorConfig.config("value.converter.schemas.enable", false); + } + + context().startConnector(connectorConfig); + + TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1); + TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2); + TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3"); + + send(testTopic(), event1, useSchema); + send(testTopic(), event2, useSchema); + send(testTopic(), event3, useSchema); + flush(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(this::assertSnapshotAdded); + } + + private void assertSnapshotAdded() { + Table table = catalog().loadTable(TABLE_IDENTIFIER1); + assertThat(table.snapshots()).hasSize(1); + table = catalog().loadTable(TABLE_IDENTIFIER2); + assertThat(table.snapshots()).hasSize(1); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java new file mode 100644 index 000000000000..7cffbd8838b2 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class IntegrationMultiTableTest extends IntegrationTestBase { + + private static final String TEST_DB = "test"; + private static final String TEST_TABLE1 = "foobar1"; + private static final String TEST_TABLE2 = "foobar2"; + private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1); + private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2); + + @BeforeEach + public void before() { + createTopic(testTopic(), TEST_TOPIC_PARTITIONS); + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); + } + + @AfterEach + public void after() { + context().stopConnector(connectorName()); + deleteTopic(testTopic()); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); + ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSink(String branch) { + // partitioned table + catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC); + // unpartitioned table + catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema); + + List files = dataFiles(TABLE_IDENTIFIER1, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER1, branch); + + files = dataFiles(TABLE_IDENTIFIER2, branch); + assertThat(files).hasSize(1); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER2, branch); + } + + private void runTest(String branch, boolean useSchema) { + // set offset reset to earliest so we don't miss any test messages + KafkaConnectUtils.Config connectorConfig = + new KafkaConnectUtils.Config(connectorName()) + .config("topics", testTopic()) + .config("connector.class", IcebergSinkConnector.class.getName()) + .config("tasks.max", 2) + .config("consumer.override.auto.offset.reset", "earliest") + .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("key.converter.schemas.enable", false) + .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("value.converter.schemas.enable", useSchema) + .config( + "iceberg.tables", + String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2)) + .config("iceberg.tables.route-field", "type") + .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1") + .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2") + .config("iceberg.control.commit.interval-ms", 1000) + .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) + .config("iceberg.kafka.auto.offset.reset", "earliest"); + + context().connectorCatalogProperties().forEach(connectorConfig::config); + + if (branch != null) { + connectorConfig.config("iceberg.tables.default-commit-branch", branch); + } + + // use a schema for one of the cases + if (!useSchema) { + connectorConfig.config("value.converter.schemas.enable", false); + } + + context().startConnector(connectorConfig); + + TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!"); + TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?"); + TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me"); + + send(testTopic(), event1, useSchema); + send(testTopic(), event2, useSchema); + send(testTopic(), event3, useSchema); + flush(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(this::assertSnapshotAdded); + } + + private void assertSnapshotAdded() { + Table table = catalog().loadTable(TABLE_IDENTIFIER1); + assertThat(table.snapshots()).hasSize(1); + table = catalog().loadTable(TABLE_IDENTIFIER2); + assertThat(table.snapshots()).hasSize(1); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java new file mode 100644 index 000000000000..80a74539311c --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.TimestampType; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class IntegrationTest extends IntegrationTestBase { + + private static final String TEST_DB = "test"; + private static final String TEST_TABLE = "foobar"; + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TEST_DB, TEST_TABLE); + + @BeforeEach + public void before() { + createTopic(testTopic(), TEST_TOPIC_PARTITIONS); + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); + } + + @AfterEach + public void after() { + context().stopConnector(connectorName()); + deleteTopic(testTopic()); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE)); + ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkPartitionedTable(String branch) { + catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema, ImmutableMap.of()); + + List files = dataFiles(TABLE_IDENTIFIER, branch); + // partition may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.get(0).recordCount()).isEqualTo(1); + assertThat(files.get(1).recordCount()).isEqualTo(1); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkUnpartitionedTable(String branch) { + catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema, ImmutableMap.of()); + + List files = dataFiles(TABLE_IDENTIFIER, branch); + // may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkSchemaEvolution(String branch) { + Schema initialSchema = + new Schema( + ImmutableList.of( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "type", Types.StringType.get()))); + catalog().createTable(TABLE_IDENTIFIER, initialSchema); + + boolean useSchema = branch == null; // use a schema for one of the tests + runTest(branch, useSchema, ImmutableMap.of("iceberg.tables.evolve-schema-enabled", "true")); + + List files = dataFiles(TABLE_IDENTIFIER, branch); + // may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + + // when not using a value schema, the ID data type will not be updated + Class expectedIdType = + useSchema ? Types.LongType.class : Types.IntegerType.class; + + assertGeneratedSchema(useSchema, expectedIdType); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = "test_branch") + public void testIcebergSinkAutoCreate(String branch) { + boolean useSchema = branch == null; // use a schema for one of the tests + + Map extraConfig = Maps.newHashMap(); + extraConfig.put("iceberg.tables.auto-create-enabled", "true"); + if (useSchema) { + // partition the table for one of the tests + extraConfig.put("iceberg.tables.default-partition-by", "hour(ts)"); + } + + runTest(branch, useSchema, extraConfig); + + List files = dataFiles(TABLE_IDENTIFIER, branch); + // may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + + assertGeneratedSchema(useSchema, LongType.class); + + PartitionSpec spec = catalog().loadTable(TABLE_IDENTIFIER).spec(); + assertThat(spec.isPartitioned()).isEqualTo(useSchema); + } + + private void assertGeneratedSchema(boolean useSchema, Class expectedIdType) { + Schema tableSchema = catalog().loadTable(TABLE_IDENTIFIER).schema(); + assertThat(tableSchema.findField("id").type()).isInstanceOf(expectedIdType); + assertThat(tableSchema.findField("type").type()).isInstanceOf(StringType.class); + assertThat(tableSchema.findField("payload").type()).isInstanceOf(StringType.class); + + if (!useSchema) { + // without a schema we can only map the primitive type + assertThat(tableSchema.findField("ts").type()).isInstanceOf(LongType.class); + // null values should be ignored when not using a value schema + assertThat(tableSchema.findField("op")).isNull(); + } else { + assertThat(tableSchema.findField("ts").type()).isInstanceOf(TimestampType.class); + assertThat(tableSchema.findField("op").type()).isInstanceOf(StringType.class); + } + } + + private void runTest(String branch, boolean useSchema, Map extraConfig) { + // set offset reset to earliest so we don't miss any test messages + KafkaConnectUtils.Config connectorConfig = + new KafkaConnectUtils.Config(connectorName()) + .config("topics", testTopic()) + .config("connector.class", IcebergSinkConnector.class.getName()) + .config("tasks.max", 2) + .config("consumer.override.auto.offset.reset", "earliest") + .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("key.converter.schemas.enable", false) + .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("value.converter.schemas.enable", useSchema) + .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE)) + .config("iceberg.control.commit.interval-ms", 1000) + .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) + .config("iceberg.kafka.auto.offset.reset", "earliest"); + + context().connectorCatalogProperties().forEach(connectorConfig::config); + + if (branch != null) { + connectorConfig.config("iceberg.tables.default-commit-branch", branch); + } + + extraConfig.forEach(connectorConfig::config); + + context().startConnector(connectorConfig); + + TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!"); + + Instant threeDaysAgo = Instant.now().minus(Duration.ofDays(3)); + TestEvent event2 = new TestEvent(2, "type2", threeDaysAgo, "having fun?"); + + send(testTopic(), event1, useSchema); + send(testTopic(), event2, useSchema); + flush(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(this::assertSnapshotAdded); + } + + private void assertSnapshotAdded() { + try { + Table table = catalog().loadTable(TABLE_IDENTIFIER); + assertThat(table.snapshots()).hasSize(1); + } catch (NoSuchTableException e) { + fail("Table should exist"); + } + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java new file mode 100644 index 000000000000..f90d4da0379e --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.Condition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +public class IntegrationTestBase { + + private final TestContext context = TestContext.INSTANCE; + private Catalog catalog; + private Admin admin; + private String connectorName; + private String testTopic; + + private KafkaProducer producer; + + protected static final int TEST_TOPIC_PARTITIONS = 2; + + protected TestContext context() { + return context; + } + + protected Catalog catalog() { + return catalog; + } + + protected String connectorName() { + return connectorName; + } + + protected String testTopic() { + return testTopic; + } + + @BeforeEach + public void baseBefore() { + catalog = context.initLocalCatalog(); + producer = context.initLocalProducer(); + admin = context.initLocalAdmin(); + + this.connectorName = "test_connector-" + UUID.randomUUID(); + this.testTopic = "test-topic-" + UUID.randomUUID(); + } + + @AfterEach + public void baseAfter() { + try { + if (catalog instanceof AutoCloseable) { + ((AutoCloseable) catalog).close(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + producer.close(); + admin.close(); + } + + protected void assertSnapshotProps(TableIdentifier tableIdentifier, String branch) { + Table table = catalog.loadTable(tableIdentifier); + Map props = latestSnapshot(table, branch).summary(); + assertThat(props) + .hasKeySatisfying( + new Condition() { + @Override + public boolean matches(String str) { + return str.startsWith("kafka.connect.offsets."); + } + }); + assertThat(props).containsKey("kafka.connect.commit-id"); + } + + protected List dataFiles(TableIdentifier tableIdentifier, String branch) { + Table table = catalog.loadTable(tableIdentifier); + return Lists.newArrayList(latestSnapshot(table, branch).addedDataFiles(table.io())); + } + + protected List deleteFiles(TableIdentifier tableIdentifier, String branch) { + Table table = catalog.loadTable(tableIdentifier); + return Lists.newArrayList(latestSnapshot(table, branch).addedDeleteFiles(table.io())); + } + + private Snapshot latestSnapshot(Table table, String branch) { + return branch == null ? table.currentSnapshot() : table.snapshot(branch); + } + + protected void createTopic(String topicName, int partitions) { + try { + admin + .createTopics(ImmutableList.of(new NewTopic(topicName, partitions, (short) 1))) + .all() + .get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + protected void deleteTopic(String topicName) { + try { + admin.deleteTopics(ImmutableList.of(topicName)).all().get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + protected void send(String topicName, TestEvent event, boolean useSchema) { + String eventStr = event.serialize(useSchema); + try { + producer.send(new ProducerRecord<>(topicName, Long.toString(event.id()), eventStr)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + protected void flush() { + producer.flush(); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/KafkaConnectUtils.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/KafkaConnectUtils.java new file mode 100644 index 000000000000..098ab2395b34 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/KafkaConnectUtils.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.client5.http.classic.methods.HttpDelete; +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.awaitility.Awaitility; + +public class KafkaConnectUtils { + + private static final HttpClient HTTP = HttpClients.createDefault(); + + // JavaBean-style for serialization + public static class Config { + + private final String name; + private final Map config = Maps.newHashMap(); + + public Config(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public Map getConfig() { + return config; + } + + public Config config(String key, Object value) { + config.put(key, value); + return this; + } + } + + public static void startConnector(Config config) { + try { + HttpPost request = + new HttpPost(String.format("http://localhost:%d/connectors", TestContext.CONNECT_PORT)); + String body = TestContext.MAPPER.writeValueAsString(config); + request.setHeader("Content-Type", "application/json"); + request.setEntity(new StringEntity(body)); + HTTP.execute(request, response -> null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void ensureConnectorRunning(String name) { + HttpGet request = + new HttpGet( + String.format( + "http://localhost:%d/connectors/%s/status", TestContext.CONNECT_PORT, name)); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .until( + () -> + HTTP.execute( + request, + response -> { + if (response.getCode() == HttpStatus.SC_OK) { + JsonNode root = + TestContext.MAPPER.readTree(response.getEntity().getContent()); + String connectorState = root.get("connector").get("state").asText(); + ArrayNode taskNodes = (ArrayNode) root.get("tasks"); + List taskStates = Lists.newArrayList(); + taskNodes.forEach(node -> taskStates.add(node.get("state").asText())); + return "RUNNING".equals(connectorState) + && taskStates.stream().allMatch("RUNNING"::equals); + } + return false; + })); + } + + public static void stopConnector(String name) { + try { + HttpDelete request = + new HttpDelete( + String.format("http://localhost:%d/connectors/%s", TestContext.CONNECT_PORT, name)); + HTTP.execute(request, response -> null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private KafkaConnectUtils() {} +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java new file mode 100644 index 000000000000..729d4bb264e5 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.testcontainers.containers.ComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class TestContext { + + public static final TestContext INSTANCE = new TestContext(); + public static final ObjectMapper MAPPER = new ObjectMapper(); + public static final int CONNECT_PORT = 8083; + + private static final int MINIO_PORT = 9000; + private static final int CATALOG_PORT = 8181; + private static final String BOOTSTRAP_SERVERS = "localhost:29092"; + private static final String AWS_ACCESS_KEY = "minioadmin"; + private static final String AWS_SECRET_KEY = "minioadmin"; + private static final String AWS_REGION = "us-east-1"; + + private TestContext() { + ComposeContainer container = + new ComposeContainer(new File("./docker/docker-compose.yml")) + .waitingFor("connect", Wait.forHttp("/connectors")); + container.start(); + } + + public void startConnector(KafkaConnectUtils.Config config) { + KafkaConnectUtils.startConnector(config); + KafkaConnectUtils.ensureConnectorRunning(config.getName()); + } + + public void stopConnector(String name) { + KafkaConnectUtils.stopConnector(name); + } + + public Catalog initLocalCatalog() { + String localCatalogUri = "http://localhost:" + CATALOG_PORT; + RESTCatalog result = new RESTCatalog(); + result.initialize( + "local", + ImmutableMap.builder() + .put(CatalogProperties.URI, localCatalogUri) + .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO") + .put("s3.endpoint", "http://localhost:" + MINIO_PORT) + .put("s3.access-key-id", AWS_ACCESS_KEY) + .put("s3.secret-access-key", AWS_SECRET_KEY) + .put("s3.path-style-access", "true") + .put("client.region", AWS_REGION) + .build()); + return result; + } + + public Map connectorCatalogProperties() { + return ImmutableMap.builder() + .put( + "iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE, + CatalogUtil.ICEBERG_CATALOG_TYPE_REST) + .put("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:" + CATALOG_PORT) + .put( + "iceberg.catalog." + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.aws.s3.S3FileIO") + .put("iceberg.catalog.s3.endpoint", "http://minio:" + MINIO_PORT) + .put("iceberg.catalog.s3.access-key-id", AWS_ACCESS_KEY) + .put("iceberg.catalog.s3.secret-access-key", AWS_SECRET_KEY) + .put("iceberg.catalog.s3.path-style-access", true) + .put("iceberg.catalog.client.region", AWS_REGION) + .build(); + } + + public KafkaProducer initLocalProducer() { + return new KafkaProducer<>( + ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + BOOTSTRAP_SERVERS, + ProducerConfig.CLIENT_ID_CONFIG, + UUID.randomUUID().toString()), + new StringSerializer(), + new StringSerializer()); + } + + public Admin initLocalAdmin() { + return Admin.create( + ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)); + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestEvent.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestEvent.java new file mode 100644 index 000000000000..27de3885a4f9 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestEvent.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Instant; +import java.util.Date; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Types; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; + +public class TestEvent { + + public static final Schema TEST_SCHEMA = + new Schema( + ImmutableList.of( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "type", Types.StringType.get()), + Types.NestedField.required(3, "ts", Types.TimestampType.withZone()), + Types.NestedField.required(4, "payload", Types.StringType.get())), + ImmutableSet.of(1)); + + public static final org.apache.kafka.connect.data.Schema TEST_CONNECT_SCHEMA = + SchemaBuilder.struct() + .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) + .field("type", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .field("ts", Timestamp.SCHEMA) + .field("payload", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .field("op", org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA); + + public static final PartitionSpec TEST_SPEC = + PartitionSpec.builderFor(TEST_SCHEMA).day("ts").build(); + + private static final JsonConverter JSON_CONVERTER = new JsonConverter(); + + static { + JSON_CONVERTER.configure( + ImmutableMap.of(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); + } + + private final long id; + private final String type; + private final Instant ts; + private final String payload; + private final String op; + + public TestEvent(long id, String type, Instant ts, String payload) { + this(id, type, ts, payload, null); + } + + public TestEvent(long id, String type, Instant ts, String payload, String op) { + this.id = id; + this.type = type; + this.ts = ts; + this.payload = payload; + this.op = op; + } + + public long id() { + return id; + } + + protected String serialize(boolean useSchema) { + try { + Struct value = + new Struct(TEST_CONNECT_SCHEMA) + .put("id", id) + .put("type", type) + .put("ts", Date.from(ts)) + .put("payload", payload) + .put("op", op); + + String convertMethod = + useSchema ? "convertToJsonWithEnvelope" : "convertToJsonWithoutEnvelope"; + JsonNode json = + DynMethods.builder(convertMethod) + .hiddenImpl( + JsonConverter.class, org.apache.kafka.connect.data.Schema.class, Object.class) + .build(JSON_CONVERTER) + .invoke(TestEvent.TEST_CONNECT_SCHEMA, value); + return TestContext.MAPPER.writeValueAsString(json); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/kafka-connect/kafka-connect-runtime/src/main/resources/iceberg.png b/kafka-connect/kafka-connect-runtime/src/main/resources/iceberg.png new file mode 100644 index 0000000000000000000000000000000000000000..e4a99c3951e6aa99d6e44575f236f32aeb5710eb GIT binary patch literal 17608 zcmbXKcR1Dm`v8ugWAA+sSqbGFl*q`IGD68X_90}?j0h*QC`5?t%yW+8WN!!ADI+Un zWJN-e6~2%2dVj9#`}^nj&)0Qzb)Cn!_kBOseLtVyHMq@4$3q8!Kp1tjZyG@$WJuCK z8Yhxv?{CDn=7V^=3FQ-R=cD{}fbq6ndM?oDAyN8ZOj&=@@efk`gAds7`Iycpg zgTDTpolbl`k+HR7vRa(aTpw}_`CIQT$}8D@{hotMK}N4*wUiS+;tc)jZJXdq8t+?P zPY<^>&4N8@SJ$EN%J&n2$Y^i?q-omv5ZU$ex_V$syJ2Y zqPw+$iqQ!fz@S8bpK*`C=8yE4C8zw8sdI27O>47@hEgCP2~n3HH&3z_qv}lAY85rj z;vJzRl4$S8rd-(*x9i^+8hb%zmLfuG-Hk2Vxp+tCS+KRTQcJA2_PlFA6rNL$jD-N% z<($`*TIz0Z#N?N|suHb3UjPBlc7`0yZ5^bJ<-V9!zS?yK4M0 zFbYEjk090*^iao*J_Fd+t|4W=bV{h*5{DSNVggLuGK+vphynQ+{?IqN5hmF%Xk+qU zw@@PI1vMg>Vx&nUWzg#}2-O8Bd{!gtfdXjkpf{=9+^;RLpnRidLQjDZB%sE$HO4rw zP=|uDwJ|a%{q}XFPTo<2eKT27dKyLf+K$R}Ea1vBR=gGpy`g_QG37-tiE}to=S}eK ztM*WW^)*@x{WLNP1o0bc?C!1^6i+2_R1J_J7)*`cyApWNqZSzJ;S0`B(UdGW2un5K zkH!=-BUr-d=zUp?S)E7*e$rR~EEI5vr}O1nrvIO&S_*hZ?J0jdof?<=Mi57}K@|rW zXi1H2bTz{jBN-ve&f-*Q5ok&!R@?~Ok?hHLNrM4D@f8WN_SpB z&chB9O&>)T^bL6rfD^}LYE+Y~g1u}?j*k3<`CGeSo<;fiHZ10`Gu9V*B_? zZEtQp+^Xc@=71p>cyNWTBGXTAn=?ZAT`p(Ns}nU07UJNZkb?wxpc+xbsXYz@Lnl>p zj3IWP@gsJ5@FO+=4*KqwcRgaePWfEMgdqONIhU&(mole6+2nRRMz(CR~#P z>@l$69q?n*I0}eTe%aSC9H$fTj-i1>IV*qE{`STp?<6g%Ki*+oK3dD18)sitR~aDg@$DoRZg- zCIqF2Qo(B}pv3dF475m^D^@&DD-?KkkFQQn2n}9 z1{xmz$OOS3Uuh8FOn4AKl;|y@y7Q!07lM{6KHKbi^#DN*r9X_u2ExQBp~Tx+S1zK2 zO1wo(fEkcMnnm=tyjUt}kiDYFt1(bW2c3D3yqz{N;?TQ`n+ifh|KL0gQqT$tp)t36 za3}J{0>+@844v1d>Tr9GIiZr{@4JZQTlI=rQCJ^9eBM`O^HCByAP9(1Jem?%6gr8L zjB|kwqSg!i`u|URyth_88H7Z}|4<2{Sn&`@6oQ37148-#+fMSO|Fv5&ntj))SN!M- zfk^M(;;767SmV7ZvC7x1OpgQ2zU1h>>VnABHxddYKUb8F?J;t8z989=Bamwp9O+nU zeD`?Y_{+7XevH-90QHc5$5?cOi2rdrPrQ2X1ynv3y*IGddC#;%pIuXA@29uODGjl? zX~1p0<7aX(;g;;c-CjkOXq*D@`Q_mHqbo;Ox^(Nc#Ep&0lH*d1@el_+jJX+?>Dcn2 zBd4Zn8LEW@wc$MeTf9p1pl-cpFs86h7h~d1PIlN-SBXDWtm6o$AW4kY?EY{pvpKDt z_UKX7@B>``bwQMH=KRYy31s6})1IQ2jp~!bu(p+K0V(Be4`bl2XP%kkP%dr5@vbJB zc6K1b{1`aZa|_RA)1xq67Ier(l+I8Atqm{LD>ed-*^CX-dckTcw%DI2RcUg`GN#g> zM&jEGjd2$M+tIDwemvFD*@F7tE%xT?9|6hcyo5LYj;xvEPYFX566<;(Su4c;P^{rUwY8sGf-;V`oa8_h##(4Rw zZvcKM!-aCEVTw6+15P%AG_#r2eAd%y7wZn#G#WxBqHHcbV(0l>^nib8*MQiMu+kwE zl+6mdSfuCYvEd+UaxM{VER+%$bL$&?lsGwaDzdabZ>{@HLNev#y=?m1%I|(;qa6`e zFU1xxpIp~8M-|aZBxFyz=@rq2MJLt3rutsFEqp^=uDwl(ZSodrUuQS9 z_a3`sd@}ms3OzLF*T~Q0s_^~gOI8wipMGy@F_PN`n#YvG%p>?ZO)|&x+x?ob2r>xP zB#sipd?Q5cvI-hKCP!;_$8|ytW5%xeQ{m`n51-kn;>knWlYo7=mp4Zvj{DFv&wSa=C7i(A+QoP}e6p z_O2%cxS|`@TSSUQ(r4VcP(1SZ%@-!3{utSuXvmdR4iLzLBckBCSv<;hO&u2?-urJI z@|C&hXTt>?u}SE?XKR!@we}YyO^quc2WW9qL-~uhh`|@Ft|qrb00~b7VYmyzyuriT zahwzgf@~=-U`D0vz5XONFuB!<%(OHa>hf`Gh{Q?m6?W`HqiUL?A>>d6cP-UmUN9w5 zZpLsjJ+F>VGrZ$0vi)*KU%M*g zbHuDT!|xk*$;DGVGUyxB&;$2p9vxx5rVAWUI71eX3{b)to9Zld+bsH+_IS_6nOr-P z9;p-Oxot6#2q>9@EhcHfu*~fqFseOqosnzl_juIy=+rK~c|KP6Zg_uz3OZo8b-9-6 zB_L6dqew$2EKjO^#stZxN@y_?~vx&pYHq%EZ+X8U>9M{t=z(i^=LJ;vuS_J^j>{3 zYR}JRV1j}cSiu1vlTD_Yu zt;-(9er?x1!pX}rEk%lttE~@rP7maTR-mr_>1X-Tq`0jV-Hsar^cZpAy89{Mf?grw zMfJ(itt~0lzgY-Ytjcy)EAB&X6+a)aN?#Nsvz;_~@peK42_^I(Uy6^Tn-MWV=LpHl z){=hLZ(_`3>K^WPXUpJ^gD$XacU>x>rS_L``5}Z-Q0sslo5h?zYQGqd`W;`H_vKpx z`%@V_c|eMU#+3L6wPvjSV=h8+%S_$l^2~eKv@dXK7f(4DZiM}*Av+FIGW1i0&^f%n zb(xIwvOE&7C(2|#bU5nO<#t~Wf)Tg}Q2Z@sJab8U(YFmK z$h>(?a8GsWe>+k4*U@pl+>GgjO)N5a8a9MpxXUv^Yl>E_SG*mcdu-J8TNEWCywBTw zeb7#(6YbTi^Zn>l2tZH~=cJ3mwF4LhV?8BwoGoTleKqpN6-SRdJ>>9y7g%V>r=u+H z9A0_yB_5rcrBwPSQ)aa$J@)}?u;9y$AJ5fXti+H|yVBIS9ddHN*5#63_NUq2Wqw}m z;vQD3Hw9TlF-Qw-*ualzWEL~BFIC>(CZ1~IO-kgAWcH`W3aoDiEvi}xGulNQ`w#3; z26dOeAiT(48BB+Y=8S@E}F75pz4?=W!B%h{z-Jg1`JVV<0F!wo^^gk z5n1$dBa4P&yZ*WLdAgAgQy1c@pzt!AldOpx1?l(M>n5v)MVsAw0=^emh)fQcraOT> z#H@1r2CMdhV8=IB@sR=F8)p~~$ayB9yQu2!I|%(OoV~(1La9wgPtm8u9GV$RAtZF| zB?Dj4lr_B=7Kt|{^)bDd(x(V1tIDEz-R%z~ImB9-7wu<)SS8~e!@4)~%>T`k!B50a zi;=llQRk6gdtdE?k6X5<)cbY5Fxc^~cDmv~t?lY_A`kf+sPo0aUm@I6dGYs6a2M!b z^tAZ)o_)ja-UJYU3eY+aWHHE08LP&3LuKf$?8p_MqLh$W1MTsNiD6^-sRe|E^HtJ4U;I{wHbTtks+K4pOf@AFr*j< zMkaVWB;$Ee3VRo4x;QfHyv)avZ`f*|Z1$%R+<}wjr7M-{x$pXJLdnMSlHX-8+T33` z@Gh%zKi=GwXVw0$2zh&D%9@wf%!{3{8x%j0VlHXBKJ)m(wo3#IqSrSYCDY{g-~^wx z2Zg&hyKdRU67$X^ng8QF6h1#=A zy*2VNe7^HWtGhSn|1)!M0bs@EmR*&~sH?~?S1jL7kN&5$ZT)iN7Xm|zrR9;g%R^&Q zDs8hT5)`i6tRwzkn;y8;UGO`UEHJ@ALB9jYn&42{M5mX@X4(!3unkwEJhaI3v?YsvCr=6Xd;#N z5{AH-Q8_*?tCGPF%%WBEeE1;OMM9A?hNXiXP&ay+y0?LA)F~Mhz>wy@(Y4b-Y8lOC zgWt=%$4t%E(s3^I3ua7u{6>|1%q^hZogVqIRl4^}(WjAm8v)N; znnD0OSlBWuq(1j9ms{p+IW2NH;=HL$9dA+9zHaS_BCw?TJt*YpOg+^@BO6F75jdB; z)sK2kG3EaQ4^b|R(5h;>ZtBkdm4mR>Cha^l%;54}QH<;aQ-RXQ#So7ApN%{BxX*sFxy||Rd!O_>^DeHGLfnDxpsFN2H5egOj>nBU7;0pwj zC-koH`sSu=Z)bnl4V#Ya-VHAE`}&`&^zx_7)|@l+jG>hS#|{SSNl62y z{@S_!i`!}{OP(DT&mT->E;ZgTWe9~nsU5A62Zop{iiDzJ&H)on;!`5@QYatU1ar&m z^LGp>`j-bl^nrxn$}6x%sW48B+!^Ux3SSIo0;S)d?5wBC;1h4Da)a$IB*5I<+C;AH ztx4e4oLQH(;pHxQHfx7;cg4gr?{Ah-UOO{yg(emY|Gow>qsWJJ@C%^Cx&?Y-+QN@z z7y4hkd+fo!V%n}-OU?XeW+NRKsTi3febP5^2w`AxgxvqnFZbP@uw%J`yC&y(gssle znTyAYo90+XG3NMP!=xtj@QIfxi;&J<@gL`G?*^{4g>d6WyhT#67le5j`W)X9vi^uM zu-g*p2iE^h`0Y^a>-q@e?O(ojcA`c%&se`n48b|*ZTm!jrCI35Ssk_b{b;wZFMYiF z>GKBxv+>l%QJ4{OVQm zTY_cKn#EYRToo{@9JxU=@ebt=r$hcu`i3}%uy~Nh2jQkQt=j+9y`US`lB(gzwIF_@ z(k(!7;0$%O$B-i?H0;;c; zfWb<1b~Bl2qU9Qiv;D3bM*qF`m`0B1@}_%37GiEtwYhe2{u;uSHV=Yp z_^Jd}!#^r|AZfq+QV7oXI zKvAjV4pMY+RAa!CDo+{iup8;mrYm#waD^edX{`%4}DrL}Xj zWaQ`jVkEt?VC2ik3`n*T5+l3u5c%WyS3eFO?^yZP^zSBn$9Q3!NC};r$+eK+krENJ zof-3~yTZ#FPx>B3kM=?k;Qq*?Wpxy79Rqi=HZOnG6<)YDEf!U-Q{On$(;H)>L%HE8 zZKx;Ob#9K4O=w#h6zYSu<3udLy7f~X)y>>aH|3ara=kM{Yjgq1C z2)ef=l#u4v{|MNPr|P|^lW;U?cip!7s?Ia}`QhEJV5_esdv%n+6dZ*4YPKh^)$2pmdt8{X_0+T5O4_;O%C*d|O7DJUxQ+xA`;rXwcl7XMB)CC4 zsz8V==oU9MsR5G~o!HUN<$S>3aou#8*YXlD2^WI2vTHt;1U^?Livf90+I@RwYVgZR zRB>Ub1y@Q4vl)#AdZK;Zo-S$rRrx&kC|*o|cE-Ni8b(P*amNd2(nOpw@y(j)T`{Yx0u8$Er;JYqRgD10+%6HQrwhlVR zm>4bqr~0NCwD{tcM@K<>o44}Ol~4Cqsmkmh|4G?yG`gm@sUmR*7)IqD9@F~}8kPJc z=GdR!2upVuH-XI1A>&3P4PIp+YZKkqQbC+Z*$Bc*`S3BLA|HL}kgM~YWF0&U97DNxGmSJ#$nw`^Q*^s$ld)$@jIN6qlz#)9@)9qFLmAwBl?C#@qTsh$GA zF4wKPFs29bK~AW!df^R)_!_*zvBH%NaiqT(Sz1BxG6INlv%C50{+*x_rzO1R<6h!O z%NSXxg<86k*C$NH+25MR$D75Yxzttoh~iLBF(|#^!S8=}lA!RK*H%5kp%xExOyX4} zP=8P&F!i)Yr+l@W==J~;VX|80`_}mVlCfR6ZckuI&__*t$ff3p!Hk|2k=q zlgToT1M|a4EB1^9e-!zFaz`WdmM-S)A`kg-izIJyNV2^{fx!3f^iq%Rq7pQBPr^1jPod-U15(75UmecT7P*Mw0$W!{jW=(zfxU#LjrZcE7viBus_s$ zJ%(r8rr@R!R-6ClV*YbFPGKG;;pr$LW)Qf)hgyuV*uIOUQJ{iXI3fM54N&8-#tTra zqct%uSzw`m6utKqejFj7gwcGMJN_*$ZcCNoj^1qv?Rq*%qzK7>){>Xr6#GN<-Yc(_ zOMRZZdv^4l3rVBDs8TMgz*wiA%lU#D?ggT1q`6t|<(@SrBx4S*s!r9ho7C2W?NX=g zKh=vN%;)|<7&Xq-44IJV41Twn$6ulNNQ)}5|52a;N2UE4cqF1SsZW*o4Xn!GU}yf~#c&Dix&q~;7L$<& zT7<7;RrtJEf6{*cX_EN(tK2W<&(4!fmtZVzvFmckhl>!X8FC~sFSC*{dYuwMp+S{+ zI*@3Ka)_i%OTNln7vJ~SF8Z-WtM+Y)k5<`2$jvBovmv)TJpu*SP_jV=9X`K0UIcG0 zuOeymNE-s@S<(u7ZzyxS*%144N6yFzxyMR!ik9l1y*qI?!=W4h7|GJ23k~73Dl;j@ z)lYto@?ej+2;4P84_g5p?kDPZzvu3W51eBH5!y2PV^`%w%h<~U_1v^sPn)K5`(~`T zBN||a5$DED2=ZFo+*FB*Y%xDZE7{(0QZbM^6fAz<8Ln+(nGuS-T`~LSq0sqPe_c*I zz)s5c>s>g5(&=0DjsYY~4N&y>^@8yD5H?z_BvJ*jdBM36G% z;AfxAaU_Yn-#Y~o(0GXsqRzV;0N&rA+;Qiwt&N}nI@6DYenAqp!?I~M0>u9*8Wwf; zxSG2rNs~13{4HX{bu+JzJ(0IF;$aCaN7YL=^RlQeA|1hcB8Upz(n@2~sHLSvF8ug8 znjDw*an!fPF#aBRU?Pc{jJ!_e2OZKshrSTHQDgS^YLLv`krLUe*X{!SEwLv102(PI zvEqD*IQV2I2J4HSOkedX4L{kTN8BV98oC8I3WtI1gY(qsMi8JWQMUmnUZnePgSyDB z`zn#QIXORDrm5}kZxyP<3**2wc|rxoeE2>Sx@{_|Ewl(D4@!+py6lhL!b>eoitLXm zZkAtYll1?0)R)4_<-~@f5^UTY1&goKjO~~n;VUM*tZL!y zsR)DuG09hH6P6H7R0A^8z1Ibk7*=pqkGN~0v;Na|-Tk0AoEALEe>?a$Q&S(|4{nG# z$;neNtd>#x)&jY;?8(abrlXFo<){2SBE==ba>5e$-)0P(Ky@Pw=F$SZSBGfY^#`k|%1aczZ#N6}qz z%m@wp7m&Y!zO8LLE)}-Nl{rU#F=Z;Z4EALRy>)A3TT-1K@`M)YhY)`B#ul0IcCPIx zCf%5(&`2cIbSL^#=IO`ZHnnunAU1+zmZEQ(W0Yj2{=tT!)X2YI4WFLoE5b0#^*{I^ z;5rpeE7NwYLlvV-SU=B=v2<~NHid*da%LmUi)7B7Br`FTJNK%<-8L*wy*6V4MVrZc zxuEo>?+#}}Z!j`!_WbNvuRUwB$`>~K0Rg(4BpcjHXl;0;aOKPE-`o#)NdoXF8r;e~ z`(er12|ZzA7}MeT*1*6+N(<>&UB!0|ZtXgnAH_(|6YtK}|F1n+rfnciNN7N^kt$~0 z;K}kmz)3wX!8t*_xbVk`s_H5G;1=hVt5=LS-p+WZ4&*`Qc?sOt<38$8f}N_-u!hPV zMustdGJr)b5wJf}ErfzLL?eFm6&xyCnlmtrJqL3fUZN?bhW_R7Ws_?j%l?@)`h1o< z@=H8jogAcTv=ki9BG}0_t3vDG-sfZ>t7N9}MNy2-VRFfOzI@ymb}(#hgc!U3h!QuT z#6_SS+&Y@Pa>ZCqGLmX2`YH3)do3C-&9q$ylJ1c&!oG1Mc>g!6cPaF>2>LTPL_DvU z%$LH3NC&q!KF!bc0a(n)IQ-{2!ve-)L=hoq5$9FtklZc8(pczH4f;8G;gaiWsl*>e&!(gp{&@XQP~Fd#+sCO1o>{T=dV2e&^r$TbIn z=me1gCWJ@lT`1fg?~~f9Smm0T&CF>8mBzC#d`UZk@J`*4kL$#Kik{T!YHNsq8}`gc`C;!xH#PS7VbT+7~Ld3~s4?sO+^ zcqNc3#?#a8^07)in+ZaY<7>d;qc$(O$DS-~K$0O0{&H;tYtawMxw+SMiOH`RNsOpZ z8=O~0648ZH8z5N3r>*P*2PV*ym?2a}_%D26HDJA<{fBBqqU6->Cq}OHvNkOG$f0(ba5-gN%6ITYeM5tnd z^JO|5htRRrD_mrpsJjpUG%+xQWI#Fq!~ zZZn)|eTNcpOBTYH!}GZEvPDN~6rv-o6NHfMm{06cqU7W0N+v{2c45n0ML0|C%qJwE z{2;IFq@{hx_B9>n(1JiN?~nDl5GD7%fD1S7{sopzeEijq9!lR!yf54$r>TcejybUF zNZSK2y}vomhUdLPr~Z5(JCYj)<3fv=W>AK;z+l5ZRzT*VwHeKC}(ND?~bwYI7h)@w#QBX9w3?ggcN!vp+U@h z&RUn5Ek|!D^1B_fMTo8!phwrdSmt>5U-a;cLPQ}F*`pde$0Y}fdp=9xY1ZW{ctx**e}2j&)QKaUwf*;c zlN(bRylt&*_t*Vi59Fl_;4am)vM%`{7S=ZosfSD2+Ufbt`wQn&%qihNIxjc~(Ezdq zY72$J>>P#p{pa=1jEL##?D(YgONkD?+maiBWr>enS;-7oGM`L{V>HSa^@r$oq@6op zzF$86EbpH%WayOx=Jv*M$Tt*kto`5Is*iNa8B{nB789=Y8ENWM$|Ls@ z8}U{h4k!~bwr;syuYs3-JCJcZw59J4eeS)M{>z20^)U4Vvw6$M?g&~FqXr9eF)Jat zazyvFrE`R}&WH(lb?u?`t=Y-BfxJ;~k?Baa1nCwN(Ld-^lYNyWE(4aY+|}p%ko-1O z`t=c7(%C&3X3CjKlwLj&Lmhdt#`mM+c-03=2J&dl2)$P0lsV00Af#=}0}h;R`(5Rd zn^>x!G%_5mw}^DW++RJmi>2etV-=&JwNN}5+LjEYm&D_DG9x9V+wfB974r_5WAhxZ zgP8TyNOi1L^9ET)jo7wTGGOMFjKYC;zL8_K^0cOC$R=wM*+2nU_ z_v51U0$pu?QC> z=#j@%5^tp<)4WA2Bhf{^y!5GVMEkcaD*3_7@9nQnF#Im-wQwzw7y`-qVvslk9E&jp=K6CIupXM07~XfW;n&f=kzIm%1D$87XSju5QZVj-y5De0ASx zM90gkKP$g)8j_IuD-ULvnSTETl}>46aH2Qm>JyMV1eRL)=j>6v!p>a}co_z}lkbA( zs#O0DSk}m$m^bVM{%eD7gE^Mc{Z5M zvRk~@d(FwlKNUYNC&ID>qcQ?lJNCJB17X!v^$cLOvzlj+2N!TJQlarqf%P!LeyV8BW$Bn>)JUaY_2%jw+Dmi9_rZi|JpxP>kn>@Q{$Z9Uwl~`G)r`9!Z>n5 z8z>|my9eYys!Lb@jF1Gw#L~zwp`t< z>OVs6!24|1|_LQ!moA2-hDH+m`!Mt3DzY_lbI(8gc77+M9BhwR%b8 z$3iCN&d+;u<#<;7Jnu$eeOv+pwCQG(_6o)xVHzXuO~QpOQJ+V za1S3Vw~GP+yi5~7lx>uIeaQ_!i*DR4p?3`ka2fSsLVy2OcOtbm!Xi*+AKL3iS36H3 zJNGF1WK^@G>etTJcIB zYxsPc+xTyeOMupFGbL(91|p^4b5Qyw^c+WimF0kX`wq?%l~_mi0t7^ePdN+eyio|UA+xmt+p2(rs}D*R=2F$>ey7R z0?)n`GzRRr?d=`vxdyBR?4OuSFJssFd8QXY295$3p`S@goLsgL z>pAVy^uM>kKIpb;$KB7t8C zH7UAm)fc_T;x@OVzdhO6wEyS*-2DV_<4&1rbUx>qCg;3RnN7>WqV3FHg*FD+YyD8f z?S`dE!&eyo(j%=l{N3f>5$=aYEhdV!XSFAn%?%jY@beWZ_|vZJ({q_Nh{?tApP{mC z1RPWIkmY!KPC50x?Mzz}*Xuu)Ev)XldT3ri3gLqngr-9X4$U1k@n&CA+rnUGcn8a& zQhOa(#w1g?!c<7=?}Z+I43eNxmx}WsPnYSh@G1?#{x&pjuo8rc|G*~4jeT8r1&P}B z9=YSu@odUdV8 z$YruC)|w*bimjQ=b|%UUKlcIxTc4?%U#X$Byz`6i6T)HhL#()28c4%38k}u@@CYc- z>+5o+IK!|nT?OB^5b7|Xc0XwMAZkL?-N!B9UMRTNv0$w%_1#1a?9iPQVKth9DGxRZ znHR)QefKsq6u=`I1&*rX-sFd3-SGhT3f~M2%B6{UXqPv@BWfUPp-N#4Z!>tf-FG0L z{R_E!5I1qz-TwanR2)-~*rXd~(okrpoK47a>kO`2z!-~IwFioY(gON%iJyl6{e@GZ zaTDV1-*v&HAG2{aM#m4*Oz`S(0;AvAB*gXs(Ko?Uq@R4enjs z!aKqXcq(oaQ9kfMv#hXRBps%wV(x!r~WJJv1lTrtd z9^+JOI$fGf*?^AP+iv)75z^DF0urvEYAjQV28W%P#E2wE&09mp%j=>jozi62RCmhY z_W^A9`j6+rZp;t#wHvgCkLmWkde2BmW}c3oP3G$g!Sb%#{4?CV!Rc?>FmenLF;=iH z5B?`Mzs}zQd$Qk}2t;|bcarhIo!p(v2rxNo+@J_Yx^yyH+w=)DdL|hz1q5+$XW4l(%t2;#IL^OIL_;J;dA#-!;`y=n)d0o zr+v2CUTX$%rm!ttwfQXQ&>j%IuC0XR*nT5Zn=67kWUtH|jvZ~rtM~d^malJdy5?KO z3!r-}hQB+|f0 zK3DnV#utT<8*|OsL-mc%vs|l~Z%Ju2a2G@QYa2)rOlUMXe}6WUPR_q;FEn4}R2jnY z`Ys&FR{8CvO4hPJvqqZbUsbt<;Tzr=PYbrK2@jIvo~rJA?2bqg+5j;U5yR|7)Xd_E z+K-s#xd<}I=@vYD75^NY5`^3yeMofL4ss(PS11{CDH~+9iNV~J-%Q+L9qRcpI935N z(Nq7Kuw) z9U#`q)yc*Idh0Gyr-63>r)jfneJmH^3Fp6YL8zObm-sILY2XxX;$NlEe0N##e9%_V znq?#fPT$*%1kZb?V^xE+*d_K^N0JIc+^|U^$K567v@Df*Qr0lz{O*tNpz@+gz>>eZ z%|&+@u>P`PWzUzc&pKgXRr`0m0?xGutvbZ7Hoq@CPDzCb4HyUu|FV+0f5~KuCu+;# zqGpPtAj+S&a;Zy0l{1Uk{J%@d3w4i7?nVna*|fVoOu2smvfw!?m$W+9sN^^Q_)RS4 zEvK9u=lBZS_G!mxJm(U6(&fnvbrXEyiT>k&9jZ%m46RQtZ|?^=`MG=#E|NpBUVZVg zDXfAH=EbHpA(PJe*E`~wR6*-2bW^S9;!+OlGd(ppHq~s5DkMNT`Gf)^#QcJ4%dt4YI2)(H>x`p{~ zvE!SEAAtw>Tg~}vJ!C=ybSdlsp6xKe@7!9`1=Nq8CkU3xpLXA)Kl|JdDeI4?Z&lQp zgFy!zdR=b`Bvi2pIxGwajv9dk{P~j6k6?x)YvodFhl?p$6z<>8aL*3B-00U!6AMJM z-&E}`!DOaR&cW0|^zRSE&(~5dl+??{giz5UGk&b~ENW3#XpE2i&SVdt8)ns$e3G`| zy(ImRXxQ^^E0BYo2AL6yODp)nmlKUk4F6*$rZN6Z1PW&|b7$-@9QmR$D9u9nO=qs( z^-V^}7o<#NB&CMbCvSK=gs^Ng>u*vZ;2LQcpBSmon4f!_#XWcxScIMPYcBdHkJ37z z(1GG45vgubI8?(l;CEIuEYabT_cV92=KUMc0bMU~1I~z~RBIw8?GLS4rR*h5E$CUt z81EfdF_gKO&c7ZGF?#jh{trtIDwdBZ@tsd|<*#ZJD-|93;!|#bp5MmJvPi_A0(9bn zYbLOu2A0Zt1L`P5=dFDQC5#ZrnqBv_Vb~sg2N)0ls~WALWz3ar2802;K&P3jVgLKL?|CKWw@MW5^FmbBrmR!;_v zPoj)EUs=5hKi7Bn(vCxfH>lncqNlm$uo>>D;Z-kgG7SXY0RvT32*IVKB3TsU%il}w z)T4j_PNDdLeD&V->9uFA^tr%lZ-YufZyKbwM%ur^b}kAUb7h`k7~#Kt-3l%F0v!cH zgLB@r_h_9rB(Ni1^MTSmK`T(<^tjia`*JgbdE4o3fnxZN+P|tHzV?LHvihZ1@sO4d)glr07=je0kQ>}GVF-qsW6|D6Cuj7FbcApeL#3vTOIfcYf5~ap?@vZ$` z6-@qrTl^?Es7W*T0mWa+pDK9-9h6-i3hS^Qbjue1Am@cu9nPM4YY`((lSFK-Kv|ty zFAv(CaeR^|DrI5)7Kz=Pi&{&d#%bVA_En46_ptAkq-*gLV{TYb*M5X|1; zLQh02qm2f6{$=Cu)JNzUuz~PH5OeSe7m8pqPhq#rWk4{!e4peU{jokc9k@$7KB)_b zpx~{nZQNE+0iIO#j9{%8W0zS3`aUJ4Ciut?x(Ze&?JeD~dVW9ByM5}Sm8>~k_!Wk3 zO|WfPQ{O#MO;HVXQ#$GichZ`Vx2)P9``1yaxZQCedgi8{eYPQ&xPs01!I%-UqcGu# z9VYQb+w?F*qLwY8=NqjIU|; zM1Pk4qZ?cS3E}%*C{)#+b38D9~y9TAtiAFTotlo~>@SpeT zjr-r}jJ`|tr|+gdx3e$vM-bbOg1|P%Iw5T88NViu#%f~fAneEQ7r})0Z`Wb&7f(#j zBx941aaXn_IZ7kNKAlfc`6nIx#hmc~<-0{N+JLW*&%!v#ufLLL|L#u9Mqn}RO|JUo z8@q7NlpIo>QcisEU#sU(@QtHwemE5J^$s>*IgWh>yzBB;PD)cilrJJ%K7(M2mT`FR zNCP=}pvOXiaObTCedup@Kq)Jz$K?{zt7Rz4^nU{?QYCcqk}+d0X=K^lGI9DMBAyNQ zu_|PcNvr!cC6(;>Z3=PZ4!@-&GXM9>L?}Kb24dAss}3B&f5`QsIu=yydZUdo zDkwaNDc(X3I9YB6U7Wy@L?I)uecUKYaqqoGVA znRaqQXCJ5z*Z&F3FFgt571lD$Imtqn^2)yI|CAurgCB5Lug6W9Ln?)z#%-#B8tXel z$v#Y#9>|BX5N_fRQp=;E#eP0WqdqoZEKUV5!SblLK$RZNq_33=h$6D9V{<6VL2<5H zV*C2VKx$A|?JiKwqca4m$|1GGAO}~5RJ5!%M5aanHLipU&VA22J8@Jj)eR)|kyu!x zx3~|g&oNazYToVZaD7jzQ}>K5FT3b`Rgbilox9FTh#DQe6BnXAlSu}ztb67+!ea==0GMd3SJ8ZAd;xc8w)U$ zyK$~CP-oxBWN9>blu=+tKdF1a=IwZyDKM(M)jW~!d?OMc{`-bN@?F`%XOtvy&{^(X zbnmN=RfI>VY5Tic)EJUgQqlmH-r4D`xD>tIiIOC**wCNbCmlZ}UQRF5aW@ACAbxt!aQ*qp21}x^sl*gQnXh7d(K7itgMI5qE9i`?%d1x?|T!p z)-E;J93>k=4K7$kd(D?l%$Mb*2k{sVAX=7y156-=)Z^yNsEXZOO}X?a(H)l_)z4R9 zd?aLTP+Zl>n6^rS|Gyk;D#7%(c6g}5;oj86oA;tYQU@7IU(ikIuchT4x&^{y+WYg`e4?p^(2HZucrT(7;GAbhHd^;x$ka{|~C-bCdu8 literal 0 HcmV?d00001 diff --git a/kafka-connect/kafka-connect-runtime/src/main/resources/manifest.json b/kafka-connect/kafka-connect-runtime/src/main/resources/manifest.json new file mode 100644 index 000000000000..5b51e5dea875 --- /dev/null +++ b/kafka-connect/kafka-connect-runtime/src/main/resources/manifest.json @@ -0,0 +1,47 @@ +{ + "title": "Apache Iceberg Sink Connector", + "name": "iceberg-kafka-connect", + "logo": "assets/iceberg.png", + "version": "__VERSION__", + "description": "The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.", + + "component_types": [ + "sink" + ], + + "requirements": [], + + "features": { + "confluent_control_center_integration": true, + "delivery_guarantee": [ + "exactly_once" + ], + "kafka_connect_api": true, + "single_message_transforms": true, + "supported_encodings": [ + "any" + ] + }, + + "license": [ + { + "name": "Apache License, Version 2.0", + "url": "https://www.apache.org/licenses/LICENSE-2.0" + } + ], + + "owner": { + "name": "Apache Software Foundation", + "logo": "assets/iceberg.png", + "type": "organization", + "url": "https://iceberg.apache.org", + "username": "iceberg" + }, + + "support": { + "provider_name": "Iceberg OSS Community", + "logo": "assets/iceberg.png", + "summary": "Support for this connector is provided by the Iceberg open source community. You can open an issue in the Iceberg GitHub repo or post a question in the Iceberg Slack workspace in the #kafka-connect channel.", + "url": "https://github.com/apache/iceberg/issues" + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index aed11ab0b169..cf34b0bcd4c8 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -62,7 +62,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String KAFKA_PROP_PREFIX = "iceberg.kafka."; private static final String TABLE_PROP_PREFIX = "iceberg.table."; private static final String AUTO_CREATE_PROP_PREFIX = "iceberg.tables.auto-create-props."; - private static final String WRITE_PROP_PREFIX = "iceberg.table.write-props."; + private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props."; private static final String CATALOG_NAME_PROP = "iceberg.catalog"; private static final String TABLES_PROP = "iceberg.tables"; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java index 460b18fd7fc2..bb9370d3d563 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java @@ -83,14 +83,16 @@ private void close() { @Override public void put(Collection sinkRecords) { - Preconditions.checkNotNull(committer, "Committer wasn't initialized"); - committer.save(sinkRecords); + if (committer != null) { + committer.save(sinkRecords); + } } @Override public void flush(Map currentOffsets) { - Preconditions.checkNotNull(committer, "Committer wasn't initialized"); - committer.save(null); + if (committer != null) { + committer.save(null); + } } @Override diff --git a/settings.gradle b/settings.gradle index 46c85fb65546..cdc69b0e2071 100644 --- a/settings.gradle +++ b/settings.gradle @@ -198,3 +198,7 @@ project(":iceberg-kafka-connect:kafka-connect-events").name = "iceberg-kafka-con include ":iceberg-kafka-connect:kafka-connect" project(":iceberg-kafka-connect:kafka-connect").projectDir = file('kafka-connect/kafka-connect') project(":iceberg-kafka-connect:kafka-connect").name = "iceberg-kafka-connect" + +include ":iceberg-kafka-connect:kafka-connect-runtime" +project(":iceberg-kafka-connect:kafka-connect-runtime").projectDir = file('kafka-connect/kafka-connect-runtime') +project(":iceberg-kafka-connect:kafka-connect-runtime").name = "iceberg-kafka-connect-runtime"