- Lightstreamer Kafka Connector
- Introduction
- Quick Start
- Installation
- Configuration
- Global Settings
- Connection Settings
- General Parameters
- Encryption Parameters
encryption.enable
encryption.protocol
encryption.enabled.protocols
encryption.cipher.suites
encryption.hostname.verification.enable
encryption.truststore.path
encryption.truststore.password
encryption.keystore.enable
encryption.keystore.path
encryption.keystore.password
encryption.keystore.key.password
- Quick Start SSL Example
- Broker Authentication Parameters
- Record Evaluation
- Topic Mapping
- Schema Registry
- Customize the Kafka Connector Metadata Adapter Class
- Docs
- Examples
Lightstreamer Kafka Connector is a ready-made pluggable Lightstreamer Adapter that enables event streaming from a Kafka broker to the internet.
With Kafka Connector, any internet client connected to Lightstreamer Server can consume events from Kafka topics like any other Kafka client. The Connector takes care of processing records received from Kafka to adapt and route them as real-time updates for the clients.
Kafka Connector allows you to move high-volume data out of Kafka by leveraging the battle-tested ability of the Lightstreamer real-time engine to deliver live data reliably and efficiently over internet protocols.
To rapidly showcase the functioning of Lightstreamer Kafka Connector, the examples/quickstart folder hosts everything required to set up a quickstart app that displays real-time market data received from Lightstreamer Server. The app is a modified version of the Stock List demo.
As shown in the diagram above, in this variant the stream of simulated market events is injected from Kafka into the web client through Lightstreamer Kafka Connector.
To provide a complete stack, the app is based on Docker Compose. The Docker Compose file comprises the following services:
- broker: a Kafka broker, based on the Confluent Local Docker Image
- kafka-connector: Lightstreamer Server with Kafka Connector, based on the Lightstreamer Kafka Connector Docker image example, which also includes a web client mounted on /lightstreamer/pages/QuickStart
- producer: a native Kafka producer, based on the provided Dockerfile.producer file and the kafka-connector-samples submodule of this repository
- Make sure you have Docker, Docker Compose, and Java 17 (or later) installed on your local machine.
- From the examples/quickstart folder, run the command:
  ./start.sh
  ...
  ⠏ Network quickstart_default  Created
  ✔ Container broker  Started
  ✔ Container producer  Started
  ✔ Container kafka-connector  Started
  ...
  Service started. Now you can point your browser to http://localhost:8080/QuickStart to see real-time data.
  ...
- Once all containers are ready, point your browser to http://localhost:8080/QuickStart.
- After a few moments, the user interface starts displaying the real-time stock data.
- To shut down Docker Compose and clean up all temporary resources:
  ./stop.sh
This section will guide you through the installation of Kafka Connector to get it up and running in a very short time.
- JDK version 17 or later.
- Lightstreamer Server version 7.4.2 or later (check the LS_HOME/GETTING_STARTED.TXT file for the instructions).
- A running Kafka broker or Kafka Cluster.
Get the deployment package from the latest release page. Alternatively, check out this repository and run the following command from the kafka-connector-project folder:
./gradlew distribuite
which generates the lightstreamer-kafka-connector-<version>.zip bundle under the kafka-connector-project/deploy folder.
Then, unzip it into the adapters folder of the Lightstreamer Server installation.
Finally, check that the Lightstreamer layout looks like the following:
LS_HOME/
...
├── adapters
│ ├── lightstreamer-kafka-connector-<version>
│ │ ├── README.md
│ │ ├── adapters.xml
│ │ ├── log4j.properties
│ │ ├── lib
│ └── welcome_res
...
├── audit
├── bin
...
Before starting Kafka Connector, you need to properly configure the LS_HOME/adapters/lightstreamer-kafka-connector-<version>/adapters.xml file. For convenience, the package comes with a predefined configuration (the same used in the Quick Start app), which can be customized in all its aspects as per your requirements. Of course, you may add as many different connection configurations as desired to fit your needs.
To quickly complete the installation and verify the successful integration with Kafka, edit the QuickStart data_provider block in the file as follows:
- update the bootstrap.servers parameter with the connection string of Kafka:
  <param name="bootstrap.servers">kafka.connection.string</param>
- optionally customize the LS_HOME/adapters/lightstreamer-kafka-connector-<version>/log4j.properties file (the current settings produce the additional quickstart.log file)
- configure topic and record mapping:
  Since a generic Lightstreamer client needs to subscribe to one or more items to receive real-time updates, Kafka Connector must provide a way to map Kafka topics to Lightstreamer items.
  The QuickStart factory configuration comes with a simple mapping through the following settings:
  - an item template:
    <param name="item-template.stock">stock-#{index=KEY}</param>
    which defines the general format of the item names a client must subscribe to in order to receive updates from Kafka Connector. The bindable extraction expression syntax used here - denoted within #{...} - permits the binding of parameters specified in the subscriptions to different sections of a Kafka record. In this case, the KEY predefined constant - one of the Extraction Keys used to extract each part of a record - is bound to the index parameter to extract the record key.
  - a topic mapping:
    <param name="map.stocks.to">item-template.stock</param>
    which maps the topic stocks to the provided item template.
This configuration instructs Kafka Connector to analyze every single event published to the topic stocks and check if it matches against any item subscribed to by the client, such as:
- stock-[index=1]: an item with the parameter index bound to a record key equal to 1
- stock-[index=2]: an item with the parameter index bound to a record key equal to 2
- ...
Kafka Connector will then route the event to all matched items.
In addition, the following section defines how to map the record to the tabular form of Lightstreamer fields, by using the aforementioned Extraction Keys. In this case, the VALUE predefined constant extracts the value part of Kafka records.
<param name="field.stock_name">#{VALUE.name}</param>
<param name="field.last_price">#{VALUE.last_price}</param>
<param name="field.ask">#{VALUE.ask}</param>
<param name="field.ask_quantity">#{VALUE.ask_quantity}</param>
<param name="field.bid">#{VALUE.bid}</param>
<param name="field.bid_quantity">#{VALUE.bid_quantity}</param>
<param name="field.pct_change">#{VALUE.pct_change}</param>
<param name="field.min">#{VALUE.min}</param>
<param name="field.max">#{VALUE.max}</param>
<param name="field.ref_price">#{VALUE.ref_price}</param>
<param name="field.open_price">#{VALUE.open_price}</param>
<param name="field.item_status">#{VALUE.item_status}</param>
This way, the routed event is transformed into a flat structure, which can be forwarded to the clients.
- You can get more details about all possible settings in the Configuration section.
If your target Kafka cluster is Confluent Cloud, you also need to properly configure TLS 1.2 encryption and SASL_PLAIN authentication, as follows:
<param name="encryption.enable">true</param>
<param name="encryption.protocol">TLSv1.2</param>
<param name="encryption.hostname.verification.enable">true</param>
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">PLAIN</param>
<param name="authentication.username">API.key</param>
<param name="authentication.password">secret</param>
...
where you have to replace API.key and secret with the API Key and secret generated on the Confluent CLI or from the Confluent Cloud Console.
- Launch Lightstreamer Server.
  From the LS_HOME/bin/unix-like directory, run the command:
  ./start_background.sh
- Attach a Lightstreamer Consumer.
  The kafka-connector-utils submodule hosts a simple Lightstreamer Java client that can be used to test the consumption of Kafka events from any Kafka topic.
  Before launching the consumer, you first need to build it from the kafka-connector-project folder with the command:
  ./gradlew distribuiteConsumer
  which generates the lightstreamer-kafka-connector-utils-consumer-all-<version>.jar file under the deploy folder.
  Then, launch it with:
  java -jar deploy/lightstreamer-kafka-connector-utils-consumer-all-<version>.jar --address http://localhost:8080 --adapter-set KafkaConnector --data-adapter QuickStart --items stock-[index=1] --fields ask,bid,min,max
As you can see, you have to specify a few parameters:
- --address: the Lightstreamer Server address
- --adapter-set: the name of the requested Adapter Set, which triggers Lightstreamer to activate the Kafka Connector deployed into the adapters folder
- --data-adapter: the name of the requested Data Adapter, which identifies the selected Kafka connection configuration
- --items: the list of items to subscribe to
- --fields: the list of requested fields for the items
Note
As Kafka Connector is built around the Lightstreamer Java In-Process Adapter SDK, any remote client based on any Lightstreamer Client SDK can interact with it.
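For reference, the following is a minimal sketch of the same subscription written with the Lightstreamer Java Client SDK (assumed to be available on the classpath); the server address, Adapter Set, Data Adapter, item, and field names mirror the command-line arguments above, while the class name and the update handling are placeholders:

```java
import com.lightstreamer.client.LightstreamerClient;
import com.lightstreamer.client.Subscription;

public class QuickStartSubscriber {

    public static void main(String[] args) throws InterruptedException {
        // Connect to the Adapter Set configured in adapters.xml (id="KafkaConnector")
        LightstreamerClient client =
                new LightstreamerClient("http://localhost:8080", "KafkaConnector");

        // Subscribe to the item bound to the record key "1", requesting a subset
        // of the fields mapped by the QuickStart connection
        Subscription sub = new Subscription(
                "MERGE",
                new String[] { "stock-[index=1]" },
                new String[] { "ask", "bid", "min", "max" });
        sub.setDataAdapter("QuickStart");

        // Attach a SubscriptionListener here to consume the incoming updates
        // (omitted for brevity).
        client.subscribe(sub);
        client.connect();

        // Keep the process alive to receive updates
        Thread.sleep(Long.MAX_VALUE);
    }
}
```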
- Publish Events.
  The examples/quickstart-producer folder hosts a simple Kafka producer to publish simulated market events for the QuickStart app.
  Before launching the producer, you first need to build it. Open a new shell from the folder and execute the command:
  cd examples/quickstart-producer
  ./gradlew distribuite
  which generates the quickstart-producer-all.jar file under the deploy folder.
  Then, launch it with:
  java -jar deploy/quickstart-producer-all.jar --bootstrap-servers <kafka.connection.string> --topic stocks
If your target Kafka cluster is Confluent Cloud, you also need to provide a properties file that includes encryption and authentication settings, as follows:
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='API.key' password='secret';
sasl.mechanism=PLAIN
...
where you have to replace API.key and secret with the API Key and secret generated on the Confluent CLI or from the Confluent Cloud Console.
Then, launch the producer with:
java -jar deploy/quickstart-producer-all.jar --bootstrap-servers <kafka.connection.string> --topic stocks --config-file <path/to/config/file>
- Check Consumed Events.
  After starting the producer, you should immediately see the real-time updates flowing from the consumer shell.
As already anticipated, Kafka Connector is a Lightstreamer Adapter Set, which means it is made up of a Metadata Adapter and one or more Data Adapters, whose settings are defined in the LS_HOME/adapters/lightstreamer-kafka-connector-<version>/adapters.xml file.
The following sections will guide you through the configuration details.
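For reference, the following is a minimal sketch of how these pieces fit together in adapters.xml; it only assembles parameters described in this guide, while the factory file shipped with the deployment package also contains the Quick Start topic and field mappings:

```xml
<?xml version="1.0"?>
<adapters_conf id="KafkaConnector">

    <!-- Global settings: the Metadata Adapter shared by all connections -->
    <metadata_provider>
        <adapter_class>com.lightstreamer.kafka_connector.adapters.KafkaConnectorMetadataAdapter</adapter_class>
        <param name="logging.configuration.path">log4j.properties</param>
    </metadata_provider>

    <!-- Connection settings: one data_provider block per Kafka connection -->
    <data_provider name="QuickStart">
        <adapter_class>com.lightstreamer.kafka_connector.adapters.KafkaConnectorAdapter</adapter_class>
        <param name="bootstrap.servers">broker:29092</param>
        <!-- Topic and record mapping parameters (map.<topic>.to, item-template.*, field.*) go here -->
    </data_provider>

</adapters_conf>
```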
Mandatory. The id attribute of the adapters_conf root tag defines the Kafka Connector identifier, which will be used by the Clients to request this Adapter Set while setting up the connection to a Lightstreamer Server through a LightstreamerClient object.
The factory value is set to KafkaConnector for convenience, but you are free to change it as per your requirements.
Example:
<adapters_conf id="KafkaConnector">
Mandatory. The adapter_class tag, specified inside the metadata_provider block, defines the Java class name of the Metadata Adapter.
The factory value is set to com.lightstreamer.kafka_connector.adapters.KafkaConnectorMetadataAdapter, which implements the internal business of Kafka Connector.
It is possible to provide a custom implementation by extending this class: just package your new class in a jar file and deploy it along with all required dependencies into the LS_HOME/adapters/lightstreamer-kafka-connector-<version>/lib folder.
See the Customize the Kafka Connector Metadata Adapter Class section for more details.
Example:
...
<metadata_provider>
...
<adapter_class>your.custom.class</adapter_class>
...
</metadata_provider>
...
Mandatory. The path of the reload4j configuration file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>).
The parameter is specified inside the metadata_provider block.
The factory value points to the predefined file LS_HOME/adapters/lightstreamer-kafka-connector-<version>/log4j.properties.
Example:
...
<metadata_provider>
...
<param name="logging.configuration.path">log4j.properties</param>
...
</metadata_provider>
...
Kafka Connector allows the configuration of separate independent connections to different Kafka brokers/clusters.
Every single connection is configured via the definition of its own Data Adapter through the data_provider block. At least one connection must be provided.
Since Kafka Connector manages the physical connection to Kafka by wrapping an internal Kafka Consumer, several configuration settings in the Data Adapter are identical to those required by the usual Kafka Consumer configuration.
Optional. The name attribute of the data_provider tag defines the Kafka Connection Name, which will be used by the Clients to request real-time data from this specific Kafka connection through a Subscription object.
Furthermore, the name is also used to group all logging messages belonging to the same connection.
Tip
For every Data Adapter connection, add a new logger and its relative file appender to log4j.properties, so that all the interactions pertinent to the connection with the Kafka cluster and the message retrieval operations, along with their routing to the subscribed items, are logged to dedicated files.
For example, the factory logging configuration provides the logger QuickStart to print every log message relative to the QuickStart connection:
...
# QuickStart logger
log4j.appender.QuickStartFile=org.apache.log4j.RollingFileAppender
log4j.appender.QuickStartFile.layout=org.apache.log4j.PatternLayout
log4j.appender.QuickStartFile.layout.ConversionPattern=[%d] [%-10c{1}] %-5p %m%n
log4j.appender.QuickStartFile.File=../../logs/quickstart.log
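As a sketch, a dedicated logger for a connection named BrokerConnection (the connection name used in the example below) might look like the following; the logger name is assumed to match the connection name, as in the factory QuickStart configuration, while the appender name and file path are illustrative:

```properties
# Hypothetical logger for the BrokerConnection connection
log4j.logger.BrokerConnection=INFO, BrokerConnectionFile
log4j.appender.BrokerConnectionFile=org.apache.log4j.RollingFileAppender
log4j.appender.BrokerConnectionFile.layout=org.apache.log4j.PatternLayout
log4j.appender.BrokerConnectionFile.layout.ConversionPattern=[%d] [%-10c{1}] %-5p %m%n
log4j.appender.BrokerConnectionFile.File=../../logs/broker-connection.log
```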
Example:
<data_provider name="BrokerConnection">
Default value: DEFAULT, but only one DEFAULT configuration is permitted.
Mandatory. The adapter_class tag defines the Java class name of the Data Adapter. DO NOT EDIT IT!
Factory value: com.lightstreamer.kafka_connector.adapters.KafkaConnectorAdapter.
Optional. Enable this connection configuration. Can be one of the following:
true
false
If disabled, Lightstreamer Server will automatically deny every subscription made to this connection.
Default value: true.
Example:
<param name="enable">false</param>
Mandatory. The Kafka Cluster bootstrap server endpoint expressed as the list of host/port pairs used to establish the initial connection.
The parameter sets the value of the bootstrap.servers
key to configure the internal Kafka Consumer.
Example:
<param name="bootstrap.servers">broker:29092,broker:29093</param>
Optional. The name of the consumer group this connection belongs to.
The parameter sets the value for the group.id
key to configure the internal Kafka Consumer.
Default value: Kafka Connector Identifier + Connection Name + Randomly generated suffix.
Example:
<param name="group.id">kafka-connector-group</param>
A TCP secure connection to Kafka is configured through parameters with the prefix encryption
.
Optional. Enable encryption of this connection. Can be one of the following:
true
false
Default value: false.
Example:
<param name="encryption.enable">true</param>
Optional. The SSL protocol to be used. Can be one of the following:
TLSv1.2
TLSv1.3
Default value: TLSv1.3 when running on Java 11 or newer, TLSv1.2 otherwise.
Example:
<param name="encryption.protocol">TLSv1.2</param>
Optional. The list of enabled secure communication protocols.
Default value: TLSv1.2,TLSv1.3 when running on Java 11 or newer, TLSv1.2 otherwise.
Example:
<param name="encryption.enabled.protocols">TLSv1.3</param>
Optional. The list of enabled secure cipher suites.
Default value: all the available cipher suites in the running JVM.
Example:
<param name="encryption.cipher.suites">TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA</param>
Optional. Enable hostname verification. Can be one of the following:
true
false
Default value: false.
Example:
<param name="encryption.hostname.verification.enable">true</param>
Optional. The path of the trust store file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>
).
Example:
<param name="encryption.truststore.path">secrets/kafka-connector.truststore.jks</param>
Optional. The password of the trust store.
If not set, checking the integrity of the trust store file configured will not be possible.
Example:
<param name="encryption.truststore.password">kafka-connector-truststore-password</param>
Optional. Enable a key store. Can be one of the following:
true
false
A key store is required if mutual TLS is enabled on Kafka.
If enabled, the following parameters configure the key store settings:
encryption.keystore.path
encryption.keystore.password
encryption.keystore.key.password
Default value: false.
Example:
<param name="encryption.keystore.enable">true</param>
Mandatory if key store is enabled. The path of the key store file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>
).
Example:
<param name="encryption.keystore.path">secrets/kafka-connector.keystore.jks</param>
Optional. The password of the key store.
If not set, checking the integrity of the key store file configured will not be possible.
Example:
<param name="encryption.keystore.password">keystore-password</param>
Optional. The password of the private key in the key store file.
Example:
<param name="encryption.keystore.key.password">kafka-connector-private-key-password</param>
Check out the adapters.xml file of the Quick Start SSL app, where you can find an example of encryption configuration.
Broker authentication is configured through parameters with the prefix authentication
.
Optional. Enable the authentication of this connection against the Kafka Cluster. Can be one of the following:
true
false
Default value: false.
Example:
<param name="authentication.enable">true</param>
Mandatory if authentication is enabled. The SASL mechanism type. Kafka Connector accepts the following authentication mechanisms:
PLAIN (the default value)
SCRAM-256
SCRAM-512
GSSAPI
In the case of PLAIN, SCRAM-256, and SCRAM-512 mechanisms, the credentials must be configured through the following mandatory parameters (which are not allowed for GSSAPI):
authentication.username: the username
authentication.password: the password
Example:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">PLAIN</param>
<param name="authentication.username">authorized-kafka-user</param>
<param name="authentication.password">authorized-kafka-user-password</param>
Example:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">SCRAM-256</param>
<param name="authentication.username">authorized-kafka-usee</param>
<param name="authentication.password">authorized-kafka-user-password</param>
Example:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">SCRAM-512</param>
<param name="authentication.username">authorized-kafka-username</param>
<param name="authentication.password">authorized-kafka-username-password</param>
In the case of the GSSAPI authentication mechanism, the following parameters will be part of the authentication configuration:
- authentication.gssapi.key.tab.enable
  Optional. Enable the use of a keytab. Can be one of the following:
  true
  false
  Default value: false.
- authentication.gssapi.key.tab.path
  Mandatory if keytab is enabled. The path to the keytab file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>).
- authentication.gssapi.store.key.enable
  Optional. Enable storage of the principal key. Can be one of the following:
  true
  false
  Default value: false.
- authentication.gssapi.kerberos.service.name
  Mandatory. The name of the Kerberos service.
- authentication.gssapi.principal
  Mandatory if ticket cache is disabled. The name of the principal to be used.
- authentication.gssapi.ticket.cache.enable
  Optional. Enable the use of a ticket cache. Can be one of the following:
  true
  false
  Default value: false.
Example:
...
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">GSSAPI</param>
<param name="authentication.gssapi.key.tab.enable">true</param>
<param name="authentication.gssapi.key.tab.path">gssapi/kafka-connector.keytab</param>
<param name="authentication.gssapi.store.key.enable">true</param>
<param name="authentication.gssapi.kerberos.service.name">kafka</param>
<param name="authentication.gssapi.principal">[email protected]</param>
...
Example of configuration with the use of a ticket cache:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">GSSAPI</param>
<param name="authentication.gssapi.kerberos.service.name">kafka</param>
<param name="authentication.gssapi.ticket.cache.enable">true</param>
Check out the adapters.xml file of the Quick Start Confluent Cloud app, where you can find an example of authentication configuration.
Kafka Connector can deserialize Kafka records from the following formats:
- Apache Avro
- JSON
- String
- Integer
- Float
and other scalar types (see the complete list).
In particular, Kafka Connector supports message validation for Avro and JSON, which can be specified through:
- local schema files
- the Confluent Schema Registry
Kafka Connector allows independent deserialization of keys and values, which means that:
- key and value can have different formats
- message validation against the Confluent Schema Registry can be enabled separately for the Kafka key and Kafka value (through record.key.evaluator.schema.registry.enable and record.value.evaluator.schema.registry.enable)
- message validation against local schema files must be specified separately for key and value (through record.key.evaluator.schema.path and record.value.evaluator.schema.path)
Important
For Avro, schema validation is mandatory, therefore either a local schema file must be provided or the Confluent Schema Registry must be enabled.
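As a sketch of the two alternatives, the following snippets configure a connection whose record values are Avro-encoded while the keys stay plain strings; the schema file name reuses the one from the examples below, and only one of the two options should be used at a time:

```xml
<!-- Option 1: validate the Avro value against a local schema file -->
<param name="record.key.evaluator.type">STRING</param>
<param name="record.value.evaluator.type">AVRO</param>
<param name="record.value.evaluator.schema.path">schemas/record_value.avsc</param>

<!-- Option 2: validate the Avro value through the Confluent Schema Registry -->
<param name="record.key.evaluator.type">STRING</param>
<param name="record.value.evaluator.type">AVRO</param>
<param name="record.value.evaluator.schema.registry.enable">true</param>
<param name="schema.registry.url">https://localhost:8084</param>
```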
Optional. Specifies where to start consuming events from:
- LATEST: start consuming events from the end of the topic partition
- EARLIEST: start consuming events from the beginning of the topic partition
The parameter sets the value of the auto.offset.reset
key to configure the internal Kafka Consumer.
Default value: LATEST.
Example:
<param name="record.consume.from">EARLIEST</param>
Optional. The format to be used to deserialize respectively the key and value of a Kafka record. Can be one of the following:
AVRO
JSON
STRING
INTEGER
BOOLEAN
BYTE_ARRAY
BYTE_BUFFER
BYTES
DOUBLE
FLOAT
LONG
SHORT
UUID
Default value: STRING.
Examples:
<param name="record.key.evaluator.type">INTEGER</param>
<param name="record.value.evaluator.type">JSON</param>
Mandatory if evaluator type is AVRO
and the Confluent Schema Registry is disabled. The path of the local schema file relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>
) for message validation respectively of the Kafka key and the Kafka value.
Examples:
<param name="record.key.evaluator.schema.path">schema/record_key.avsc</param>
<param name="record.value.evaluator.schema.path">schemas/record_value.avsc</param>
Mandatory if evaluator type is AVRO
and no local schema paths are specified. Enable the use of the Confluent Schema Registry for validation respectively of the Kafka key and the Kafka value. Can be one of the following:
true
false
Default value: false.
Examples:
<param name="record.key.evaluator.schema.registry.enable">true</param>
<param name="record.value.evaluator.schema.registry.enable">true</param>
Optional. The error handling strategy to be used if an error occurs while extracting data from incoming deserialized records. Can be one of the following:
- IGNORE_AND_CONTINUE: ignore the error and continue to process the next record
- FORCE_UNSUBSCRIPTION: stop processing records and force unsubscription of the items requested by all the clients subscribed to this connection
Default value: IGNORE_AND_CONTINUE.
Example:
<param name="record.extraction.error.strategy">FORCE_UNSUBSCRIPTION</param>
Kafka Connector allows the configuration of several routing and mapping strategies, thus enabling the delivery of Kafka event streams to a potentially huge number of devices connected to Lightstreamer with great flexibility.
As anticipated in the Installation section, a Kafka record can be analyzed in all its aspects to extract information that can be:
- routed to the designated Lightstreamer items
- remapped to specific Lightstreamer fields
To configure the routing of Kafka event streams to Lightstreamer items, use at least one parameter map.<topic>.to. The general format is:
<param name="map.<topic-name>.to">item1,item2,itemN,...</param>
which defines the mapping between the source Kafka topic (<topic-name>) and the target items (item1, item2, itemN, etc.).
This configuration enables the implementation of various mapping scenarios, as shown by the following examples:
- One To One
  <param name="map.sample-topic.to">sample-item</param>
  This is the most straightforward scenario one may think of: every record published to the Kafka topic sample-topic will simply be routed to the Lightstreamer item sample-item. Therefore, messages will be immediately broadcast as real-time updates to all clients subscribed to such an item.
- One To Many
  <param name="map.sample-topic.to">sample-item1,sample-item2,sample-item3</param>
  Every record published to the Kafka topic sample-topic will be routed to the Lightstreamer items sample-item1, sample-item2, and sample-item3.
  This scenario may activate unicast and multicast messaging, as it is possible to specify which item can be subscribed to by which user or group of users. To do that, it is required to provide a customized extension of the factory Metadata Adapter class (see the example), in which every subscription must be validated against the user identity.
- Many to One
  <param name="map.sample-topic1.to">sample-item</param>
  <param name="map.sample-topic2.to">sample-item</param>
  <param name="map.sample-topic3.to">sample-item</param>
  With this scenario, it is possible to broadcast to all clients subscribed to a single item (sample-item) every message published to different topics (sample-topic1, sample-topic2, sample-topic3).
To forward real-time updates to the Lightstreamer clients, a Kafka record must be mapped to Lightstreamer fields, which define the schema of any Lightstreamer item.
To configure the mapping, you define the set of all subscribable fields through parameters with the prefix field.:
<param name="field.fieldName1">extraction_expression1</param>
<param name="field.fieldName2">extraction_expression2</param>
...
<param name="field.fieldNameN">extraction_expressionN</param>
...
The configuration specifies that the field fieldNameX will contain the value extracted from the deserialized Kafka record through the extraction_expressionX. This approach makes it possible to transform a Kafka record of any complexity into the flat structure required by Lightstreamer.
To write an extraction expression, Kafka Connector provides the Data Extraction Language. This language has a pretty minimal syntax, with the following basic rules:
- expressions must be enclosed within #{...}
- expressions use Extraction Keys, a set of predefined constants that reference specific parts of the record structure:
  - #{KEY}: the key
  - #{VALUE}: the value
  - #{TOPIC}: the topic
  - #{TIMESTAMP}: the timestamp
  - #{PARTITION}: the partition
  - #{OFFSET}: the offset
- the dot notation is used to access attributes or fields of record keys and record values serialized in JSON or Avro formats:
  KEY.attribute1Name.attribute2Name...
  VALUE.attribute1Name.attribute2Name...
Important
Currently, it is required that the top-level element of either a record key or record value is:
- an Object, in the case of JSON format
- a Record, in the case of Avro format
Such a constraint may be removed in a further version of Kafka Connector.
- the square bracket notation is used to access:
  - indexed attributes:
    KEY.attribute1Name[i].attribute2Name...
    VALUE.attribute1Name[i].attribute2Name...
    where i is a 0-indexed value
  - key-based attributes:
    KEY.attribute1Name['keyName'].attribute2Name...
    VALUE.attribute1Name['keyName'].attribute2Name...
    where keyName is a string value
Tip
For JSON format, accessing a child attribute by dot notation or square bracket notation is equivalent:
VALUE.myProperty.myChild.childProperty
VALUE.myProperty['myChild'].childProperty
- expressions must evaluate to a scalar value, otherwise an error will be thrown during the extraction process. The error will be handled as per the configured strategy.
The QuickStart factory configuration shows a basic example, where a simple direct mapping has been defined between every attribute of the JSON record value and a Lightstreamer field with the same name. Of course, thanks to the Data Extraction Language, more complex mappings can be employed.
...
<param name="field.timestamp">#{VALUE.timestamp}</param>
<param name="field.time">#{VALUE.time}</param>
<param name="field.stock_name">#{VALUE.name}</param>
<param name="field.last_price">#{VALUE.last_price}</param>
<param name="field.ask">#{VALUE.ask}</param>
<param name="field.ask_quantity">#{VALUE.ask_quantity}</param>
<param name="field.bid">#{VALUE.bid}</param>
<param name="field.bid_quantity">#{VALUE.bid_quantity}</param>
<param name="field.pct_change">#{VALUE.pct_change}</param>
<param name="field.min">#{VALUE.min}</param>
<param name="field.max">#{VALUE.max}</param>
<param name="field.ref_price">#{VALUE.ref_price}</param>
<param name="field.open_price">#{VALUE.open_price}</param>
<param name="field.item_status">#{VALUE.item_status}</param>
...
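As an illustration of a more complex mapping, consider a hypothetical record value carrying nested objects and an array; the expressions below combine dot and square bracket notation to flatten it into Lightstreamer fields (the attribute and field names are invented for this sketch):

```xml
<!-- Hypothetical record value:
     { "name": "...", "quote": { "bid": ..., "ask": ... }, "trades": [ { "price": ... }, ... ] } -->
<param name="field.stock_name">#{VALUE.name}</param>
<param name="field.best_bid">#{VALUE.quote.bid}</param>
<param name="field.best_ask">#{VALUE.quote['ask']}</param>
<param name="field.last_trade_price">#{VALUE.trades[0].price}</param>
```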
Besides mapping topics to statically predefined items, Kafka Connector allows you to configure item templates, which specify the rules needed to decide whether a message can be forwarded to the clients, thus enabling a filtered routing.
The item template leverages the Data Extraction Language to extract data from Kafka records and match them against the parameterized subscribed items.
To configure an item template, use the parameter item-template.<template-name>:
<param name="item-template.<template-name>"><item-prefix>-<bindable_expressions></param>
and then configure the routing by referencing the template through the parameter map.<topic>.to:
<param name="map.<topic>.to">item-template.<template-name></param>
Tip
It is allowed to mix references to simple item names and item templates in the same topic mapping configuration:
<param name="map.sample-topic.to">item-template.template1,item1,item2</param>
The item template is made of:
- <item-prefix>: the prefix of the item name
- <bindable_expressions>: a sequence of bindable extraction expressions, which define the filtering rules, specified as:
  #{paramName1=<extraction_expression_1>,paramName2=<extraction_expression_2>,...}
  where the value extracted from the deserialized Kafka record through <extraction_expression_X> (written using the Data Extraction Language) will be bound to the parameter paramNameX.
To activate filtered routing, the Lightstreamer clients subscribe to a parameterized item, expressed in the form:
<item-prefix>-[paramName1=value1,paramName2=value2,...]
Upon consuming a message, Kafka Connector expands every item template relative to the record topic by evaluating the extraction expressions. The expanded template will look like:
<item-prefix>-[paramName1=extractedValue_1,paramName2=extractedValue_2,...]
Finally, the message will be mapped and routed only if the subscribed item completely matches the expanded template.
Consider the following configuration:
<param name="item-template.by-name">user-#{firstName=VALUE.name,lastName=VALUE.surname}</param>
<param name="item-template.by-age">user-#{age=VALUE.age}</param>
<param name="map.user.to">item-template.by-name,item-template.by-age</param>
which specifies how to route records published to the topic user to the item templates defined to extract some personal data.
Let's suppose we have three different Lightstreamer clients:
- Client A subscribes to:
  - the parameterized item SA1 user-[firstName=James,lastName=Kirk] for receiving real-time updates relative to the user James Kirk
  - the parameterized item SA2 user-[age=45] for receiving real-time updates relative to any 45-year-old user
- Client B subscribes to:
  - the parameterized item SB1 user-[firstName=Montgomery,lastName=Scotty] for receiving real-time updates relative to the user Montgomery Scotty
- Client C subscribes to the parameterized item SC1 user-[age=37] for receiving real-time updates relative to any 37-year-old user.
Now, let's see how filtered routing works for the following incoming Kafka records published to the topic user:
- Record 1:
  { ... "name": "James", "surname": "Kirk", "age": 37, ... }

  | Template | Expansion | Matched Subscribed Item | Routed to Client |
  | -------- | --------- | ----------------------- | ---------------- |
  | by-name  | user-[firstName=James,lastName=Kirk] | SA1 | Client A |
  | by-age   | user-[age=37] | SC1 | Client C |

- Record 2:
  { ... "name": "Montgomery", "surname": "Scotty", "age": 45 ... }

  | Template | Expansion | Matched Subscribed Item | Routed to Client |
  | -------- | --------- | ----------------------- | ---------------- |
  | by-name  | user-[firstName=Montgomery,lastName=Scotty] | SB1 | Client B |
  | by-age   | user-[age=45] | SA2 | Client A |

- Record 3:
  { ... "name": "Nyota", "surname": "Uhura", "age": 37, ... }

  | Template | Expansion | Matched Subscribed Item | Routed to Client |
  | -------- | --------- | ----------------------- | ---------------- |
  | by-name  | user-[firstName=Nyota,lastName=Uhura] | None | None |
  | by-age   | user-[age=37] | SC1 | Client C |
A Schema Registry is a centralized repository that manages and validates schemas, which define the structure of valid messages.
Kafka Connector supports integration with the Confluent Schema Registry through the configuration of parameters with the prefix schema.registry
.
Mandatory if the Confluent Schema Registry is enabled. The URL of the Confluent Schema Registry.
Example:
<param name="schema.registry.url">http//localhost:8081</param>
An encrypted connection is enabled by specifying the https
protocol (see the next section).
Example:
<param name="schema.registry.url">https://localhost:8084</param>
A secure connection to the Confluent Schema Registry can be configured through parameters with the prefix schema.registry.encryption, each one having the same meaning as the homologous parameters defined in the Encryption Parameters section:
- schema.registry.encryption.protocol (see encryption.protocol)
- schema.registry.encryption.enabled.protocols (see encryption.enabled.protocols)
- schema.registry.encryption.cipher.suites (see encryption.cipher.suites)
- schema.registry.encryption.truststore.path (see encryption.truststore.path)
- schema.registry.encryption.truststore.password (see encryption.truststore.password)
- schema.registry.encryption.hostname.verification.enable (see encryption.hostname.verification.enable)
- schema.registry.encryption.keystore.enable (see encryption.keystore.enable)
- schema.registry.encryption.keystore.path (see encryption.keystore.path)
- schema.registry.encryption.keystore.password (see encryption.keystore.password)
- schema.registry.encryption.keystore.key.password (see encryption.keystore.key.password)
Example:
<!-- Set the Confluent Schema Registry URL. The https protocol enables the encryption parameters -->
<param name="schema.registry.url">https://localhost:8084</param>
<!-- Set general encryption settings -->
<param name="schema.registry.encryption.enabled.protocols">TLSv1.3</param>
<param name="schema.registry.encryption.cipher.suites">TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA</param>
<param name="schema.registry.encryption.hostname.verification.enable">true</param>
<!-- If required, configure the trust store to trust the Confluent Schema Registry certificates -->
<param name="schema.registry.encryption.truststore.path">secrets/secrets/kafka.connector.schema.registry.truststore.jks</param></param>
<!-- If mutual TLS is enabled on the Confluent Schema Registry, enable and configure the key store -->
<param name="schema.registry.encryption.keystore.enable">true</param>
<param name="schema.registry.encryption.keystore.path">secrets/kafka-connector.keystore.jks</param>
<param name="schema.registry.encryption.keystore.password">kafka-connector-password</param>
<param name="schema.registry.encryption.keystore.key.password">schemaregistry-private-key-password</param>
Check out the adapters.xml file of the Quick Start Schema Registry app, where you can find an example of Schema Registry settings.
If you have any specific need to customize the Kafka Connector Metadata Adapter class, you can provide your own implementation by extending the factory class com.lightstreamer.kafka_connector.adapters.pub.KafkaConnectorMetadataAdapter. The class provides the following hook methods, which you can override to add your custom logic:
- postInit: invoked after the initialization phase of the Kafka Connector Metadata Adapter has been completed
- onSubscription: invoked to notify that a user has submitted a Subscription
- onUnsubscription: invoked to notify that a Subscription has been removed
To develop your extension, you need the Kafka Connector jar library, which is hosted on Github Packages.
For a Maven project, add the dependency to your pom.xml file:
<dependency>
<groupId>com.lightstreamer.kafka-connector</groupId>
<artifactId>kafka-connector</artifactId>
<version>0.1.0</version>
</dependency>
and follow these instructions to configure the repository and authentication.
For a Gradle project, edit your build.gradle file as follows:
- add the dependency:
  dependencies {
      implementation group: 'com.lightstreamer.kafka_connector', name: 'kafka-connector', version: '0.1.0'
  }
- add the repository and specify your personal access token:
  repositories {
      mavenCentral()
      maven {
          name = "GitHubPackages"
          url = uri("https://maven.pkg.github.com/lightstreamer/lightstreamer-kafka-connector")
          credentials {
              username = project.findProperty("gpr.user") ?: System.getenv("USERNAME")
              password = project.findProperty("gpr.key") ?: System.getenv("TOKEN")
          }
      }
  }
In the examples/custom-kafka-connector-adapter folder, you can find a sample Gradle project you may use as a starting point to build and deploy your custom extension.
The docs folder contains the complete Kafka Connector API Specification, already mentioned in the previous section.
The examples folder contains all the examples mentioned throughout this guide. Furthermore, you may take a look at the Airport Demo, which provides more insights into various usage and configuration options of Kafka Connector.