diff --git a/CHANGELOG.md b/CHANGELOG.md index 25fd15c6..3416f2eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## [1.0.0] (2024-08-20) + +- First official public release + ## [0.1.0] (2024-03-17) -- First public pre-release. +- First public pre-release diff --git a/README.md b/README.md index 76fde18c..f20084a0 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,12 @@ _Extend Kafka topics to the web effortlessly. Stream real-time data to mobile an - [Quick Start Schema Registry Example](#quick-start-schema-registry-example) - [Customize the Kafka Connector Metadata Adapter Class](#customize-the-kafka-connector-metadata-adapter-class) - [Develop the Extension](#develop-the-extension) +- [Kafka Lightstreamer Sink Connector](#kafka-connect-lightstreamer-sink-connector) + - [Usage](#usage) + - [Lightstreamer Setup](#lightstreamer-setup) + - [Running](#running) + - [Running in Docker](#running-in-docker) + - [Configuration Reference](#configuration-reference) - [Docs](#docs) - [Examples](#examples) @@ -62,7 +68,7 @@ To efficiently showcase the functionalities of Lightstreamer Kafka Connector, we ![Quickstart Diagram](pictures/quickstart-diagram.png) -The diagram above illustrates how in this setup, a stream of simulated market events is channeled from Kafka to the web client via Lightstreamer Kafka Connector. +The diagram above illustrates how, in this setup, a stream of simulated market events is channeled from Kafka to the web client via Lightstreamer Kafka Connector. To provide a complete stack, the app is based on _Docker Compose_. The [Docker Compose file](examples/quickstart/docker-compose.yml) comprises the following services: @@ -105,21 +111,26 @@ This section will guide you through the installation of Kafka Connector to get i ### Requirements -- JDK version 17 or later. -- [Lightstreamer Server](https://lightstreamer.com/download/) version 7.4.2 or later (check the `LS_HOME/GETTING_STARTED.TXT` file for the instructions). -- A running Kafka broker or Kafka Cluster. +- JDK version 17 or later +- [Lightstreamer Server](https://lightstreamer.com/download/) version 7.4.2 or later (check the `LS_HOME/GETTING_STARTED.TXT` file for the instructions) +- A running Kafka broker or Kafka Cluster ### Deploy -Get the deployment package from the [latest release page](https://github.com/Lightstreamer/Lightstreamer-kafka-connector/releases/latest). Alternatively, check out this repository and run the following command from the [`kafka-connector-project`](kafka-connector-project/) folder: +Download the deployment archive `lightstreamer-kafka-connector-.zip` from the [Releases](https://github.com/Lightstreamer/Lightstreamer-kafka-connector/releases/) page. Alternatively, check out this repository and execute the following command from the [`kafka-connector-project`](kafka-connector-project/) folder: ```sh -$ ./gradlew distribute +$ ./gradlew adapterDistZip ``` -which generates the `lightstreamer-kafka-connector-.zip` bundle under the `kafka-connector-project/deploy` folder. +which generates the archive file under the `kafka-connector-project/kafka-connector/build/distributions` folder. + +Then, unpack it into the `adapters` folder of the Lightstreamer Server installation: + +```sh +$ unzip lightstreamer-kafka-connector-.zip -d LS_HOME/adapters +``` -Then, unzip it into the `adapters` folder of the Lightstreamer Server installation. 
Finally, check that the Lightstreamer layout looks like the following: ```sh @@ -146,27 +157,28 @@ Before starting Kafka Connector, you need to properly configure the `LS_HOME/ada To quickly complete the installation and verify the successful integration with Kafka, edit the _data_provider_ block `QuickStart` in the file as follows: -- update the [`bootstrap.servers`](#bootstrapservers) parameter with the connection string of Kafka: +- Update the [`bootstrap.servers`](#bootstrapservers) parameter with the connection string of Kafka: ```xml kafka.connection.string ``` -- optionally customize the `LS_HOME/adapters/lightstreamer-kafka-connector-/log4j.properties` file (the current settings produce the additional `quickstart.log` file) -- configure topic and record mapping: +- Optionally customize the `LS_HOME/adapters/lightstreamer-kafka-connector-/log4j.properties` file (the current settings produce the additional `quickstart.log` file). + +- Configure topic and record mapping. - since a generic Ligthstreamer client needs to subscribe to one or more items to receive real-time updates, Kafka Connector has to offer proper mechanisms to realize the mapping between Kafka topics and Lightstreamer items. + Since a generic Ligthstreamer client needs to subscribe to one or more items to receive real-time updates, Kafka Connector has to offer proper mechanisms to realize the mapping between Kafka topics and Lightstreamer items. The `QuickStart` [factory configuration](kafka-connector-project/dist/adapters.xml#L39) comes with a simple mapping through the following settings: - - an item template: + - An item template: ```xml stock-#{index=KEY} ``` which defines the general format name of the items a client must subscribe to to receive updates from Kafka Connector. The [_extraction expression_](#filtered-record-routing-item-templatetemplate-name) syntax used here - denoted within `#{...}` - permits the clients to specify filtering values to be compared against the actual contents of a Kafka record, evaluated through [_Extraction Keys_](#record-mapping-fieldfieldname) used to extract each part of a record. In this case, the `KEY` predefined constant extracts the key part of Kafka records. - - a topic mapping: + - A topic mapping: ```xml item-template.stock ``` @@ -174,8 +186,8 @@ To quickly complete the installation and verify the successful integration with This configuration instructs Kafka Connector to analyze every single event published to the topic `stocks` and check if it matches against any item subscribed by the client as: - - `stock-[index=1]`: an item with the parameter `index` bound to a record key equal to `1` - - `stock-[index=2]`: an item with the parameter `index` bound to a record key equal to `2` + - `stock-[index=1]`: an item with the `index` parameter bound to a record key equal to `1` + - `stock-[index=2]`: an item with the `index` parameter bound to a record key equal to `2` - ... Kafka Connector will then route the event to all matched items. @@ -248,22 +260,22 @@ where you have to replace `username` and `password` with the credentials generat $ ./start_background.sh ``` -2. Attach a Lightstreamer Consumer. +2. Attach a Lightstreamer consumer. The [`kafka-connector-utils`](kafka-connector-project/kafka-connector-utils) submodule hosts a simple Lightstreamer Java client that can be used to test the consumption of Kafka events from any Kafka topics. 
Before launching the consumer, you first need to build it from the [`kafka-connector-project`](kafka-connector-project/) folder with the command: ```sh - $ ./gradlew distributeConsumer + $ ./gradlew kafka-connector-utils:build ``` - which generates the `lightstreamer-kafka-connector-utils-consumer-all-.jar` under the `deploy-consumer` folder. + which generates the `lightstreamer-kafka-connector-utils-consumer-all-.jar` file under the `kafka-connector-project/kafka-connector-utils/build/libs` folder. Then, launch it with: ```sh - $ java -jar deploy-consumer/lightstreamer-kafka-connector-utils-consumer-all-.jar --address http://localhost:8080 --adapter-set KafkaConnector --data-adapter QuickStart --items stock-[index=1] --fields ask,bid,min,max + $ java -jar kafka-connector-utils/build/libs/lightstreamer-kafka-connector-utils-consumer-all-.jar --address http://localhost:8080 --adapter-set KafkaConnector --data-adapter QuickStart --items stock-[index=1] --fields ask,bid,min,max ``` As you can see, you have to specify a few parameters: @@ -277,7 +289,7 @@ where you have to replace `username` and `password` with the credentials generat > [!NOTE] > While we've provided examples in JavaScript (suitable for web browsers) and Java (geared towards desktop applications), you are encouraged to utilize any of the [Lightstreamer client SDKs](https://lightstreamer.com/download/#client-sdks) for developing clients in other environments, including iOS, Android, Python, and more. -3. Publish Events. +3. Publish the events. The [`examples/quickstart-producer`](examples/quickstart-producer/) folder hosts a simple Kafka producer to publish simulated market events for the _QuickStart_ app. @@ -285,15 +297,15 @@ where you have to replace `username` and `password` with the credentials generat ```sh $ cd examples/quickstart-producer - $ ./gradlew distribute + $ ./gradlew build ``` - which generates the `quickstart-producer-all` under the `deploy` folder. + which generates the `quickstart-producer-all.jar` file under the `build/libs` folder. Then, launch it with: ```sh - $ java -jar deploy/quickstart-producer-all.jar --bootstrap-servers --topic stocks + $ java -jar build/libs/quickstart-producer-all.jar --bootstrap-servers --topic stocks ``` ![producer_video](pictures/producer.gif) @@ -312,7 +324,7 @@ where you have to replace `username` and `password` with the credentials generat where you have to replace `` and `` with the _API Key_ and _secret_ generated on the _Confluent CLI_ or from the _Confluent Cloud Console_. ```sh - $ java -jar deploy/quickstart-producer-all.jar --bootstrap-servers --topic stocks --config-file + $ java -jar build/libs/quickstart-producer-all.jar --bootstrap-servers --topic stocks --config-file ``` #### Publishing with Redpanda Cloud @@ -330,10 +342,10 @@ where you have to replace `username` and `password` with the credentials generat where you have to replace `username` and `password` with the credentials generated from the _Redpanda Console_, and specify the configured SASL mechanism (`SCRAM-SHA-256` or `SCRAM-SHA-512`). ```sh - $ java -jar deploy/quickstart-producer-all.jar --bootstrap-servers --topic stocks --config-file + $ java -jar build/libs/quickstart-producer-all.jar --bootstrap-servers --topic stocks --config-file ``` -4. Check Consumed Events. +4. Check the consumed events. After starting the publisher, you should immediately see the real-time updates flowing from the consumer shell: @@ -363,11 +375,11 @@ The following sections will guide you through the configuration details. 
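Before diving into the individual settings, the following trimmed sketch shows where they live inside `adapters.xml`. It is only a rough orientation, not the full factory file: the class names and parameter names are those documented in this guide, while the broker address, topic, item, and field values are placeholders.

```xml
<?xml version="1.0"?>
<adapters_conf id="KafkaConnector">

    <metadata_provider>
        <!-- Factory Metadata Adapter class -->
        <adapter_class>com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter</adapter_class>
    </metadata_provider>

    <!-- One data_provider block per Kafka connection -->
    <data_provider name="QuickStart">
        <!-- Factory Data Adapter class -->
        <adapter_class>com.lightstreamer.kafka.adapters.KafkaConnectorDataAdapter</adapter_class>

        <!-- Kafka cluster address (placeholder) -->
        <param name="bootstrap.servers">broker:9092</param>

        <!-- Topic and record mapping (placeholder item template, topic, and field) -->
        <param name="item-template.stock">stock-#{index=KEY}</param>
        <param name="map.stocks.to">item-template.stock</param>
        <param name="field.ask">#{VALUE.ask}</param>
    </data_provider>

</adapters_conf>
```

Refer to the `QuickStart` factory configuration for the complete, authoritative file.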
_Mandatory_. The `adapter_class` tag, specified inside the _metadata_provider_ block, defines the Java class name of the Metadata Adapter. -The factory value is set to `com.lightstreamer.kafka_connector.adapters.pub.KafkaConnectorMetadataAdapter`, which implements the internal business of Kafka Connector. +The factory value is set to `com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter`, which implements the internal business of Kafka Connector. It is possible to provide a custom implementation by extending this class: just package your new class in a jar file and deploy it along with all required dependencies into the `LS_HOME/adapters/lightstreamer-kafka-connector-/lib` folder. -See the section [Customize the Kafka Connector Metadata Class](#customize-the-kafkaconnector-metadata-adapter-class) for more details. +See the [Customize the Kafka Connector Metadata Class](#customize-the-kafkaconnector-metadata-adapter-class) section for more details. Example: @@ -441,7 +453,7 @@ Default value: `DEFAULT`, but only one `DEFAULT` configuration is permitted. _Mandatory_. The `adapter_class` tag defines the Java class name of the Data Adapter. DO NOT EDIT IT!. -Factory value: `com.lightstreamer.kafka_connector.adapters.KafkaConnectorAdapter`. +Factory value: `com.lightstreamer.kafka.adapters.KafkaConnectorDataAdapter`. ##### `enable` @@ -785,14 +797,13 @@ and other scalar types (see [the complete list](#recordkeyevaluatortype-and-reco In particular, Kafka Connector supports message validation for _Avro_ and _JSON_, which can be specified through: -- local schema files -- the _Confluent Schema Registry_ +- Local schema files +- The _Confluent Schema Registry_ -Kafka Connector allows independent deserialization of keys and values, which means that: +Kafka Connector enables the independent deserialization of keys and values, allowing them to have different formats. Additionally: -- key and value can have different formats -- message validation against the Confluent Schema Registry can be enabled separately for the Kafka key and Kafka value (through [`record.key.evaluator.schema.registry.enable` and `record.value.evaluator.schema.registry.enable`](#recordkeyevaluatorschemaregistryenable-and-recordvalueevaluatorschemaregistryenable)) -- message validation against local schema files must be specified separately for key and value (through [`record.key.evaluator.schema.path` and `record.value.evaluator.schema.path`](#recordkeyevaluatorschemapath-and-recordvalueevaluatorschemapath)) +- Message validation against the Confluent Schema Registry can be enabled separately for the key and value (through [`record.key.evaluator.schema.registry.enable` and `record.value.evaluator.schema.registry.enable`](#recordkeyevaluatorschemaregistryenable-and-recordvalueevaluatorschemaregistryenable)) +- Message validation against local schema files must be specified separately for the key and the value (through [`record.key.evaluator.schema.path` and `record.value.evaluator.schema.path`](#recordkeyevaluatorschemapath-and-recordvalueevaluatorschemapath)) > [!IMPORTANT] > For Avro, schema validation is mandatory, therefore either a local schema file must be provided or the Confluent Schema Registry must be enabled. 
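As an illustrative sketch, the following parameters configure a connection whose record keys are plain strings and whose record values are Avro objects validated through the Confluent Schema Registry (the `<param>` style mirrors the rest of `adapters.xml`; the evaluator types are those listed in the reference below):

```xml
<!-- Key: plain string, no schema validation required -->
<param name="record.key.evaluator.type">STRING</param>

<!-- Value: Avro, validated through the Confluent Schema Registry -->
<param name="record.value.evaluator.type">AVRO</param>
<param name="record.value.evaluator.schema.registry.enable">true</param>
```

If the Schema Registry is not used, the `record.value.evaluator.schema.path` parameter would instead point to a local schema file deployed with the connector.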
@@ -843,7 +854,7 @@ Examples: ##### `record.key.evaluator.schema.path` and `record.value.evaluator.schema.path` -_Mandatory if [evaluator type](#recordkeyevaluatortype-and-recordvalueevaluatortype) is `AVRO`_ and the [Confluent Schema Registry](#recordkeyevaluatorschemaregistryenable-and-recordvalueevaluatorschemaregistryenable) is disabled. The path of the local schema file relative to the deployment folder (`LS_HOME/adapters/lightstreamer-kafka-connector-`) for message validation respectively of the Kafka key and the Kafka value. +_Mandatory if [evaluator type](#recordkeyevaluatortype-and-recordvalueevaluatortype) is `AVRO`_ and the [Confluent Schema Registry](#recordkeyevaluatorschemaregistryenable-and-recordvalueevaluatorschemaregistryenable) is disabled. The path of the local schema file relative to the deployment folder (`LS_HOME/adapters/lightstreamer-kafka-connector-`) for message validation respectively of the key and the value. Examples: @@ -854,7 +865,7 @@ Examples: ##### `record.key.evaluator.schema.registry.enable` and `record.value.evaluator.schema.registry.enable` -_Mandatory if [evaluator type](#recordkeyevaluatortype-and-recordvalueevaluatortype) is `AVRO` and no [local schema paths](#recordkeyevaluatorschemaregistryenable-and-recordvalueevaluatorschemaregistryenable) are specified_. Enable the use of the [Confluent Schema Registry](#schema-registry) for validation respectively of the Kafka key and the Kafka value. Can be one of the following: +_Mandatory if [evaluator type](#recordkeyevaluatortype-and-recordvalueevaluatortype) is `AVRO` and no [local schema paths](#recordkeyevaluatorschemaregistryenable-and-recordvalueevaluatorschemaregistryenable) are specified_. Enable the use of the [Confluent Schema Registry](#schema-registry) for validation respectively of the key and the value. Can be one of the following: - `true` - `false` @@ -887,12 +898,12 @@ Example: Kafka Connector allows the configuration of several routing and mapping strategies, thus enabling the convey of Kafka events streams to a potentially huge amount of devices connected to Lightstreamer with great flexibility. As anticipated in the [_Installation_](#configure) section, a Kafka record can be analyzed in all its aspects to extract information that can be: -- routed to the designated Lightstreamer items -- remapped to specific Lightstreamer fields +- Routed to the designated Lightstreamer items +- Remapped to specific Lightstreamer fields -##### Record Routing (`map..to`) +##### Record Routing (`map..to`) -To configure the routing of Kafka event streams to Lightstreamer items, use at least one parameter `map..to`. The general format is: +To configure the routing of Kafka event streams to Lightstreamer items, use at least one `map..to` parameter. The general format is: ```xml item1,item2,itemN,... @@ -936,7 +947,7 @@ This configuration enables the implementation of various mapping scenarios, as s With this scenario, it is possible to broadcast to all clients subscribed to a single item (`sample-item`) every message published to different topics (`sample-topic1`, `sample-topic2`, `sample-topic3`). -##### Record Mapping (`field.`) +##### Record Mapping (`field.`) To forward real-time updates to the Lightstreamer clients, a Kafka record must be mapped to Lightstreamer fields, which define the _schema_ of any Lightstreamer item. 
@@ -945,19 +956,19 @@ To forward real-time updates to the Lightstreamer clients, a Kafka record must b To configure the mapping, you define the set of all subscribable fields through parameters with the prefix `field.`: ```xml -extraction_expression1 -extraction_expression2 +extractionExpression1 +extractionExpression2 ... -extraction_expressionN +extractionExpressionN ... ``` -The configuration specifies that the field `fieldNameX` will contain the value extracted from the deserialized Kafka record through the `extraction_expressionX`. This approach makes it possible to transform a Kafka record of any complexity to the flat structure required by Lightstreamer. +The configuration specifies that the field `fieldNameX` will contain the value extracted from the deserialized Kafka record through the `extractionExpressionX`. This approach makes it possible to transform a Kafka record of any complexity to the flat structure required by Lightstreamer. To write an extraction expression, Kafka Connector provides the _Data Extraction Language_. This language has a pretty minimal syntax, with the following basic rules: -- expressions must be enclosed within `#{...}` -- expressions use _Extraction Keys_, a set of predefined constants that reference specific parts of the record structure: +- Expressions must be enclosed within `#{...}` +- Expressions use _Extraction Keys_, a set of predefined constants that reference specific parts of the record structure: - `#{KEY}`: the key - `#{VALUE}`: the value @@ -966,7 +977,7 @@ To write an extraction expression, Kafka Connector provides the _Data Extraction - `#{PARTITION}`: the partition - `#{OFFSET}`: the offset -- the _dot notation_ is used to access attributes or fields of record keys and record values serialized in JSON or Avro formats: +- Expressions use the _dot notation_ to access attributes or fields of record keys and record values serialized in JSON or Avro formats: ```js KEY.attribute1Name.attribute2Name... @@ -975,14 +986,14 @@ To write an extraction expression, Kafka Connector provides the _Data Extraction > [!IMPORTANT] > Currently, it is required that the top-level element of either a record key or record value is: - > - an [Object](https://www.json.org/json-en.html), in the case of JSON format - > - a [Record](https://avro.apache.org/docs/1.11.1/specification/#schema-record), in the case of Avro format + > - An [Object](https://www.json.org/json-en.html), in the case of JSON format + > - A [Record](https://avro.apache.org/docs/1.11.1/specification/#schema-record), in the case of Avro format > > Such a constraint may be removed in a future version of Kafka Connector. -- the _square notation_ is used to access: +- Expressions use the _square notation_ to access: - - indexed attributes: + - Indexed attributes: ```js KEY.attribute1Name[i].attribute2Name... @@ -990,7 +1001,7 @@ To write an extraction expression, Kafka Connector provides the _Data Extraction ``` where `i` is a 0-indexed value. - - key-based attributes: + - Key-based attributes: ```js KEY.attribute1Name['keyName'].attribute2Name... @@ -999,16 +1010,16 @@ To write an extraction expression, Kafka Connector provides the _Data Extraction where `keyName` is a string value. 
> [!TIP] - > For JSON format, accessing a child attribute by dot notation or square bracket notation is equivalent: + > For JSON format, accessing a child attribute using either dot notation or square bracket notation is equivalent: > > ```js > VALUE.myProperty.myChild.childProperty - > ``` - > ```js > VALUE.myProperty['myChild'].childProperty > ``` -- expressions must evaluate to a _scalar_ value, otherwise an error will be thrown during the extraction process. The error will be handled as per the [configured strategy](#recordextractionerrorstrategy). +- Expressions must evaluate to a _scalar_ value + + In case of non-scalar value, an error will be thrown during the extraction process and handled as per the [configured strategy](#recordextractionerrorstrategy). The `QuickStart` [factory configuration](kafka-connector-project/kafka-connector/src/connector/dist/adapters.xml#L353) shows a basic example, where a simple _direct_ mapping has been defined between every attribute of the JSON record value and a Lightstreamer field with the same name. Of course, thanks to the _Data Extraction Language_, more complex mapping can be employed. @@ -1034,18 +1045,18 @@ The `QuickStart` [factory configuration](kafka-connector-project/kafka-connector ##### Filtered Record Routing (`item-template.`) Besides mapping topics to statically predefined items, Kafka Connector allows you to configure the _item templates_, -which specify the rule needed to decide if a message can be forwarded to the items specified by the clients, thus enabling a _filtered routing_. +which specify the rules needed to decide if a message can be forwarded to the items specified by the clients, thus enabling a _filtered routing_. The item template leverages the _Data Extraction Language_ to extract data from Kafka records and match them against the _parameterized_ subscribed items. ![filtered-routing](pictures/filtered-routing.png) -To configure an item template, use the parameter `item-template.`: +To configure an item template, use the `item-template.` parameter: ```xml - ``` -and then configure the routing by referencing the template through the parameter `map..to`: +Then, map one (or more) topic to the template by referecing it in the `map..to` parameter: ```xml item-template. @@ -1063,10 +1074,10 @@ The item template is made of: - ``: a sequence of _extraction expressions_, which define filtering rules specified as: ```js - #{paramName1=,paramName2=,...} + #{paramName1=,paramName2=,...} ``` - where `paramNameX` is a _bind parameter_ to be specified by the clients and whose actual value will be extracted from the deserialized Kafka record by evaluating the `` expression (written using the _Data Extraction Language_). + where `paramNameX` is a _bind parameter_ to be specified by the clients and whose actual value will be extracted from the deserialized Kafka record by evaluating the `` expression (written using the _Data Extraction Language_). To activate the filtered routing, the Lightstreamer clients must subscribe to a parameterized item that specifies a filtering value for every bind parameter defined in the template: @@ -1094,15 +1105,14 @@ Consider the following configuration: item-template.by-name,item-template.by-age ``` -which specifies how to route records published from the topic `user` to item templates defined to extract some personal data. +which specifies how to route records published from the topic `user` to the item templates defined to extract some personal data. 
Let's suppose we have three different Lightstreamer clients: -1. _Client A_ subscribes to: - - the parameterized item _SA1_ `user-[firstName=James,lastName=Kirk]` for receiving real-time updates relative to the user `James Kirk` - - the parameterized item _SA2_ `user-[age=45]` for receiving real-time updates relative to any 45 year-old user -2. _Client B_ subscribes to: - - the parameterized item _SB1_ `user-[firstName=Montgomery,lastName=Scotty]` for receiving real-time updates relative to the user `Montgomery Scotty` +1. _Client A_ subscribes to the following parameterized items: + - _SA1_ `user-[firstName=James,lastName=Kirk]` for receiving real-time updates relative to the user `James Kirk` + - _SA2_ `user-[age=45]` for receiving real-time updates relative to any 45 year-old user +2. _Client B_ subscribes to the parameterized item _SB1_ `user-[firstName=Montgomery,lastName=Scotty]` for receiving real-time updates relative to the user `Montgomery Scotty`. 3. _Client C_ subscribes to the parameterized item _SC1_ `user-[age=37]` for receiving real-time updates relative to any 37 year-old user. Now, let's see how filtered routing works for the following incoming Kafka records published to the topic `user`: @@ -1224,13 +1234,13 @@ Check out the [adapters.xml](examples/quickstart-schema-registry/adapters.xml#L5 ## Customize the Kafka Connector Metadata Adapter Class -If you have any specific need to customize the _Kafka Connector Metadata Adapter_ class, you can provide your implementation by extending the factory class [`com.lightstreamer.kafka_connector.adapters.pub.KafkaConnectorMetadataAdapter`](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.html). The class provides the following hook methods, which you can override to add your custom logic: +If you have any specific need to customize the _Kafka Connector Metadata Adapter_ class, you can provide your implementation by extending the factory class [`com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter`](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka/adapters/pub/KafkaConnectorMetadataAdapter.html). 
The class provides the following hook methods, which you can override to add your custom logic: -- [_postInit_](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.html#postInit(java.util.Map,java.io.File)): invoked after the initialization phase of the Kafka Connector Metadata Adapter has been completed +- [_postInit_](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka/adapters/pub/KafkaConnectorMetadataAdapter.html#postInit(java.util.Map,java.io.File)): invoked after the initialization phase of the Kafka Connector Metadata Adapter has been completed -- [_onSubscription_](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.html#onSubscription(java.lang.String,java.lang.String,com.lightstreamer.interfaces.metadata.TableInfo%5B%5D)): invoked to notify that a user has submitted a subscription +- [_onSubscription_](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka/adapters/pub/KafkaConnectorMetadataAdapter.html#onSubscription(java.lang.String,java.lang.String,com.lightstreamer.interfaces.metadata.TableInfo%5B%5D)): invoked to notify that a user has submitted a subscription -- [_onUnsubcription_](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka_connector/adapters/pub/KafkaConnectorMetadataAdapter.html#onUnsubscription(java.lang.String,com.lightstreamer.interfaces.metadata.TableInfo%5B%5D)): invoked to notify that a Subscription has been removed +- [_onUnsubcription_](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka/adapters/pub/KafkaConnectorMetadataAdapter.html#onUnsubscription(java.lang.String,com.lightstreamer.interfaces.metadata.TableInfo%5B%5D)): invoked to notify that a Subscription has been removed ### Develop the Extension @@ -1240,24 +1250,25 @@ For a Maven project, add the dependency to your _pom.xml_ file: ```xml - com.lightstreamer.kafka-connector + com.lightstreamer.kafka kafka-connector - 0.1.0 + VERSION ``` + and follow these [instructions](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-apache-maven-registry#authenticating-to-github-packages) to configure the repository and authentication. For a Gradle project, edit your _build.gradle_ file as follows: -1. add the dependency: +1. Add the dependency: ```groovy dependencies { - implementation group: 'com.lightstreamer.kafka_connector', name: 'kafka-connector', 'version': '0.1.0' + implementation group: 'com.lightstreamer.kafka', name: 'kafka-connector', 'version': '' } ``` -2. add the repository and specify your personal access token: +2. Add the repository and specify your personal access token: ```grrovy repositories { @@ -1275,10 +1286,366 @@ For a Gradle project, edit your _build.gradle_ file as follows: In the [examples/custom-kafka-connector-adapter](examples/custom-kafka-connector-adapter/) folder, you can find a sample Gradle project you may use as a starting point to build and deploy your custom extension. +## Kafka Connect Lightstreamer Sink Connector + +Lightstreamer Kafka Connector is also available as _Sink Connector plugin_ to be installed into _Kafka Connect_. 
+ +In this scenario, an instance of the connector plugin acts as a [_Remote Adapter_](https://github.com/Lightstreamer/Lightstreamer-lib-adapter-java-remote) for the Lightstreamer server, as depicted in the following picture: + +![KafkaConnectArchitecture](pictures/kafka-connect.png) + +### Usage + +#### Lightstreamer Setup + +Before running the connector, you first need to deploy a Proxy Adapter into the Lightstreamer server instance: + +1. Create a directory within `LS_HOME/adapters` (choose whatever name you prefer, for example `kafka-connect-proxy`). + +2. Copy the sample [`adapters.xml`](./kafka-connector-project/config/kafka-connect-proxy/adapters.xml) file to the `kafka-connect-proxy` directory. + +3. Edit the file as follows: + + - Update the `id` attribute of the `adapters_conf` root tag. This setting has the same role as the already documented [Kafka Connector Identifier](#adapter_confid---kafka-connector-identifier). + + - Update the `name` attribute of the `data_provider` tag. This setting has the same role as the already documented [Kafka Connection Name](#data_providername---kafka-connection-name). + + - Update the `request_reply_port` parameter with the listening TCP port: + + ```xml + 6661 + ``` + + - If authentication is required: + + - Set the `auth` parameter to `Y`: + + ```xml + Y + ``` + + - Add the following parameters with the selected credential settings: + + ```xml + USERNAME + PASSWORD + ``` + +> [!NOTE] +> As the `id` attribute must be unique across all the Adapter Sets deployed in the same Lightstreamer instance, make sure there is no conflict with any previously installed adapters (for example, the factory [adapters.xml](./kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml) file included in the _Kafka Connector_ distribution package). + +Finally, check that the Lightstreamer layout looks like the following: + +```sh +LS_HOME/ +... +├── adapters +│   ├── kafka-connect-proxy +│   │   ├── adapters.xml +│   └── welcome_res +... +├── audit +├── bin +... +``` +#### Running + +To manually install the Kafka Connect Lightstreamer Sink Connector into a local Confluent Platform and run it in [_standalone mode_](https://docs.confluent.io/platform/current/connect/userguide.html#standalone-mode): + +1. Download the connector zip file `lightstreamer-kafka-connect-lightstreamer-.zip` from the [Releases](https://github.com/Lightstreamer/Lightstreamer-kafka-connector/releases) page. Alternatively, check out this repository and execute the following command from the [`kafka-connector-project`](kafka-connector-project/) folder: + + ```sh + $ ./gradlew connectDistZip + ``` + + which generates the zip file under the `kafka-connector-project/kafka-connector/build/distributions` folder. + +2. Extract the zip file into the desired location. + + For example, you can copy the connector contents into a new directory named `CONFLUENT_HOME/share/kafka/plugins`. + +3. Edit the worker configuration properties file, ensuring you include the path above in the `plugin.path` property, for example: + + ``` + plugin.path=/usr/local/share/kafka/plugins + ``` + + You may want to use the provided [connect-standalone-local.properties](./kafka-connector-project/config/kafka-connect-config/connect-standalone-local.properties) file as a starting point. + +4. Edit the connector configuration properties file as detailed in the [Configuration Reference](#configuration-reference) section; a minimal sketch is shown below.
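   For orientation only, a minimal configuration might look like the following sketch, where the Proxy Adapter address, topic, item, and field names are placeholder values and every property is described in the reference:

   ```
   name=lightstreamer-sink
   connector.class=com.lightstreamer.kafka.connect.LightstreamerSinkConnector
   tasks.max=1
   topics=stocks
   lightstreamer.server.proxy_adapter.address=localhost:6661
   topic.mappings=stocks:stock-item
   record.mappings=stock_name:#{VALUE.name},last_price:#{VALUE.last_price}
   ```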
+ + You may want to use the provided [`quickstart-lightstreamer-local.properties`](./kafka-connector-project/config/kafka-connect-config/quickstart-lightstreamer-local.properties) file as a starting point. This file provides the set of pre-configured settings to feed Lightstreamer with stock market events, as already shown in the [installation instructions](#installation) for the Lightstreamer Kafka Connector. + +5. Launch the Lightstreamer Server instance already configured in the [Lightstreamer Setup](#lightstreamer-setup) section. + +6. Start the Connect worker with: + + ```sh + $ bin/connect-standalone.sh connect-standalone-local.properties quickstart-lightstreamer-local.properties + ``` + +To verify that an event stream actually flows from Kafka to a Lightstreamer consumer, leveraging the same example already shown in the [Start](#start) section: + +1. Attach a Lightstreamer consumer as specified in step 2 of the [Start](#start) section. + +2. Make sure that a Schema Registry service is reachable from your local machine. + +3. Edit a `producer.properties` file as follows: + + ``` + # JSON serializer with support for the Schema Registry + value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer + # Schema Registry URL + schema.registry.url=http://: + ``` + + This configuration enables the producer to leverage the Schema Registry, which is required by Kafka Connect when a connector wants to deserialize JSON messages (unless an embedded schema is provided). + +4. Publish events as specified in step 3 of the [Start](#start) section. + + This time, run the publisher passing the `producer.properties` file as a further argument: + + ```sh + $ java -jar examples/quickstart-producer/build/libs/quickstart-producer-all.jar --bootstrap-servers --topic stocks --config-file producer.properties + ``` + +5. Check the consumed events. + + You should see real-time updates as shown in step 4 of the [Start](#start) section. + +#### Running in Docker + +If you want to build a local Docker image based on Kafka Connect with the connector plugin, check out the [examples/docker-kafka-connect](./examples/docker-kafka-connect/) folder. + +In addition, the [examples/quickstart-kafka-connect](./examples/quickstart-kafka-connect/) folder shows how to use that image in Docker Compose through a Kafka Connect version of the _Quick Start_ app. + +### Configuration Reference + +The Kafka Connect Lightstreamer Sink Connector configuration properties are described below. + +#### `connector.class` + +To use the connector, specify the following setting: +`connector.class=com.lightstreamer.kafka.connect.LightstreamerSinkConnector` + +#### lightstreamer.server.proxy_adapter.address + +The Lightstreamer server's Proxy Adapter address to connect to in the format **`host:port`**. + +- **Type:** string +- **Default:** none +- **Importance:** high + +Example: + +``` +lightstreamer.server.proxy_adapter.address=lightstreamer.com:6661 +``` + +#### lightstreamer.server.proxy_adapter.socket.connection.setup.timeout.ms + +The (optional) amount of time in milliseconds the connector will wait for the socket connection to be established to the Lightstreamer server's Proxy Adapter before terminating the task. Specify `0` for infinite timeout. + +- **Type:** int +- **Default:** 5000 (5 seconds) +- **Valid Values:** [0,...]
+- **Importance:** low + +Example: + +``` +lightstreamer.server.proxy_adapter.socket.connection.setup.timeout.ms=15000 +``` + +#### lightstreamer.server.proxy_adapter.socket.connection.setup.max.retries + +The (optional) maximum number of retries to establish a connection to the Lightstreamer server's Proxy Adapter. + +- **Type:** int +- **Default:** 1 +- **Valid Values:** [0,...] +- **Importance:** medium + +Example: + +``` +lightstreamer.server.proxy_adapter.socket.connection.setup.max.retries=5 +``` + +#### lightstreamer.server.proxy_adapter.socket.connection.setup.retry.delay.ms + +The (optional) amount of time in milliseconds to wait before retrying to establish a new connection to the Lightstreamer server's Proxy Adapter in case of failure. Only applicable if +`lightstreamer.server.proxy_adapter.socket.connection.setup.max.retries` > 0. + +- **Type:** long +- **Default:** 5000 (5 seconds) +- **Valid Values:** [0,...] +- **Importance:** low + +Example: + +``` +lightstreamer.server.proxy_adapter.socket.connection.setup.retry.delay.ms=15000 +``` + +#### lightstreamer.server.proxy_adapter.username + +The username to use for authenticating to the Lightstreamer server's Proxy Adapter. This setting requires authentication to be enabled in the [configuration](#lightstreamer-setup) of the Proxy Adapter. + +- **Type:** string +- **Default:** none +- **Importance:** medium + +Example: + +``` +lightstreamer.server.proxy_adapter.username=lightstreamer_user +``` + +#### lightstreamer.server.proxy_adapter.password + +The password to use for authenticating to the Lightstreamer server's Proxy Adapter. This setting requires authentication to be enabled in the [configuration](#lightstreamer-setup) of the Proxy Adapter. + +- **Type:** string +- **Default:** none +- **Importance:** medium + +Example: + +``` +lightstreamer.server.proxy_adapter.password=lightstreamer_password +``` + +#### record.extraction.error.strategy + +The (optional) error handling strategy to be used if an error occurs while extracting data from incoming deserialized records. Can be one of the following: + +- `TERMINATE_TASK`: terminate the task immediately +- `IGNORE_AND_CONTINUE`: ignore the error and continue to process the next record +- `FORWARD_TO_DLQ`: forward the record to the dead letter queue + +In particular, the `FORWARD_TO_DLQ` value requires a [_dead letter queue_](https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/) to be configured; otherwise it will fall back to `TERMINATE_TASK`. + +- **Type:** string +- **Default:** `TERMINATE_TASK` +- **Valid Values:** [`IGNORE_AND_CONTINUE`, `FORWARD_TO_DLQ`, `TERMINATE_TASK`] +- **Importance:** medium + +Example: + +``` +record.extraction.error.strategy=FORWARD_TO_DLQ +``` + +#### topic.mappings + +> [!IMPORTANT] +> This configuration implements the same concepts already presented in the [Record Routing](#record-routing-maptopicto) section. + +Semicolon-separated list of mappings between source topics and Lightstreamer items. The list should describe a set of mappings in the form: + +`[topicName1]:[mappingList1];[topicName2]:[mappingList2];...;[topicNameN]:[mappingListN]` + +where every specified topic (`[topicNameX]`) is mapped to the item names or item templates specified as a comma-separated list (`[mappingListX]`). + +- **Type:** string +- **Default:** none +- **Valid Values:**
+ [topicName1]:[mappingList1];
+ [topicName2]:[mappingList2];... +- **Importance:** high + +Example: + +``` +topic.mappings=sample-topic:item-template.template1,item1,item2;order-topic:order-item +``` + +The configuration above specifies: + +- A _One To Many_ mapping between the topic `sample-topic` and the Lightstreamer items `item1` and `item2` +- [_Filtered routing_](#filtered-record-routing-item-templatetemplate-name) through the reference to the item template `template1` (not shown in the snippet) +- A _One To One_ mapping between the topic `order-topic` and the Lightstreamer item `order-item` + +#### record.mappings + +> [!IMPORTANT] +> This configuration implements the same concepts already presented in the [Record Mapping](#record-mapping-fieldfieldname) section. + +The list of mappings between Kafka records and Lightstreamer fields. The list should describe a set of subscribable fields in the following form: + + `[fieldName1]:[extractionExpression1],[fieldName2]:[extractionExpression2],...,[fieldNameN]:[extractionExpressionN]` + +where the Lightstreamer field `[fieldNameX]` will hold the data extracted from a deserialized Kafka record using the +_Data Extraction Language_ `[extractionExpressionX]`. + +- **Type:** list +- **Default:** none +- **Valid Values:**
+ [fieldName1]:[extractionExpression1],
+ [fieldName2]:[extractionExpression2],... +- **Importance:** high + +Example: + +``` +record.mappings=index:#{KEY}, \ + stock_name:#{VALUE.name}, \ + last_price:#{VALUE.last_price} +``` + +The configuration above specifies the following mappings: + +1. The record key to the Lightstreamer field `index` +2. The `name` attribute of the record value to the Lightstreamer field `stock_name` +3. The `last_price` attribute of the record value to the Lightstreamer field `last_price` + +#### item.templates + +> [!IMPORTANT] +> This configuration implements the same concepts already presented in the [Filtered Routing](#filtered-record-routing-item-templatetemplate-name) section. + +Semicolon-separated list of _item templates_, which specify the rules to enable the _filtered routing_. The list should describe a set of templates in the following form: + +`[templateName1]:[template1];[templateName2]:[template2];...;[templateNameN]:[templateN]` + +where `[templateX]` configures the item template `[templateNameX]`, defining the general format of the items the Lightstreamer clients must subscribe to in order to receive updates. + +A template is specified in the form: + +``` +item-prefix-#{paramName1=extractionExpression1,paramName2=extractionExpression2,...} +``` + +To map a topic to an item template, reference it using the `item-template` prefix in the `topic.mappings` configuration: + +``` +topic.mappings=some-topic:item-template.templateName1,item-template.templateName2,... +``` + +- **Type:** string +- **Default:** null +- **Valid Values:**
+ [templateName1]:[template1];
+ [templateName2]:[template2];... +- **Importance:** high + +Example: + +``` +item.templates=by-name:user-#{firstName=VALUE.name,lastName=VALUE.surname}; \ + by-age:user-#{years=VALUE.age} + +topic.mappings=user:item-template.by-name,item-template.by-age +``` + +The configuration above specifies how to route records published from the topic `user` to the item templates `by-name` and `by-age`, which define the rules to extract some personal data by leveraging _Data Extraction Language_ expressions. + ## Docs -The [docs](docs/) folder contains the complete [Kafka Connector API Specification](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc), already mentioned in the previous section. +The [docs](docs/) folder contains the complete [Kafka Connector API Specification](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc), already mentioned in the [Develop the Extension](#develop-the-extension) section. ## Examples -The [examples](examples/) folder contains all the examples mentioned throughout this guide. Furthermore, you may take a look at the [_Airport Demo_](examples/airport-demo/), which provides more insights into various usage and configuration options of Kafka Connector. +The [examples](examples/) folder contains all the examples mentioned throughout this guide. Furthermore, you may explore the [_Airport Demo_](examples/airport-demo/) for deeper insights into various usage and configuration options of Kafka Connector. diff --git a/docs/javadoc/allclasses-index.html b/docs/javadoc/allclasses-index.html index cd5a1a0a..c04eb8b2 100644 --- a/docs/javadoc/allclasses-index.html +++ b/docs/javadoc/allclasses-index.html @@ -2,7 +2,7 @@ -All Classes and Interfaces (kafka-connector 0.1.0 API) +All Classes and Interfaces (kafka-connector 0.2.0 API) @@ -28,7 +28,7 @@