From 863be4fcaa3b50512e7cdb05d31698ada5e58bb7 Mon Sep 17 00:00:00 2001 From: Gianluca Date: Thu, 29 Feb 2024 12:33:58 +0100 Subject: [PATCH] Checkpoint --- README.md | 16 +++++++++--- .../lightstreamer-kafka-connector.gradle | 7 +++-- examples/docker-image/Dockerfile | 11 ++++---- examples/docker-image/build.sh | 21 +++++---------- examples/quickstart/Dockerfile.producer | 9 ++++--- examples/quickstart/docker-compose.yaml | 7 +++-- examples/quickstart/launch_quickstart.sh | 14 ---------- examples/quickstart/shutdown_quickstart.sh | 3 --- examples/quickstart/start.sh | 15 +++++++++++ examples/quickstart/stop.sh | 8 ++++++ examples/utils/helpers.sh | 9 +++++++ examples/utils/remote.sh | 26 +++++++++++++++++++ gradle.properties | 6 ++--- .../src/connector/dist/adapters.xml | 1 + .../adapters/consumers/ConsumerLoop.java | 2 +- .../adapters/config/ConnectorConfigTest.java | 14 ++++++++++ 16 files changed, 114 insertions(+), 55 deletions(-) delete mode 100755 examples/quickstart/launch_quickstart.sh delete mode 100755 examples/quickstart/shutdown_quickstart.sh create mode 100755 examples/quickstart/start.sh create mode 100755 examples/quickstart/stop.sh create mode 100755 examples/utils/helpers.sh create mode 100644 examples/utils/remote.sh diff --git a/README.md b/README.md index 2b3a1908..cb269509 100644 --- a/README.md +++ b/README.md @@ -79,9 +79,9 @@ As you can see from the diagram above, in this variant the stream of simulated m To provide a complete stack, the app is based on _Docker Compose_. The [Docker Compose file](examples/quickstart/docker-compose.yaml) comprises the following services: 1. A Kafka broker, based on the [Confluent Local Docker Image](confluentinc/confluent-local:latest). -2. Lighstreamer Server with Kafka Connector, based on the [Lightstreamer Kafka Connector Docker image example](examples/docker-image/) +2. Lighstreamer Server with Kafka Connector, based on the [Lightstreamer Kafka Connector Docker image example](examples/docker-image/). 3. The web client, mounted on the Lightreamer service. -4. A native Kafka Producer, based on the provided [Dockerfile.producer](examples/quickstart/Dockerfile.producer) file and [kafka-connector-samples](kafka-connector-samples/) submodule of the project root. +4. A native Kafka Producer, based on the provided [Dockerfile.producer](examples/quickstart/Dockerfile.producer) file and [kafka-connector-samples](kafka-connector-samples/) submodule of this repository. ### Run @@ -91,7 +91,15 @@ To run the app: 2. From the [`examples/quickstart`](examples/quickstart/) folder, run the command: ```sh - ./launch_quickstart.sh + ./start.sh + ... + ⠏ Network quickstart_default Created + ✔ Container broker Started + ✔ Container producer Started + ✔ Container kafka-connector Started + ... + Service started. Now you can point your browser to http://localhost:8080/QuickStart to see real-time data. + ... ``` 3. Once all containers are ready, point your browser to [http://localhost:8080/QuickStart](http://localhost:8080/QuickStart). @@ -101,7 +109,7 @@ To run the app: 4. To shutdown Docker Compose and clean up all temporary resources: ```sh - ./shutdown_quickstart.sh + ./stop.sh ``` ## Installation diff --git a/buildSrc/src/main/groovy/lightstreamer-kafka-connector.gradle b/buildSrc/src/main/groovy/lightstreamer-kafka-connector.gradle index 6a5958a6..7386a65d 100644 --- a/buildSrc/src/main/groovy/lightstreamer-kafka-connector.gradle +++ b/buildSrc/src/main/groovy/lightstreamer-kafka-connector.gradle @@ -71,7 +71,6 @@ tasks.named('test') { useJUnitPlatform() } - -task cleanQuickStart(type: Delete) { - delete "$rootDir/$quickstartDeployDirName" -} \ No newline at end of file +clean { + delete "$rootDir/$deployDirName" +} diff --git a/examples/docker-image/Dockerfile b/examples/docker-image/Dockerfile index 68790c5b..bbff8a38 100644 --- a/examples/docker-image/Dockerfile +++ b/examples/docker-image/Dockerfile @@ -1,14 +1,15 @@ # Start form the Official Lightstramer Server image FROM lightstreamer -# The Kafka Connector version +# The Kafka Connector project version ARG VERSION -ENV KAFKA_CONNECTOR=lightstreamer-kafka-connector-${VERSION} -ENV DEPLOY_DIR=/lightstreamer/adapters/${KAFKA_CONNECTOR} +ENV KAFKA_CONNECTOR_NAME=lightstreamer-kafka-connector-${VERSION} +ENV KAFKA_CONNECTOR_ZIP=${KAFKA_CONNECTOR_NAME}.zip +ENV DEPLOY_DIR=/lightstreamer/adapters/${KAFKA_CONNECTOR_NAME} # Copy the distribution package -COPY tmp/${KAFKA_CONNECTOR}.zip /tmp/${KAFKA_CONNECTOR}.zip +COPY tmp/${KAFKA_CONNECTOR_ZIP} /tmp/${KAFKA_CONNECTOR_ZIP} # Copy custom resources COPY resources /tmp/resources @@ -17,7 +18,7 @@ USER root RUN apt-get -y update; \ apt-get install -y unzip; \ # Unzip the distribution package into to Lightstreamer's adapters fodler. - unzip /tmp/${KAFKA_CONNECTOR}.zip -d /lightstreamer/adapters; \ + unzip /tmp/${KAFKA_CONNECTOR_ZIP} -d /lightstreamer/adapters; \ # Copy the custom resources cp /tmp/resources/* ${DEPLOY_DIR}; \ # Fix ownership diff --git a/examples/docker-image/build.sh b/examples/docker-image/build.sh index f78e19f7..0c0902cf 100755 --- a/examples/docker-image/build.sh +++ b/examples/docker-image/build.sh @@ -1,23 +1,16 @@ #!/bin/bash - -# Set the root project directory -projectDir=../../ - -# Alias to the local gradlew command -_gradle="${projectDir}/gradlew --project-dir ${projectDir}" - +source ../utils/helpers.sh +SCRIPT_DIR=$(dirname ${BASH_SOURCE[0]}) +TMP_DIR=${SCRIPT_DIR}/tmp # Generate the distribution -echo "Making the distribution" +echo "Making the distribution package" $_gradle distribuite -# Get the current version -version=$($_gradle properties -q --console=plain | grep '^version:' | awk '{ printf $2}') - -mkdir -p tmp -cp ${projectDir}/deploy/lightstreamer-kafka-connector-${version}.zip tmp/ +rm -fr ${TMP_DIR}; mkdir ${TMP_DIR} +cp ${projectDir}/deploy/lightstreamer-kafka-connector-${version}.zip ${TMP_DIR} echo "Build the Lightstramer Kafka Connector Docker image" -docker build -t lightstreamer-kafka-connector-${version} . --build-arg VERSION=${version} +docker build -t lightstreamer-kafka-connector-${version} $SCRIPT_DIR --build-arg VERSION=${version} if [ $? == 0 ]; then echo "Launch the image with:" diff --git a/examples/quickstart/Dockerfile.producer b/examples/quickstart/Dockerfile.producer index 382d1e58..41bdcaf6 100644 --- a/examples/quickstart/Dockerfile.producer +++ b/examples/quickstart/Dockerfile.producer @@ -1,6 +1,9 @@ FROM eclipse-temurin:17 -RUN mkdir /opt/producer -COPY tmp/lightstreamer-kafka-connector-samples-all-*.jar /opt/producer/lightstreamer-kafka-connector-samples-all.jar +# The Kafka Connector project version +ARG VERSION -ENTRYPOINT ["java", "-jar", "/opt/producer/lightstreamer-kafka-connector-samples-all.jar"] +# Copy the Produer jar +COPY tmp/lightstreamer-kafka-connector-samples-all-${VERSION}.jar /usr/app/producer.jar + +ENTRYPOINT ["java", "-jar", "/usr/app/producer.jar"] diff --git a/examples/quickstart/docker-compose.yaml b/examples/quickstart/docker-compose.yaml index 2001430b..14dd9966 100644 --- a/examples/quickstart/docker-compose.yaml +++ b/examples/quickstart/docker-compose.yaml @@ -3,7 +3,7 @@ version: '2' services: kafka-connector: container_name: kafka-connector - image: lightstreamer-kafka-connector-0.1.0 + image: lightstreamer-kafka-connector-${version} depends_on: - producer ports: @@ -17,6 +17,8 @@ services: - broker build: dockerfile: Dockerfile.producer + args: + VERSION: ${version} command: ["--bootstrap-servers", "broker:29092", "--topic", "stocks"] broker: @@ -33,8 +35,6 @@ services: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.server.keystore.jks - KAFKA_SSL_KEYSTORE_PASSWORD: password KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' @@ -44,4 +44,3 @@ services: KAFKA_REST_HOST_NAME: rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" - # CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' \ No newline at end of file diff --git a/examples/quickstart/launch_quickstart.sh b/examples/quickstart/launch_quickstart.sh deleted file mode 100755 index e6078b7b..00000000 --- a/examples/quickstart/launch_quickstart.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash -cwd=$(pwd) - -# Build the Lightstreamer Kafka Connector Docker image -cd ../docker-image -./build.sh - -if [ $? == 0 ]; then - cd ../.. - # Deploy the Kafka Producer to the tmp folder - ./gradlew QuickStart - cd $cwd - docker compose up --build -d -fi \ No newline at end of file diff --git a/examples/quickstart/shutdown_quickstart.sh b/examples/quickstart/shutdown_quickstart.sh deleted file mode 100755 index 1f58adfe..00000000 --- a/examples/quickstart/shutdown_quickstart.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -docker compose down -rm -fr tmp \ No newline at end of file diff --git a/examples/quickstart/start.sh b/examples/quickstart/start.sh new file mode 100755 index 00000000..4d1c2520 --- /dev/null +++ b/examples/quickstart/start.sh @@ -0,0 +1,15 @@ +#!/bin/bash +source ../utils/helpers.sh +rm -fr tmp; mkdir -p tmp + +# Build the Lightstreamer Kafka Connector Docker image +../docker-image/build.sh + +if [ $? == 0 ]; then + cp ../../deploy/lightstreamer-kafka-connector-samples-all-${version}.jar tmp/ + # Export the version env variable to be used by Compose + export version + docker compose up --build -d + sleep 20 + echo "Service started. Now you can point your browers to http://localhost:8080/QuickStart to see real-time data." + fi \ No newline at end of file diff --git a/examples/quickstart/stop.sh b/examples/quickstart/stop.sh new file mode 100755 index 00000000..51064efa --- /dev/null +++ b/examples/quickstart/stop.sh @@ -0,0 +1,8 @@ +#!/bin/bash +source ../utils/helpers.sh + +# Export the version env variable to be used by Compose +export version +docker compose down +rm -fr tmp +$_gradle clean \ No newline at end of file diff --git a/examples/utils/helpers.sh b/examples/utils/helpers.sh new file mode 100755 index 00000000..953106d1 --- /dev/null +++ b/examples/utils/helpers.sh @@ -0,0 +1,9 @@ +#!/bin/bash +HELPER_DIR=$(dirname ${BASH_SOURCE[0]}) ## realpath +projectDir="$HELPER_DIR/../.." + +# Alias to the the local gradlew command +_gradle="${projectDir}/gradlew --project-dir ${projectDir}" + +# Get the current version +version=$(grep '^version' ${projectDir}/gradle.properties | awk -F= '{ printf $2}') diff --git a/examples/utils/remote.sh b/examples/utils/remote.sh new file mode 100644 index 00000000..fbee0a8d --- /dev/null +++ b/examples/utils/remote.sh @@ -0,0 +1,26 @@ +# Add Docker's official GPG key: +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings +sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc + +# Add the repository to Apt sources: +echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +sudo apt-get update + +sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin +sudo docker run hello-world +sudo /usr/sbin/usermod -a -G docker ubuntu + +sudo apt install net-tools +sudo apt-get install zip unzip +curl -s "https://get.sdkman.io" | bash +source "$HOME/.sdkman/bin/sdkman-init.sh" +sdk version +sdk install java +mkdir confluent +git clone https://github.com/confluentinc/kafka-images.git confluent/kafka-images diff --git a/gradle.properties b/gradle.properties index ec5634cc..b035701e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version = 0.1.0 -deployDirName = deploy -quickstartDeployDirName = examples/quickstart/tmp +version=0.1.1 +deployDirName=deploy +quickstartDeployDirName=examples/quickstart/tmp diff --git a/kafka-connector/src/connector/dist/adapters.xml b/kafka-connector/src/connector/dist/adapters.xml index 2d6a363d..8a467530 100644 --- a/kafka-connector/src/connector/dist/adapters.xml +++ b/kafka-connector/src/connector/dist/adapters.xml @@ -134,6 +134,7 @@ INTEGER JSON + earliest #{KEY} diff --git a/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/consumers/ConsumerLoop.java b/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/consumers/ConsumerLoop.java index 3aa462ad..e43f654d 100644 --- a/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/consumers/ConsumerLoop.java +++ b/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/consumers/ConsumerLoop.java @@ -225,7 +225,7 @@ private boolean subscribed() { // Check the actual available topics on Kafka. try (Admin admin = AdminClient.create(config.consumerProperties())) { ListTopicsOptions options = new ListTopicsOptions(); - options.timeoutMs(5000); + options.timeoutMs(30000); ListTopicsResult listTopics = admin.listTopics(options); boolean notAllPresent = false; diff --git a/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java b/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java index d18adb38..a590601b 100644 --- a/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java +++ b/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java @@ -744,6 +744,20 @@ public void shouldGetEnabled() { assertThat(config.isEnabled()).isFalse(); } + @Test + public void shouldOverrideAutoOffset() { + ConnectorConfig config = + ConnectorConfig.newConfig(adapterDir.toFile(), standardParameters()); + + Map updatedConfig = new HashMap<>(standardParameters()); + updatedConfig.put(ConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, "earliest"); + config = ConnectorConfig.newConfig(adapterDir.toFile(), updatedConfig); + assertThat(config.getText(ConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG)) + .isEqualTo("earliest"); + assertThat(config.baseConsumerProps()) + .containsAtLeast(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } + @Test public void shouldGetEncryptionEnabled() { ConnectorConfig config =