Checkpoint
gfinocchiaro committed Feb 22, 2024
1 parent 41e761b commit bb2337b
Showing 9 changed files with 251 additions and 112 deletions.
19 changes: 12 additions & 7 deletions README.md
@@ -5,6 +5,7 @@
- [Features](#features)
- [Quick Start](#quick-start)
- [Requirements](#requirements)
- [Installation](#installation)
- [Deploy](#deploy)
- [Configure](#configure)
- [Start](#start)
@@ -70,9 +71,13 @@ The Kafka Connector allows to move high volume data out of Kafka by leveraging t

### Requirements

- Insert Docker Compose example with a minimal web client
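
  As a stand-in for the example the note above calls for, here is a minimal Docker Compose sketch. It is an assumption, not part of this repository: the image names, tags, and ports are illustrative, and the broker/registry environment configuration (listeners, KRaft or ZooKeeper settings) is omitted for brevity. The service names `broker` and `schema-registry` match the hostnames referenced in the `adapters.xml` shown later in this commit.

  ```yaml
  # Hypothetical sketch only: images, tags, and ports are assumptions,
  # and required Kafka environment variables are omitted.
  services:
    broker:
      image: confluentinc/cp-kafka:7.5.3
      ports:
        - "9092:9092"
    schema-registry:
      image: confluentinc/cp-schema-registry:7.5.3
      ports:
        - "8081:8081"
      depends_on:
        - broker
    lightstreamer:
      image: lightstreamer
      ports:
        - "8080:8080"
      depends_on:
        - broker
  ```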

### Installation

- JDK version 17 or later.
- [Lightstreamer Server](https://lightstreamer.com/download/) version 7.4.1 or later (check the `LS_HOME/GETTING_STARTED.TXT` file for the instructions).
- A running Kafka Cluster.
- A running Kafka broker or Kafka cluster: either Confluent Cloud or a self-managed Kafka broker.
- The [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html) tool for running the consumer/producer example clients.

### Deploy
@@ -81,7 +86,7 @@ Get the deployment package from the [latest release page](releases). Alternative

`./gradlew distribuite`

which generated the `build/distributions/lightstreamer-kafka-connector-<version>.zip` bundle.
which generates the `build/distributions/lightstreamer-kafka-connector-<version>.zip` bundle.

Then, unzip it into the `adapters` folder of the Lightstreamer Server installation.
Check that the final Lightstreamer layout looks like the following:
@@ -313,9 +318,9 @@ Default value: _KafkaConnector Identifier_ + _Connection Name_ + _Randomly gener

The Lightstreamer Kafka Connector offers wide support for deserializing Kafka records. Currently, it allows the following formats:

- _String_.
- _Avro_.
- _JSON_.
- _String_
- _Avro_
- _JSON_

In particular, the Kafka Connector supports message validation for Avro and JSON, which can be specified through:

@@ -325,8 +330,8 @@ In particular, the Kafka Connector supports message validation for Avro and JSON
Kafka Connector supports independent deserialization of keys and values, which means that:

- Key and value can have different formats.
- Message validation against the Confluent Schema Registry can be enabled separately for the Kafka key and Kafka value (`key.evaluator.schema.registry.enable` and `value.evaluator.schema.registry.enable`)
- Message validation against local schemas file must be specified separately for key and value (through the `key.evaluator.schema.path` and `value.evaluator.schema.path`)
- Message validation against the Confluent Schema Registry can be enabled separately for the Kafka key and Kafka value (through `key.evaluator.schema.registry.enable` and `value.evaluator.schema.registry.enable`)
- Message validation against local schema files must be specified separately for key and value (through the `key.evaluator.schema.path` and `value.evaluator.schema.path`)

**NOTE** For Avro, schema validation is required, therefore either a local schema file must be provided or the Confluent Schema Registry must be enabled.
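
For example, the two Avro validation options map to connector parameters like the following (fragment adapted from the `adapters.xml` changed in this commit; file names and the registry URL are illustrative):

```xml
<!-- Option 1: validate the Avro value against a local schema file. -->
<param name="value.evaluator.type">AVRO</param>
<param name="value.evaluator.schema.path">user-value.avsc</param>

<!-- Option 2: validate against the Confluent Schema Registry instead. -->
<!--
<param name="value.evaluator.schema.registry.enable">true</param>
<param name="schema.registry.url">http://schema-registry:8081</param>
-->
```

The same choice applies independently to the key through the corresponding `key.evaluator.*` parameters.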

160 changes: 80 additions & 80 deletions build.gradle
@@ -1,32 +1,32 @@
plugins {
id 'java-library'
id 'eclipse'
id 'distribution'
id "com.github.davidmc24.gradle.plugin.avro" version "1.9.1"
id("com.diffplug.spotless") version "6.25.0"
id 'java-library'
id 'eclipse'
id 'distribution'
id "com.github.davidmc24.gradle.plugin.avro" version "1.9.1"
id("com.diffplug.spotless") version "6.25.0"
}

spotless {
format 'misc', {
target '*.gradle'
trimTrailingWhitespace()
indentWithSpaces()
endWithNewline()
}
groovyGradle {
target '*.gradle'
greclipse()
}
java {
target 'src/main/**/*.java','src/test/**/*.java','src/clients/*/**.java'
googleJavaFormat()
.aosp()
.reorderImports(true)
.reflowLongStrings(true)
.groupArtifact('com.google.googlejavaformat:google-java-format')
removeUnusedImports()
formatAnnotations()
licenseHeader '''
format 'misc', {
target '*.gradle'
trimTrailingWhitespace()
indentWithSpaces()
endWithNewline()
}
groovyGradle {
target '*.gradle'
greclipse()
}
java {
target 'src/main/**/*.java','src/test/**/*.java','src/clients/*/**.java'
googleJavaFormat()
.aosp()
.reorderImports(true)
.reflowLongStrings(false)
.groupArtifact('com.google.googlejavaformat:google-java-format')
removeUnusedImports()
formatAnnotations()
licenseHeader '''
/*
* Copyright (C) $YEAR Lightstreamer Srl
*
@@ -44,93 +44,93 @@ spotless {
*/
'''
}
}
}

version = '0.1.0'

repositories {
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}
}

dependencies {
testImplementation(platform('org.junit:junit-bom:5.10.0'))
testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation "com.google.truth:truth:1.2.0"
testImplementation "com.google.truth.extensions:truth-java8-extension:1.2.0"

implementation group: 'com.lightstreamer', name: 'ls-adapter-inprocess', version: '8.0.0'
implementation group: 'org.slf4j', name: 'slf4j-reload4j', 'version': '2.0.10'

implementation group:'org.apache.kafka', name:'kafka-clients', version:'7.5.3-ccs'
implementation group:'io.confluent', name: 'kafka-avro-serializer', version:'7.5.3'
implementation group:'io.confluent', name: 'kafka-json-serializer', version:'7.5.3'
implementation group:'io.confluent', name: 'kafka-json-schema-serializer', version:'7.5.3'
testImplementation(platform('org.junit:junit-bom:5.10.0'))
testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation "com.google.truth:truth:1.2.0"
testImplementation "com.google.truth.extensions:truth-java8-extension:1.2.0"

implementation group: 'com.lightstreamer', name: 'ls-adapter-inprocess', version: '8.0.0'
implementation group: 'org.slf4j', name: 'slf4j-reload4j', 'version': '2.0.10'

implementation group:'org.apache.kafka', name:'kafka-clients', version:'7.5.3-ccs'
implementation group:'io.confluent', name: 'kafka-avro-serializer', version:'7.5.3'
implementation group:'io.confluent', name: 'kafka-json-serializer', version:'7.5.3'
implementation group:'io.confluent', name: 'kafka-json-schema-serializer', version:'7.5.3'
}

java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}

tasks.named('test') {
useJUnitPlatform()
useJUnitPlatform()
}

task deployAdapters(type: Copy) {
dependsOn 'cleanDeploy'
dependsOn 'cleanDeploy'

into "deploy"
into "deploy"

from (jar) {
into "connector/lib"
}
from (jar) {
into "connector/lib"
}

from (configurations.runtimeClasspath) {
into "connector/lib"
exclude "ls-adapter-inprocess*"
exclude "jsr305*"
}
from (configurations.runtimeClasspath) {
into "connector/lib"
exclude "ls-adapter-inprocess*"
exclude "jsr305*"
}

from ("examples/quickstart") {
into "connector"
}
from ("examples/quickstart") {
into "connector"
}
}

task deploy(type: Copy) {
dependsOn 'deployAdapters'
into "deploy/conf"
from ("examples/conf")
dependsOn 'deployAdapters'
into "deploy/conf"
from ("examples/conf")
}

task cleanDeploy(type: Delete) {
delete "deploy"
delete "deploy"
}

sourceSets.main.java.srcDirs += ['src/clients/java']

distributions {
connector {
distributionBaseName = rootProject.name

contents {
from(jar) {
into "lib"
}

from (configurations.runtimeClasspath) {
into "lib"
exclude "ls-adapter-inprocess*"
exclude "jsr305*"
}
}
}
connector {
distributionBaseName = rootProject.name

contents {
from(jar) {
into "lib"
}

from (configurations.runtimeClasspath) {
into "lib"
exclude "ls-adapter-inprocess*"
exclude "jsr305*"
}
}
}
}

task distribute {
dependsOn connectorDistZip
task distribuite {
dependsOn connectorDistZip
}
20 changes: 12 additions & 8 deletions src/connector/dist/adapters.xml
@@ -116,32 +116,36 @@
<!---->
</data_provider>

<data_provider name="QuickStartSchemaRegisry">
<data_provider name="QuickStart">
<adapter_class>com.lightstreamer.kafka_connector.adapters.ConnectorDataAdapter</adapter_class>

<!-- The Kafka cluster address -->
<!--<param name="bootstrap.servers">pkc-z9doz.eu-west-1.aws.confluent.cloud:9092</param>-->
<param name="bootstrap.servers">localhost:9092</param>
<param name="bootstrap.servers">broker:29092</param>

<param name="group.id">test</param>

<param name="enabled">true</param>
<param name="enable">true</param>

<!-- TOPIC MAPPING SECTION -->

<!-- Define a "sample" item-template, which is simply made of the "sample" item name to be used by the Lighstreamer Client subscription. -->
<!-- <param name="item-template.sample1">sample-#{partition=PARTITION}</param>
<param name="item-template.sample2">sample</param> -->
<param name="item-template.sample1">sample-#{key=KEY}</param>
<param name="item-template.sample1">sample-#{key=KEY.name}</param>
<param name="item-template.sample2">sample-#{value=VALUE}</param>

<!-- Map the Kafka topic "sample-topic" to the previous defined "sample" item template. -->
<param name="map.avro-topic-1.to">item-template.sample1</param>

<param name="key.evaluator.type">AVRO</param>
<!-- <param name="key.evaluator.schema.registry.enable">true</param> -->
<param name="key.evaluator.schema.path">user-key.avsc</param>
<param name="value.evaluator.type">AVRO</param>
<param name="value.evaluator.schema.registry.enable">enabled</param>
<!-- <param name="value.evaluator.schema.registry.enable">true</param> -->
<param name="value.evaluator.schema.path">user-value.avsc</param>

<param name="schema.registry.url">https://localhost:8081</param>
<!-- <param name="schema.registry.url">http://schema-registry:8081</param> -->

<!--<param name="schema.registry.encryption.truststore.path">secrets/kafka.client.truststore.jks</param>
<param name="schema.registry.encryption.truststore.password">password</param>
@@ -171,10 +175,10 @@
<!-- FIELDS MAPPING SECTION -->

<!-- Extraction of the record key mapped to the field "key". -->
<param name="field.key">#{KEY}</param>
<param name="field.key">#{KEY.name}</param>

<!-- Extraction of the record value mapped to the field "value". -->
<param name="field.value">#{VALUE}</param>
<param name="field.value">#{VALUE.name}</param>

<!-- Extraction of the record timestamp to the field "ts". -->
<param name="field.ts">#{TIMESTAMP}</param>
16 changes: 16 additions & 0 deletions src/connector/dist/user-key.avsc
@@ -0,0 +1,16 @@
{
"namespace": "example.avro",
"type": "record",
"name": "Key",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "id",
"type": "string"
}
]
}
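
The schema above is plain JSON, so its shape can be sanity-checked with a few lines of stdlib Python. This is a sketch, not part of the project: a real deployment would rely on a proper Avro library for validation, and the sample key below is invented.

```python
import json

# The user-key.avsc schema from this commit, embedded verbatim.
SCHEMA = json.loads("""
{
  "namespace": "example.avro",
  "type": "record",
  "name": "Key",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "id", "type": "string"}
  ]
}
""")

def check_key(record: dict) -> bool:
    """Minimal structural check: every schema field is present as a string."""
    return all(
        isinstance(record.get(f["name"]), str) for f in SCHEMA["fields"]
    )

# Invented sample keys: the first matches both string fields, the second is missing "id".
print(check_key({"name": "alice", "id": "42"}))   # expected: True
print(check_key({"name": "alice"}))               # expected: False
```

This only checks field presence and Python-level types; full Avro validation (unions, logical types, defaults) is the job of an Avro library or the Schema Registry.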
