Skip to content

Commit

Permalink
Kafka Connect: Runtime distribution with integration tests (#10739)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Jul 29, 2024
1 parent 94d336c commit 30e761e
Show file tree
Hide file tree
Showing 15 changed files with 1,350 additions and 10 deletions.
5 changes: 3 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ flink119 = { strictly = "1.19.0"}
google-libraries-bom = "26.43.0"
guava = "33.2.1-jre"
hadoop2 = "2.7.3"
hadoop3-client = "3.3.6"
hadoop3 = "3.3.6"
httpcomponents-httpclient5 = "5.3.1"
hive2 = { strictly = "2.3.9"} # see rich version usage explanation above
hive3 = "3.1.3"
Expand Down Expand Up @@ -132,7 +132,8 @@ hadoop2-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "ha
hadoop2-hdfs = { module = "org.apache.hadoop:hadoop-hdfs", version.ref = "hadoop2" }
hadoop2-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop2" }
hadoop2-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop2" }
hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3-client" }
hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3" }
hadoop3-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop3" }
hive2-exec = { module = "org.apache.hive:hive-exec", version.ref = "hive2" }
hive2-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive2" }
hive2-serde = { module = "org.apache.hive:hive-serde", version.ref = "hive2" }
Expand Down
184 changes: 181 additions & 3 deletions kafka-connect/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {
project(':iceberg-kafka-connect:iceberg-kafka-connect-events') {
dependencies {
api project(':iceberg-api')
implementation project(':iceberg-core')
Expand All @@ -28,10 +28,10 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {

test {
useJUnitPlatform()
}
}
}

project(":iceberg-kafka-connect:iceberg-kafka-connect") {
project(':iceberg-kafka-connect:iceberg-kafka-connect') {
dependencies {
api project(':iceberg-api')
implementation project(':iceberg-core')
Expand All @@ -57,3 +57,181 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect") {
useJUnitPlatform()
}
}

// Runtime distribution of the Iceberg Kafka Connect sink connector.
// Bundles the connector with FileIO and catalog dependencies (AWS, GCP,
// Azure, optional Hive) into Connect plugin zips, and wires up Docker-based
// integration tests that run against the installed distribution.
project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') {
  apply plugin: 'distribution'

  configurations {
    // The "hive" classpath is the default runtime classpath plus Hive
    // metastore support; it is packaged as a separate distribution below.
    hive {
      extendsFrom runtimeClasspath
    }
    all {
      exclude group: 'javax.activation', module: 'activation'
      // force upgrades for dependencies with known vulnerabilities...
      resolutionStrategy {
        force 'org.codehaus.jettison:jettison:1.5.4'
        force 'org.xerial.snappy:snappy-java:1.1.10.5'
        force 'org.apache.commons:commons-compress:1.26.0'
        force 'org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.2.0'
      }
    }
  }

  // Dedicated source set so integration tests (which need the assembled
  // distribution and Docker) stay separate from unit tests.
  sourceSets {
    integration {
      java.srcDir "$projectDir/src/integration/java"
      resources.srcDir "$projectDir/src/integration/resources"
    }
  }

  configurations {
    integrationImplementation.extendsFrom testImplementation
    integrationRuntime.extendsFrom testRuntimeOnly
  }

  dependencies {
    implementation project(':iceberg-kafka-connect:iceberg-kafka-connect')
    implementation(libs.hadoop3.common) {
      // Hadoop common is needed only for Configuration/FileSystem plumbing;
      // trim transitive server-side and logging baggage from the plugin zip.
      exclude group: 'log4j'
      exclude group: 'org.slf4j'
      exclude group: 'ch.qos.reload4j'
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'com.fasterxml.woodstox'
      exclude group: 'com.google.guava'
      exclude group: 'com.google.protobuf'
      exclude group: 'org.apache.curator'
      exclude group: 'org.apache.zookeeper'
      exclude group: 'org.apache.kerby'
      exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
      exclude group: 'org.apache.commons', module: 'commons-configuration2'
      exclude group: 'org.apache.hadoop.thirdparty', module: 'hadoop-shaded-protobuf_3_7'
      exclude group: 'org.codehaus.woodstox'
      exclude group: 'org.eclipse.jetty'
    }
    implementation project(':iceberg-orc')
    implementation project(':iceberg-parquet')

    implementation project(':iceberg-aws')
    implementation platform(libs.awssdk.bom)
    implementation 'software.amazon.awssdk:apache-client'
    implementation 'software.amazon.awssdk:auth'
    implementation 'software.amazon.awssdk:iam'
    implementation 'software.amazon.awssdk:sso'
    implementation 'software.amazon.awssdk:s3'
    implementation 'software.amazon.awssdk:kms'
    implementation 'software.amazon.awssdk:glue'
    implementation 'software.amazon.awssdk:sts'
    implementation 'software.amazon.awssdk:dynamodb'
    implementation 'software.amazon.awssdk:lakeformation'

    implementation project(':iceberg-gcp')
    implementation platform(libs.google.libraries.bom)
    implementation 'com.google.cloud:google-cloud-storage'

    implementation project(':iceberg-azure')
    implementation platform(libs.azuresdk.bom)
    implementation 'com.azure:azure-storage-file-datalake'
    implementation 'com.azure:azure-identity'

    // Hive-metastore catalog support, only in the "hive" distribution.
    hive project(':iceberg-hive-metastore')
    hive(libs.hive2.metastore) {
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
      exclude group: 'org.pentaho' // missing dependency
      exclude group: 'org.apache.hbase'
      exclude group: 'org.apache.logging.log4j'
      exclude group: 'co.cask.tephra'
      exclude group: 'com.google.code.findbugs', module: 'jsr305'
      exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all'
      exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet'
      exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle'
      exclude group: 'com.tdunning', module: 'json'
      exclude group: 'javax.transaction', module: 'transaction-api'
      exclude group: 'com.zaxxer', module: 'HikariCP'
      exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-common'
      exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-applicationhistoryservice'
      exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-resourcemanager'
      exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-web-proxy'
      exclude group: 'org.apache.hive', module: 'hive-service-rpc'
      exclude group: 'com.github.joshelser', module: 'dropwizard-metrics-hadoop-metrics2-reporter'
    }
    hive(libs.hadoop3.client) {
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
    }

    integrationImplementation project(':iceberg-api')
    integrationImplementation project(':iceberg-common')
    integrationImplementation project(':iceberg-core')
    integrationImplementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    integrationImplementation platform(libs.jackson.bom)
    integrationImplementation libs.jackson.core
    // NOTE(review): jackson-databind was declared twice in the original;
    // the redundant duplicate declaration has been removed.
    integrationImplementation libs.jackson.databind
    integrationImplementation libs.kafka.clients
    integrationImplementation libs.kafka.connect.api
    integrationImplementation libs.kafka.connect.json
    integrationImplementation libs.testcontainers
    integrationImplementation libs.httpcomponents.httpclient5
    integrationImplementation libs.awaitility
  }

  // Runs tests from the integration source set; requires the distribution
  // to be installed first (see dependsOn wiring below).
  task integrationTest(type: Test) {
    useJUnitPlatform()
    testClassesDirs = sourceSets.integration.output.classesDirs
    classpath = sourceSets.integration.runtimeClasspath
    jvmArgs += project.property('extraJvmArgs')
  }

  // Substitute the project version into packaged resources (e.g. the
  // Connect plugin manifest).
  processResources {
    filter {
      it.replace('__VERSION__', project.version.toString())
    }
  }

  distributions {
    // Default distribution: connector + cloud dependencies.
    main {
      contents {
        from "${processResources.destinationDir}/manifest.json"
        into('lib/') {
          from configurations.runtimeClasspath
        }
        into('doc/') {
          from "$rootDir/LICENSE"
        }
        into('assets/') {
          from "${processResources.destinationDir}/iceberg.png"
        }
      }
    }
    // Hive distribution: additionally bundles Hive metastore support.
    hive {
      contents {
        from "${processResources.destinationDir}/manifest.json"
        into('lib/') {
          from configurations.hive
        }
        into('doc/') {
          from "$rootDir/LICENSE"
        }
        into('assets/') {
          from "${processResources.destinationDir}/iceberg.png"
        }
      }
    }
  }

  // Only the distribution zips are published artifacts; disable the plain
  // jar and the tar variants.
  tasks.jar.enabled = false

  tasks.distTar.enabled = false
  distZip.dependsOn processResources
  installDist.dependsOn processResources

  tasks.hiveDistTar.enabled = false
  hiveDistZip.dependsOn processResources
  installHiveDist.dependsOn processResources

  integrationTest.dependsOn installDist
  check.dependsOn integrationTest

  assemble.dependsOn distZip, hiveDistZip
}
107 changes: 107 additions & 0 deletions kafka-connect/kafka-connect-runtime/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Test stack for the Kafka Connect integration tests: MinIO (S3-compatible
# object store), an Iceberg REST catalog backed by it, a single-node KRaft
# Kafka broker, and a Connect worker with the built distribution mounted in.
volumes:
  data: {}

services:
  minio:
    image: minio/minio
    hostname: minio
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    ports:
      # quoted so the host:container mapping is never misread by YAML
      - "9000:9000"
      - "9001:9001"
    volumes:
      - data:/data
    command: server /data --console-address ":9001"

  # One-shot container that creates the warehouse bucket in MinIO.
  create-bucket:
    image: minio/mc
    depends_on:
      - minio
    volumes:
      - data:/data
    entrypoint: mc mb /data/bucket

  iceberg:
    image: tabulario/iceberg-rest
    depends_on:
      - create-bucket
    hostname: iceberg
    ports:
      - "8181:8181"
    environment:
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://bucket/warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_S3_PATH__STYLE__ACCESS=true
      - CATALOG_S3_ACCESS__KEY__ID=minioadmin
      - CATALOG_S3_SECRET__ACCESS__KEY=minioadmin

  # Single-node KRaft broker (acts as both broker and controller).
  # EXTERNAL listener on 29092 is for clients on the Docker host.
  kafka:
    image: confluentinc/cp-kafka
    hostname: kafka
    ports:
      - "29092:29092"
    environment:
      KAFKA_NODE_ID: "1"
      KAFKA_LISTENERS: BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_JMX_PORT: "9101"
      KAFKA_JMX_HOSTNAME: localhost
      # replication factors of 1 are fine for a single-broker test cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
      CLUSTER_ID: E-JXLvCaTiaUYDb1LwZ1JQ

  connect:
    image: confluentinc/cp-kafka-connect
    depends_on:
      - kafka
    hostname: connect
    ports:
      - "8083:8083"
    volumes:
      # mounts the installed distribution built by the Gradle project
      - ../build/install:/test/kafka-connect
    environment:
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: kc
      CONNECT_CONFIG_STORAGE_TOPIC: kc-config
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_TOPIC: kc-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_TOPIC: kc-storage
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # quoted: env values must be strings, not YAML booleans
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: /test/kafka-connect
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: "500"
Loading

0 comments on commit 30e761e

Please sign in to comment.