Commit: Checkpoint

gfinocchiaro committed Mar 11, 2024
1 parent 832551a · commit b967fc4
Showing 7 changed files with 223 additions and 111 deletions.
README.md (8 changes: 4 additions & 4 deletions)
@@ -53,7 +53,7 @@
- [`record.extraction.error.strategy`](#recordextractionerrorstrategy)
- [Topic Mapping](#topic-mapping)
- [Record Routing (`map.<topic>.to`)](#record-routing-maptopicto)
- - [Record Mapping (`field.<filedName>`)](#record-mapping-fieldfiledname)
+ - [Record Mapping (`field.<fieldName>`)](#record-mapping-fieldfieldname)
- [Filtered Record Routing (`item-template.<template-name>`)](#filtered-record-routing-item-templatetemplate-name)
- [Example](#example)
- [Schema Registry](#schema-registry)
@@ -872,10 +872,10 @@ As anticipated in the [_Installation_](#configure) section, a Kafka record can b
To configure the routing of Kafka event streams to Lightstreamer items, use at least one parameter `map.<topic>.to`. The general format is:

```xml
<param name="map.<topic-name>.to"><item1>,<item2>,<itemN>,...</param>
<param name="map.<topic-name>.to">item1,item2,itemN,...</param>
```

- which defines the mapping between the source Kafka topic (`<topic-name>`) and the target items (`<item1>`, `<item2>`, `<itemN>`, etc.).
+ which defines the mapping between the source Kafka topic (`<topic-name>`) and the target items (`item1`, `item2`, `itemN`, etc.).

This configuration enables the implementation of various mapping scenarios, as shown by the following examples:
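For instance, a one-to-many mapping might look like the following sketch, where every record published to one topic is routed to several items (the topic and item names here are illustrative):

```xml
<param name="map.sample-topic.to">sample-item1,sample-item2,sample-item3</param>
```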

@@ -907,7 +907,7 @@ This configuration enables the implementation of various mapping scenarios, as s

With this scenario, it is possible to broadcast to all clients subscribed to a single item (`sample-item`) every message published to different topics (`sample-topic1`, `sample-topic2`, `sample-topic3`).
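A sketch of that many-to-one configuration, reusing the topic and item names from the text, might be:

```xml
<param name="map.sample-topic1.to">sample-item</param>
<param name="map.sample-topic2.to">sample-item</param>
<param name="map.sample-topic3.to">sample-item</param>
```

Since all three topics map to the same item, clients subscribed to `sample-item` receive the messages from every one of them.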

- ##### Record Mapping (`field.<filedName>`)
+ ##### Record Mapping (`field.<fieldName>`)

To forward real-time updates to the Lightstreamer clients, a Kafka record must be mapped to Lightstreamer fields, which define the _schema_ of any Lightstreamer item.
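As a minimal sketch of such a mapping, each `field.<fieldName>` parameter pairs a Lightstreamer field with an extraction expression; the lines below reuse parameters that appear in the QuickStart `adapters.xml` files changed by this commit:

```xml
<param name="field.ts">#{TIMESTAMP}</param>
<param name="field.topic">#{TOPIC}</param>
<param name="field.ref_price">#{VALUE.ref_price}</param>
```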

examples/quickstart-confluent-cloud/adapters.xml (18 changes: 10 additions & 8 deletions)
@@ -7,32 +7,34 @@
</metadata_provider>

<data_provider name="QuickStart">
+ <!-- ##### GENERAL PARAMETERS ##### -->
+
<adapter_class>com.lightstreamer.kafka_connector.adapters.KafkaConnectorDataAdapter</adapter_class>

<param name="bootstrap.servers">$env.bootstrap_server</param>
<param name="group.id">quick-start-group</param>

- <!-- ENCRYPTION SETTINGS -->
+ <!-- ##### ENCRYPTION SETTINGS ##### -->
<param name="encryption.enable">true</param>
<param name="encryption.protocol">TLSv1.2</param>
<param name="encryption.hostname.verification.enable">true</param>

- <!-- AUTHENTICATION SETTINGS -->
+ <!-- ##### AUTHENTICATION SETTINGS ##### -->
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">PLAIN</param>
<param name="authentication.username">$env.api_key</param>
<param name="authentication.password">$env.secret</param>

- <!-- Topics mapping -->
- <param name="item-template.stock">stock-#{index=KEY}</param>
- <param name="map.topic_0.to">item-template.stock</param>
-
- <!-- Record consumption settings -->
+ <!-- ##### RECORD EVALUATION SETTINGS ##### -->
<param name="record.consume.from">EARLIEST</param>
<param name="record.key.evaluator.type">INTEGER</param>
<param name="record.value.evaluator.type">JSON</param>

- <!-- Fields mapping settings -->
+ <!-- ##### RECORD ROUTING SETTINGS ##### -->
+ <param name="item-template.stock">stock-#{index=KEY}</param>
+ <param name="map.topic_0.to">item-template.stock</param>
+
+ <!-- ##### RECORD MAPPING SETTINGS ##### -->
<param name="field.ts">#{TIMESTAMP}</param>
<param name="field.topic">#{TOPIC}</param>
<param name="field.offset">#{OFFSET}</param>
examples/quickstart-schema-registry/adapters.xml (33 changes: 18 additions & 15 deletions)
@@ -7,12 +7,14 @@
</metadata_provider>

<data_provider name="QuickStart">
+ <!-- ##### GENERAL PARAMETERS ##### -->
+
<adapter_class>com.lightstreamer.kafka_connector.adapters.KafkaConnectorDataAdapter</adapter_class>

<param name="bootstrap.servers">broker:29094</param>
<param name="group.id">quick-start-group</param>

- <!-- ENCRYPTION SETTINGS -->
+ <!-- ##### ENCRYPTION SETTINGS ##### -->
<param name="encryption.enable">true</param>
<param name="encryption.protocol">TLSv1.3</param>
<param name="encryption.hostname.verification.enable">false</param>
@@ -22,27 +24,18 @@
<param name="encryption.keystore.path">secrets/kafka-connector.keystore.jks</param>
<param name="encryption.keystore.password">kafka-connector-password</param>
<param name="encryption.keystore.key.password">kafka-connector-private-key-password</param>

- <!-- Topics mapping -->
- <param name="item-template.stock">stock-#{index=KEY}</param>
- <param name="map.stocks.to">item-template.stock</param>
-
- <!-- Record consumption settings -->
+ <!-- ##### RECORD EVALUATION SETTINGS ##### -->
<param name="record.consume.from">EARLIEST</param>
<param name="record.key.evaluator.type">INTEGER</param>
<param name="record.value.evaluator.type">JSON</param>
<param name="record.value.evaluator.schema.registry.enable">true</param>

- <!-- Schema Registry settings -->
- <param name="schema.registry.url">https://schema-registry:8084</param>
- <param name="schema.registry.encryption.truststore.path">secrets/kafka-connector.truststore.jks</param>
- <param name="schema.registry.encryption.truststore.password">kafka-connector-truststore-password</param>
- <param name="schema.registry.encryption.keystore.enable">true</param>
- <param name="schema.registry.encryption.keystore.path">secrets/kafka-connector.keystore.jks</param>
- <param name="schema.registry.encryption.keystore.password">kafka-connector-password</param>
- <param name="schema.registry.encryption.keystore.key.password">kafka-connector-private-key-password</param>
+ <!-- ##### RECORD ROUTING SETTINGS ##### -->
+ <param name="item-template.stock">stock-#{index=KEY}</param>
+ <param name="map.stocks.to">item-template.stock</param>

- <!-- Fields mapping settings -->
+ <!-- ##### RECORD MAPPING SETTINGS ##### -->
<param name="field.ts">#{TIMESTAMP}</param>
<param name="field.topic">#{TOPIC}</param>
<param name="field.offset">#{OFFSET}</param>
@@ -61,6 +54,16 @@
<param name="field.ref_price">#{VALUE.ref_price}</param>
<param name="field.open_price">#{VALUE.open_price}</param>
<param name="field.item_status">#{VALUE.item_status}</param>

+ <!-- ##### SCHEMA REGISTRY SETTINGS ##### -->
+ <param name="schema.registry.url">https://schema-registry:8084</param>
+ <param name="schema.registry.encryption.truststore.path">secrets/kafka-connector.truststore.jks</param>
+ <param name="schema.registry.encryption.truststore.password">kafka-connector-truststore-password</param>
+ <param name="schema.registry.encryption.keystore.enable">true</param>
+ <param name="schema.registry.encryption.keystore.path">secrets/kafka-connector.keystore.jks</param>
+ <param name="schema.registry.encryption.keystore.password">kafka-connector-password</param>
+ <param name="schema.registry.encryption.keystore.key.password">kafka-connector-private-key-password</param>
+
</data_provider>

</adapters_conf>
examples/quickstart-ssl/adapters.xml (16 changes: 9 additions & 7 deletions)
@@ -7,12 +7,14 @@
</metadata_provider>

<data_provider name="QuickStart">
+ <!-- ##### GENERAL PARAMETERS ##### -->
+
<adapter_class>com.lightstreamer.kafka_connector.adapters.KafkaConnectorDataAdapter</adapter_class>

<param name="bootstrap.servers">broker:29094</param>
<param name="group.id">quick-start-group</param>

- <!-- ENCRYPTION SETTINGS -->
+ <!-- ##### ENCRYPTION SETTINGS ##### -->
<param name="encryption.enable">true</param>
<param name="encryption.protocol">TLSv1.3</param>
<param name="encryption.hostname.verification.enable">false</param>
@@ -23,16 +25,16 @@
<param name="encryption.keystore.password">kafka-connector-password</param>
<param name="encryption.keystore.key.password">kafka-connector-private-key-password</param>

- <!-- Topics mapping -->
- <param name="item-template.stock">stock-#{index=KEY}</param>
- <param name="map.stocks.to">item-template.stock</param>
-
- <!-- Record consumption settings -->
+ <!-- ##### RECORD EVALUATION SETTINGS ##### -->
<param name="record.consume.from">EARLIEST</param>
<param name="record.key.evaluator.type">INTEGER</param>
<param name="record.value.evaluator.type">JSON</param>

- <!-- Fields mapping settings -->
+ <!-- ##### RECORD ROUTING SETTINGS ##### -->
+ <param name="item-template.stock">stock-#{index=KEY}</param>
+ <param name="map.stocks.to">item-template.stock</param>
+
+ <!-- ##### RECORD MAPPING SETTINGS ##### -->
<param name="field.ts">#{TIMESTAMP}</param>
<param name="field.topic">#{TOPIC}</param>
<param name="field.offset">#{OFFSET}</param>
kafka-connector-samples/build.gradle (92 changes: 46 additions & 46 deletions, whitespace-only)
@@ -1,75 +1,75 @@
plugins {
- id 'lightstreamer-kafka-connector'
- id 'java'
+ id 'lightstreamer-kafka-connector'
+ id 'java'
}

sourceSets {
- producer {
- }
- consumer {
- }
+ producer {
+ }
+ consumer {
+ }
}

configurations {
- producerImplementation.extendsFrom implementation
- consumerImplementation.extendsFrom implementation
+ producerImplementation.extendsFrom implementation
+ consumerImplementation.extendsFrom implementation
}

dependencies {
- implementation group: 'info.picocli', name:'picocli', version:'4.7.5'
+ implementation group: 'info.picocli', name:'picocli', version:'4.7.5'

- producerImplementation group: 'org.slf4j', name: 'slf4j-reload4j', 'version': '2.0.10'
- producerImplementation group: 'org.apache.kafka', name:'kafka-clients', version:'7.5.3-ccs'
- producerImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version:'7.5.3'
- producerImplementation group: 'io.confluent', name: 'kafka-json-serializer', version:'7.5.3'
- producerImplementation group: 'io.confluent', name: 'kafka-json-schema-serializer', version:'7.5.3'
+ producerImplementation group: 'org.slf4j', name: 'slf4j-reload4j', 'version': '2.0.10'
+ producerImplementation group: 'org.apache.kafka', name:'kafka-clients', version:'7.5.3-ccs'
+ producerImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version:'7.5.3'
+ producerImplementation group: 'io.confluent', name: 'kafka-json-serializer', version:'7.5.3'
+ producerImplementation group: 'io.confluent', name: 'kafka-json-schema-serializer', version:'7.5.3'

- consumerImplementation group: 'com.lightstreamer', name: 'ls-javase-client', version:'5.0.0'
+ consumerImplementation group: 'com.lightstreamer', name: 'ls-javase-client', version:'5.0.0'
}


task producerJar(type: Jar) {
- manifest {
- attributes 'Main-Class': "com.lightstreamer.kafka_connector.samples.producer.Producer"
- }
- archiveBaseName = rootProject.name + "-samples-producer"
- archiveAppendix = 'all'
- duplicatesStrategy = DuplicatesStrategy.EXCLUDE
- from sourceSets.producer.output
- from {
- sourceSets.producer.compileClasspath.collect {
- it.isDirectory() ? it : zipTree(it)
- }
- }
+ manifest {
+ attributes 'Main-Class': "com.lightstreamer.kafka_connector.samples.producer.Producer"
+ }
+ archiveBaseName = rootProject.name + "-samples-producer"
+ archiveAppendix = 'all'
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+ from sourceSets.producer.output
+ from {
+ sourceSets.producer.compileClasspath.collect {
+ it.isDirectory() ? it : zipTree(it)
+ }
+ }
}

task consumerJar(type: Jar) {
- manifest {
- attributes 'Main-Class': "com.lightstreamer.kafka_connector.samples.consumer.Consumer"
- }
- archiveBaseName = rootProject.name + "-samples-consumer"
- archiveAppendix = 'all'
- duplicatesStrategy = DuplicatesStrategy.EXCLUDE
- from sourceSets.consumer.output
- from {
- sourceSets.consumer.compileClasspath.collect {
- it.isDirectory() ? it : zipTree(it)
- }
- }
+ manifest {
+ attributes 'Main-Class': "com.lightstreamer.kafka_connector.samples.consumer.Consumer"
+ }
+ archiveBaseName = rootProject.name + "-samples-consumer"
+ archiveAppendix = 'all'
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+ from sourceSets.consumer.output
+ from {
+ sourceSets.consumer.compileClasspath.collect {
+ it.isDirectory() ? it : zipTree(it)
+ }
+ }
}

task distribuiteProducer(type: Copy) {
- from producerJar
- into "$rootDir/$deployDirName"
+ from producerJar
+ into "$rootDir/$deployDirName"
}

task distribuiteConsumer(type: Copy) {
- from consumerJar
- into "$rootDir/$deployDirName"
+ from consumerJar
+ into "$rootDir/$deployDirName"
}

task distribuite(type: Copy) {
- from producerJar
- from consumerJar
- into "$rootDir/$deployDirName"
+ from producerJar
+ from consumerJar
+ into "$rootDir/$deployDirName"
}