diff --git a/README.md b/README.md
index e2763a06..7dffaa3e 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,7 @@
   - [Features](#features)
   - [Quick Start](#quick-start)
   - [Requirements](#requirements)
+  - [Installation](#installation)
   - [Deploy](#deploy)
   - [Configure](#configure)
   - [Start](#start)
@@ -70,9 +71,13 @@ The Kafka Connector allows to move high volume data out of Kafka by leveraging t
 ### Requirements
 
+- _TODO: insert a Docker Compose example with a minimal web client._
+
+### Installation
+
 - JDK version 17 or later.
 - [Lightstreamer Server](https://lightstreamer.com/download/) version 7.4.1 or later (check the `LS_HOME/GETTING_STARTED.TXT` file for the instructions).
-- A running Kafka Cluster.
+- A running Kafka broker or Kafka cluster, either self-managed or on Confluent Cloud.
 - The [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html) tool for running the consumer/producer example clients.
 
 ### Deploy
@@ -81,7 +86,7 @@ Get the deployment package from the [latest release page](releases). Alternative
 
 `./gradlew distribuite`
 
-which generated the `build/distributions/lightstreamer-kafka-connector-.zip` bundle.
+which generates the `build/distributions/lightstreamer-kafka-connector-.zip` bundle.
 
 Then, unzip it into the `adapters` folder of the Lightstreamer Server installation.
 Check that the final Lightstreamer layout looks like the following:
@@ -313,9 +318,9 @@ Default value: _KafkaConnector Identifier_ + _Connection Name_ + _Randomly gener
 The Lightstreamer Kafka Connector offers wide support for deserializing Kafka records. Currently, it allows the following formats:
 
-- _String_.
-- _Avro_.
-- _JSON_.
+- _String_
+- _Avro_
+- _JSON_
 
 In particular, the Kafka Connector supports message validation for Avro and JSON, which can be specified through:
@@ -325,8 +330,8 @@ In particular, the Kafka Connector supports message validation for Avro and JSON
 Kafka Connector supports independent deserialization of keys and values, which means that:
 
 - Key and value can have different formats.
-- Message validation against the Confluent Schema Registry can be enabled separately for the Kafka key and Kafka value (`key.evaluator.schema.registry.enable` and `value.evaluator.schema.registry.enable`)
-- Message validation against local schemas file must be specified separately for key and value (through the `key.evaluator.schema.path` and `value.evaluator.schema.path`)
+- Message validation against the Confluent Schema Registry can be enabled separately for the Kafka key and Kafka value (through `key.evaluator.schema.registry.enable` and `value.evaluator.schema.registry.enable`)
+- Message validation against local schema files must be specified separately for key and value (through `key.evaluator.schema.path` and `value.evaluator.schema.path`)
 
 **NOTE** For Avro, schema validation is required, therefore either a local schema file must be provided or the Confluent Schema Registry must be enabled.
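As context for the JBang-based example clients mentioned in the Requirements section above, a minimal producer might look like the sketch below. This is illustrative only and not part of the patch; the class name, topic name, broker address, and kafka-clients version are placeholder assumptions.

```java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.apache.kafka:kafka-clients:3.6.1

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Minimal JBang-runnable producer: publishes one String key/value record.
public class SampleProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer, flushing pending records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("sample-topic", "key-1", "hello"));
        }
    }
}
```

Such a client would be run with `jbang SampleProducer.java` against the broker configured in `adapters.xml`.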
diff --git a/build.gradle b/build.gradle
index 32ce93ce..0dd2cb38 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,32 +1,32 @@
 plugins {
-	id 'java-library'
-	id 'eclipse'
-	id 'distribution'
-	id "com.github.davidmc24.gradle.plugin.avro" version "1.9.1"
-	id("com.diffplug.spotless") version "6.25.0"
+    id 'java-library'
+    id 'eclipse'
+    id 'distribution'
+    id "com.github.davidmc24.gradle.plugin.avro" version "1.9.1"
+    id("com.diffplug.spotless") version "6.25.0"
 }
 
 spotless {
-	format 'misc', {
-		target '*.gradle'
-		trimTrailingWhitespace()
-		indentWithSpaces()
-		endWithNewline()
-	}
-	groovyGradle {
-		target '*.gradle'
-		greclipse()
-	}
-	java {
-		target 'src/main/**/*.java','src/test/**/*.java','src/clients/*/**.java'
-		googleJavaFormat()
-			.aosp()
-			.reorderImports(true)
-			.reflowLongStrings(true)
-			.groupArtifact('com.google.googlejavaformat:google-java-format')
-		removeUnusedImports()
-		formatAnnotations()
-		licenseHeader '''
+    format 'misc', {
+        target '*.gradle'
+        trimTrailingWhitespace()
+        indentWithSpaces()
+        endWithNewline()
+    }
+    groovyGradle {
+        target '*.gradle'
+        greclipse()
+    }
+    java {
+        target 'src/main/**/*.java','src/test/**/*.java','src/clients/*/**.java'
+        googleJavaFormat()
+                .aosp()
+                .reorderImports(true)
+                .reflowLongStrings(false)
+                .groupArtifact('com.google.googlejavaformat:google-java-format')
+        removeUnusedImports()
+        formatAnnotations()
+        licenseHeader '''
 /*
  * Copyright (C) $YEAR Lightstreamer Srl
  *
@@ -44,93 +44,93 @@ spotless {
  */
 '''
-	}
+    }
 }
 
 version = '0.1.0'
 
 repositories {
-	mavenCentral()
-	maven {
-		url "https://packages.confluent.io/maven"
-	}
+    mavenCentral()
+    maven {
+        url "https://packages.confluent.io/maven"
+    }
 }
 
 dependencies {
-	testImplementation(platform('org.junit:junit-bom:5.10.0'))
-	testImplementation('org.junit.jupiter:junit-jupiter')
-	testImplementation "com.google.truth:truth:1.2.0"
-	testImplementation "com.google.truth.extensions:truth-java8-extension:1.2.0"
-
-	implementation group: 'com.lightstreamer', name: 'ls-adapter-inprocess', version: '8.0.0'
-	implementation group: 'org.slf4j', name: 'slf4j-reload4j', 'version': '2.0.10'
-
-	implementation group:'org.apache.kafka', name:'kafka-clients', version:'7.5.3-ccs'
-	implementation group:'io.confluent', name: 'kafka-avro-serializer', version:'7.5.3'
-	implementation group:'io.confluent', name: 'kafka-json-serializer', version:'7.5.3'
-	implementation group:'io.confluent', name: 'kafka-json-schema-serializer', version:'7.5.3'
+    testImplementation(platform('org.junit:junit-bom:5.10.0'))
+    testImplementation('org.junit.jupiter:junit-jupiter')
+    testImplementation "com.google.truth:truth:1.2.0"
+    testImplementation "com.google.truth.extensions:truth-java8-extension:1.2.0"
+
+    implementation group: 'com.lightstreamer', name: 'ls-adapter-inprocess', version: '8.0.0'
+    implementation group: 'org.slf4j', name: 'slf4j-reload4j', 'version': '2.0.10'
+
+    implementation group:'org.apache.kafka', name:'kafka-clients', version:'7.5.3-ccs'
+    implementation group:'io.confluent', name: 'kafka-avro-serializer', version:'7.5.3'
+    implementation group:'io.confluent', name: 'kafka-json-serializer', version:'7.5.3'
+    implementation group:'io.confluent', name: 'kafka-json-schema-serializer', version:'7.5.3'
 }
 
 java {
-	toolchain {
-		languageVersion = JavaLanguageVersion.of(17)
-	}
+    toolchain {
+        languageVersion = JavaLanguageVersion.of(17)
+    }
 }
 
 tasks.named('test') {
-	useJUnitPlatform()
+    useJUnitPlatform()
 }
 
 task deployAdapters(type: Copy) {
-	dependsOn 'cleanDeploy'
+    dependsOn 'cleanDeploy'
 
-	into "deploy"
"deploy" + into "deploy" - from (jar) { - into "connector/lib" - } + from (jar) { + into "connector/lib" + } - from (configurations.runtimeClasspath) { - into "connector/lib" - exclude "ls-adapter-inprocess*" - exclude "jsr305*" - } + from (configurations.runtimeClasspath) { + into "connector/lib" + exclude "ls-adapter-inprocess*" + exclude "jsr305*" + } - from ("examples/quickstart") { - into "connector" - } + from ("examples/quickstart") { + into "connector" + } } task deploy(type: Copy) { - dependsOn 'deployAdapters' - into "deploy/conf" - from ("examples/conf") + dependsOn 'deployAdapters' + into "deploy/conf" + from ("examples/conf") } task cleanDeploy(type: Delete) { - delete "deploy" + delete "deploy" } sourceSets.main.java.srcDirs += ['src/clients/java'] distributions { - connector { - distributionBaseName = rootProject.name - - contents { - from(jar) { - into "lib" - } - - from (configurations.runtimeClasspath) { - into "lib" - exclude "ls-adapter-inprocess*" - exclude "jsr305*" - } - } - } + connector { + distributionBaseName = rootProject.name + + contents { + from(jar) { + into "lib" + } + + from (configurations.runtimeClasspath) { + into "lib" + exclude "ls-adapter-inprocess*" + exclude "jsr305*" + } + } + } } -task distribute { - dependsOn connectorDistZip +task distribuite { + dependsOn connectorDistZip } diff --git a/src/connector/dist/adapters.xml b/src/connector/dist/adapters.xml index 2bd2f407..e93a6eb2 100644 --- a/src/connector/dist/adapters.xml +++ b/src/connector/dist/adapters.xml @@ -116,32 +116,36 @@ - + com.lightstreamer.kafka_connector.adapters.ConnectorDataAdapter - localhost:9092 + broker:29092 test - true + true - sample-#{key=KEY} + sample-#{key=KEY.name} sample-#{value=VALUE} item-template.sample1 + AVRO + + user-key.avsc AVRO - enabled + + user-value.avsc - https://localhost:8081 + - #{KEY} + #{KEY.name} - #{VALUE} + #{VALUE.name} #{TIMESTAMP} diff --git a/src/connector/dist/user-key.avsc b/src/connector/dist/user-key.avsc new file mode 100644 index 00000000..f4587a89 --- /dev/null +++ b/src/connector/dist/user-key.avsc @@ -0,0 +1,16 @@ +{ + "namespace": "example.avro", + "type": "record", + "name": "Key", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "id", + "type": "string" + } + ] +} + \ No newline at end of file diff --git a/src/connector/dist/user-value.avsc b/src/connector/dist/user-value.avsc new file mode 100644 index 00000000..175e77ba --- /dev/null +++ b/src/connector/dist/user-value.avsc @@ -0,0 +1,110 @@ +{ + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "favorite_number", + "type": [ + "int", + "null" + ] + }, + { + "name": "signature", + "type": { + "type": "fixed", + "name": "Signature", + "size": 4 + } + }, + { + "name": "type", + "type": { + "type": "enum", + "name": "Type", + "symbols": [ + "type1", + "type2", + "type3" + ] + } + }, + { + "name": "union", + "type": ["string", "long"] + }, + + { + "name": "info", + "type": { + "type": "record", + "name": "Info", + "fields": [ + { + "name": "age", + "type": "int" + }, + { + "name": "height", + "type": "int" + } + ] + } + }, + { + "name": "documents", + "type": { + "type": "map", + "values": { + "type": "record", + "name": "Document", + "fields": [ + { + "name": "doc_id", + "type": "string" + }, + { + "name": "doc_type", + "type": "string" + } + ] + } + } + }, + { + "name": "preferences", + "type": { + "type": "map", + "values": "string" + } + }, + { + "name": 
"addresses", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Address", + "fields": [ + { + "name": "zipcode", + "type": "string" + } + ] + } + } + }, + { + "name": "friends", + "type": { + "type": "array", + "items": "string" + } + } + ] +} \ No newline at end of file diff --git a/src/main/java/com/lightstreamer/kafka_connector/adapters/ConnectorMetadataAdapter.java b/src/main/java/com/lightstreamer/kafka_connector/adapters/ConnectorMetadataAdapter.java index 6679e02d..776a338c 100644 --- a/src/main/java/com/lightstreamer/kafka_connector/adapters/ConnectorMetadataAdapter.java +++ b/src/main/java/com/lightstreamer/kafka_connector/adapters/ConnectorMetadataAdapter.java @@ -25,9 +25,9 @@ public final class ConnectorMetadataAdapter extends KafkaConnectorMetadataAdapter { @Override - public void newTables(String user, String sessionID, TableInfo[] tables) + public void onUnsubscription(String user, String sessionID, TableInfo[] tables) throws CreditsException, NotificationException {} @Override - public void tablesClosed(String sessionID, TableInfo[] tables) throws NotificationException {} + public void onUnsubscription(String sessionID, TableInfo[] tables) throws NotificationException {} } diff --git a/src/main/java/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.java b/src/main/java/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.java index 3983d609..2189f915 100644 --- a/src/main/java/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.java +++ b/src/main/java/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.java @@ -62,6 +62,10 @@ public final void init(@SuppressWarnings("rawtypes") Map params, File configDir) globalConfig = GlobalConfig.newConfig(configDir, params); configureLogging(configDir); METADATA_ADAPTER = this; + doInit(params, configDir); + } + + protected void doInit(Map params, File configDir) throws MetadataProviderException { } // Used only for unit testing. 
diff --git a/src/test/java/com/lightstreamer/kafka_connector/adapters/ConnectorAdapterSetTest.java b/src/test/java/com/lightstreamer/kafka_connector/adapters/ConnectorAdapterSetTest.java
index 1b9c2a63..ada07b55 100644
--- a/src/test/java/com/lightstreamer/kafka_connector/adapters/ConnectorAdapterSetTest.java
+++ b/src/test/java/com/lightstreamer/kafka_connector/adapters/ConnectorAdapterSetTest.java
@@ -192,13 +192,13 @@ class CustomAdapter extends KafkaConnectorMetadataAdapter {
         NotifyedCloseTables closedTables;
 
         @Override
-        public void newTables(String user, String sessionID, TableInfo[] tables)
+        public void onSubscription(String user, String sessionID, TableInfo[] tables)
                 throws CreditsException, NotificationException {
            newTables = new NotifyedNewTables(user, sessionID, tables);
         }
 
         @Override
-        public void tablesClosed(String sessionID, TableInfo[] tables)
+        public void onUnsubscription(String sessionID, TableInfo[] tables)
                 throws NotificationException {
             closedTables = new NotifyedCloseTables(sessionID, tables);
         }
diff --git a/src/test/java/com/lightstreamer/kafka_connector/adapters/ConsumerLoopConfiguratorTest.java b/src/test/java/com/lightstreamer/kafka_connector/adapters/ConsumerLoopConfiguratorTest.java
index c191ce30..1003a514 100644
--- a/src/test/java/com/lightstreamer/kafka_connector/adapters/ConsumerLoopConfiguratorTest.java
+++ b/src/test/java/com/lightstreamer/kafka_connector/adapters/ConsumerLoopConfiguratorTest.java
@@ -29,6 +29,7 @@
 import com.lightstreamer.kafka_connector.adapters.mapping.selectors.Schema;
 import com.lightstreamer.kafka_connector.adapters.mapping.selectors.Selectors;
 import com.lightstreamer.kafka_connector.adapters.mapping.selectors.json.JsonNodeDeserializer;
+import com.lightstreamer.kafka_connector.adapters.test_utils.ConnectorConfigProvider;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -70,10 +71,9 @@ private Map<String, String> basicParameters() {
 
     @Test
     public void shouldNotConfigureDueToInvalidTemplateReference() {
-        Map<String, String> updatedConfigs = basicParameters();
-        updatedConfigs.put("map.topic1.to", "no-valid-item-template");
-
-        ConnectorConfig config = newConfig(updatedConfigs);
+        Map<String, String> updatedConfigs = Map.of("map.topic1.to", "no-valid-item-template");
+        ConnectorConfig config =
+                ConnectorConfigProvider.minimalWith(adapterDir.toString(), updatedConfigs);
 
         ConfigException e =
                 assertThrows(
                         ConfigException.class,
@@ -85,10 +85,10 @@ public void shouldNotConfigureDueToInvalidFieldMappingExpression() {
 
     @Test
     public void shouldNotConfigureDueToInvalidFieldMappingExpression() {
-        Map<String, String> updatedConfigs = new HashMap<>(basicParameters());
-        updatedConfigs.put("field.fieldName1", "VALUE");
+        Map<String, String> updatedConfigs = Map.of("field.fieldName1", "VALUE");
 
-        ConnectorConfig config = newConfig(updatedConfigs);
+        ConnectorConfig config =
+                ConnectorConfigProvider.minimalWith(adapterDir.toString(), updatedConfigs);
 
         ConfigException e =
                 assertThrows(
                         ConfigException.class, () -> ConsumerLoopConfigurator.configure(config));
@@ -100,7 +100,7 @@ public void shouldNotConfigureDueToInvalidFieldMappingExpression() {
     @ParameterizedTest
     @ValueSource(strings = {"VALUE", "#{UNRECOGNIZED}"})
     public void shouldNotConfigureDueToInvalidFieldMappingExpressionWithSchema(String expression) {
-        Map<String, String> updatedConfigs = new HashMap<>(basicParameters());
+        Map<String, String> updatedConfigs = ConnectorConfigProvider.minimalConfigParams();
         updatedConfigs.put(ConnectorConfig.KEY_EVALUATOR_TYPE, "AVRO");
         updatedConfigs.put(ConnectorConfig.KEY_EVALUATOR_SCHEMA_PATH, "value.avsc");
         updatedConfigs.put("field.fieldName1", expression);
@@ -136,7 +136,7 @@ public void shouldNotConfigureDueToInvalidFieldMappingExpressionWithSchema(Strin
                 "item-#{}}"
             })
     public void shouldNotConfigureDueToInvalidItemTemplateExpression(String expression) {
-        Map<String, String> updatedConfigs = new HashMap<>(basicParameters());
+        Map<String, String> updatedConfigs = ConnectorConfigProvider.minimalConfigParams();
         updatedConfigs.put("item-template.template1", expression);
 
         ConnectorConfig config = newConfig(updatedConfigs);
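Following the pattern these hunks establish, a non-parameterized variant of one of the `@ValueSource` cases would read as below inside `ConsumerLoopConfiguratorTest`. The method name is illustrative; `newConfig`, `ConnectorConfigProvider.minimalConfigParams()`, and the `"item-#{}}"` expression all come from the hunks above, and this assumes `minimalConfigParams()` returns a mutable map, as the existing `put` calls imply.

```java
// Non-parameterized equivalent of one @ValueSource case, spelled out for clarity.
@Test
public void shouldNotConfigureDueToMalformedItemTemplate() {
    // Start from the minimal parameter set supplied by the shared test utility.
    Map<String, String> updatedConfigs = ConnectorConfigProvider.minimalConfigParams();
    updatedConfigs.put("item-template.template1", "item-#{}}");

    ConnectorConfig config = newConfig(updatedConfigs);
    assertThrows(ConfigException.class, () -> ConsumerLoopConfigurator.configure(config));
}
```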