This project includes the resources needed to develop the Kafka Connector Airport Demo.
The demo simulates a basic departures board consisting of ten rows, each representing flight departure information from a hypothetical airport. The simulated data, fed into a Kafka cluster, is fetched and injected into the Lightstreamer server via Kafka Connector.
The demo project consists of:
- A web client designed to visualize the airport departure board from a browser.
- A random flight information generator that acts as a message producer for Kafka.
- Files to configure Kafka Connector according to the needs of the demo.
The web client, contained in the `client/web` folder, uses the Web Client SDK API for Lightstreamer to handle the communications with Lightstreamer Server. A simple user interface displays the real-time data received from Lightstreamer Server.
The demo basically executes a single subscription with ten items subscribed to in MERGE mode, feeding a DynaGrid with the current list and status of the next departing flights (according to the simulated time). The list of the ten items to subscribe to is as follows:

```js
itemsList = [ "flights-[key=10]", "flights-[key=1]", "flights-[key=2]",
  "flights-[key=3]", "flights-[key=4]", "flights-[key=5]",
  "flights-[key=6]", "flights-[key=7]", "flights-[key=8]",
  "flights-[key=9]" ];
```

each representing a row on the board. The table is then kept sorted by departure time through the setSort method of the DynaGrid object.
As you can see, items are expressed in a parameterized format to activate filtered routing as per the item template, defined as follows:

```xml
<param name="item-template.flights">flights-#{key=KEY}</param>
```

which requires every subscription to include a filtering value for the bind parameter `key`. Upon consuming an incoming message, Kafka Connector then routes the record only to those subscribed items whose filtering value matches the record key.
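To make the routing concrete, here is a minimal sketch of the matching logic, written as plain Java for illustration only (this is not the connector's actual implementation): the item name carries a bound value for `key`, and a record is routed to that item only when its record key coincides with the bound value.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch of filtered routing, NOT Kafka Connector code:
// an item such as "flights-[key=3]" binds the value "3" to the parameter
// "key"; a record is routed to that item only if its record key is "3".
class RoutingSketch {
    private static final Pattern ITEM = Pattern.compile("flights-\\[key=(\\d+)\\]");

    // Extracts the bound "key" value from an item name, or null if it doesn't match.
    static String boundKey(String itemName) {
        Matcher m = ITEM.matcher(itemName);
        return m.matches() ? m.group(1) : null;
    }

    // A record is routed to the item only when the keys coincide.
    static boolean routes(String recordKey, String itemName) {
        return recordKey.equals(boundKey(itemName));
    }

    public static void main(String[] args) {
        System.out.println(routes("3", "flights-[key=3]"));  // true
        System.out.println(routes("3", "flights-[key=7]"));  // false
    }
}
```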
The source code of the producer is contained in the `producer` package, which generates random flight information and acts as the producer towards the Kafka cluster. In particular, the following classes are defined:

- `DemoPublisher.java`: implements the simulator that generates and sends flight monitor data to a Kafka topic; each message sent to Kafka also carries a key consisting simply of a number identifying the row in the table to which the information refers.
- `FlightInfo.java`: defines all the flight-related information to be displayed on the departure board; instances are serialized into JSON format as Kafka messages.
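For illustration, a FlightInfo-style value object could be serialized along these lines. This is a hedged sketch, not the demo's actual class: the field names mirror the mapping shown later in `adapters.xml`, but the real implementation may differ and would typically use a JSON library such as Jackson instead of manual string building.

```java
// Hypothetical sketch of a FlightInfo-style value object and its JSON form;
// field names follow the field mapping in adapters.xml, but this is NOT the
// demo's actual FlightInfo.java.
record FlightInfo(String destination, String departure, String flightNo,
                  String terminal, String status, String airline,
                  String currentTime) {

    // Builds the JSON payload that would become the Kafka record value.
    String toJson() {
        return "{" +
            "\"destination\":\"" + destination + "\"," +
            "\"departure\":\"" + departure + "\"," +
            "\"flightNo\":\"" + flightNo + "\"," +
            "\"terminal\":\"" + terminal + "\"," +
            "\"status\":\"" + status + "\"," +
            "\"airline\":\"" + airline + "\"," +
            "\"currentTime\":\"" + currentTime + "\"}";
    }

    public static void main(String[] args) {
        FlightInfo fi = new FlightInfo("London", "10:35", "LS123", "T2",
                                       "On time", "Demo Air", "09:50");
        System.out.println(fi.toJson());
    }
}
```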
The `connector` folder contains the configuration files needed to set up Kafka Connector:
- `adapters.xml`: this file configures the parameters the connector uses to consume messages from Kafka, and defines the mapping between Kafka cluster topics and the Lightstreamer items that the client subscribes to. In this demo, messages are serialized as JSON objects, so the file also defines the mapping of fields from the received JSON object to the Lightstreamer item fields sent to clients. In particular, the section defining the field mapping is this one:

  ```xml
  <data_provider name="AirpotDemo">
      ...
      <!-- Extraction of the record key mapped to the field "key". -->
      <param name="field.key">#{KEY}</param>

      <!-- Extraction of the record value attributes mapped to corresponding field names. -->
      <param name="field.destination">#{VALUE.destination}</param>
      <param name="field.departure">#{VALUE.departure}</param>
      <param name="field.flightNo">#{VALUE.flightNo}</param>
      <param name="field.terminal">#{VALUE.terminal}</param>
      <param name="field.status">#{VALUE.status}</param>
      <param name="field.airline">#{VALUE.airline}</param>
      <param name="field.currentTime">#{VALUE.currentTime}</param>
      ...
  </data_provider>
  ```
- `log4j.properties`: this file holds the specific configuration of the Kafka Connector log, providing details about all interactions with the Kafka cluster and the message retrieval operations, along with their routing to the subscribed items in the Lightstreamer server. In this demo, a dedicated log file named `airport.log` is configured, written to the same `logs` folder as the other Lightstreamer logs.
The demo needs a Kafka cluster where a topic named `Flights` is created. You can use either a local Kafka instance installed in your environment, starting perhaps from the latest release of Apache Kafka as explained here, or an installation of Confluent Platform (you can find a quickstart here). Alternatively, you can use one of the fully managed cloud services such as Confluent Cloud or AWS MSK.
Based on this choice, you will need to modify the `adapters.xml` file accordingly, particularly the `bootstrap.servers` parameter. The proposed configuration assumes a local Kafka installation that requires neither authentication nor TLS communication:
```xml
<data_provider name="AirpotDemo">
    <!-- ##### GENERAL PARAMETERS ##### -->
    <adapter_class>com.lightstreamer.kafka.adapters.KafkaConnectorDataAdapter</adapter_class>

    <!-- The Kafka cluster address -->
    <param name="bootstrap.servers">localhost:9092</param>
    ...
</data_provider>
```
However, in more complex scenarios where authentication and TLS need to be set up, please refer to the Kafka Connector guide here and here.
The demo leverages a particular data retention mechanism, log compaction, to simplify snapshot management. Compaction takes advantage of the fact that the demo uses key-based messages, allowing the Kafka cluster to retain only the most recent value for each key in the message history. Further details on this mechanism can be found here.
To configure our `Flights` topic to be managed in a compacted manner, the following steps are necessary:

- Set up the Kafka cluster to support this mode, ensuring that the `server.properties` file contains this setting:

  ```properties
  log.cleanup.policy=compact, delete
  ```
- Create the topic with the following configurations:

  ```sh
  $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic Flights \
      --replication-factor 1 \
      --partitions 1 \
      --config cleanup.policy=compact \
      --config segment.ms=30000

  $ ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name Flights --describe
  ```
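Conceptually, compaction turns the topic into a ten-row snapshot: the broker keeps only the latest record per key. A minimal, self-contained sketch of this behavior (plain Java, not broker code):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal illustration of log compaction semantics (NOT Kafka code): later
// writes to the same key supersede earlier ones, so the retained history
// converges to one value per key -- exactly the snapshot the demo relies on.
class CompactionSketch {
    static Map<String, String> compact(List<Map.Entry<String, String>> log) {
        // LinkedHashMap: put() on an existing key overwrites the old value.
        Map<String, String> compacted = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            compacted.put(record.getKey(), record.getValue());
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
            Map.entry("1", "{\"status\":\"Scheduled\"}"),
            Map.entry("2", "{\"status\":\"Scheduled\"}"),
            Map.entry("1", "{\"status\":\"Boarding\"}"));
        // Key "1" retains only its most recent value.
        System.out.println(compact(log));
    }
}
```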
- Download Lightstreamer Server version 7.4.2 or later (Lightstreamer Server comes with a free non-expiring demo license for 20 connected users) from the Lightstreamer download page, and install it, as explained in the `GETTING_STARTED.TXT` file in the installation home directory.
- Make sure that Lightstreamer Server is not running.
- Deploy a fresh installation of Lightstreamer Kafka Connector following the instructions provided here.
- Replace the `adapters.xml` file with the one of this project and, if necessary, update the settings as discussed in the previous section.
- [Optional] Customize the logging settings in the log4j configuration file `log4j.properties`.
- Launch Lightstreamer Server.
To build the simulator you have two options: either use Gradle (or another build tool) to take care of dependencies and the build (recommended), or gather the necessary jars yourself and build manually. For simplicity, only the Gradle case is detailed here.
You can easily build the producer by running the following command from the `producer` folder:
```sh
$ ./gradlew clean build
```
which generates the uber jar. Then, you can start the simulator producer loop with this command:
```sh
$ java -jar build/libs/example-kafka-connector-demo-publisher-all-1.0.0.jar localhost:9092 Flights 1000
```
where:

- `localhost:9092` is the bootstrap string for connecting to Kafka, for which the same considerations made above apply.
- `Flights` is the topic name used to produce the messages with simulated flight info.
- `1000` is the interval in milliseconds between the generation of one simulated event and the next.
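The producer's main loop can be pictured along these lines. This is a hypothetical sketch (class, method, and field names are illustrative, not the demo's actual code): every interval it emits one simulated update keyed by a row number between 1 and 10, which is where the real `DemoPublisher` would call `producer.send(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the simulator loop, NOT the demo's DemoPublisher:
// one keyed event per interval, the key being the board row (1..10).
class SimulatorLoopSketch {
    static final Random RANDOM = new Random();

    // One simulated keyed event, mirroring a Kafka record key/value pair.
    record Event(String key, String value) {}

    static Event nextEvent() {
        int row = 1 + RANDOM.nextInt(10);   // board row, used as the record key
        String value = "{\"flightNo\":\"LS" + (100 + row) + "\",\"status\":\"Scheduled\"}";
        return new Event(String.valueOf(row), value);
    }

    public static void main(String[] args) throws InterruptedException {
        // Third CLI argument would be the interval, as in the real launch command.
        long intervalMs = args.length > 2 ? Long.parseLong(args[2]) : 1000;
        List<Event> sent = new ArrayList<>();
        for (int i = 0; i < 3; i++) {       // the real producer loops indefinitely
            sent.add(nextEvent());          // here the demo would call producer.send(...)
            Thread.sleep(intervalMs);
        }
        System.out.println(sent.size() + " events generated");
    }
}
```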
In order to install a web client for this demo pointing to your local Lightstreamer Server, follow these steps:
- Deploy this demo on the Lightstreamer Server (used as Web server) or in any external Web server. If you choose the former, create the folders `<LS_HOME>/pages/demos/airport70` (you can customize the last two digits based on your favorite movie in the series) and copy there the contents of the `client/web/src` folder of this project.
> **Important**
>
> The client demo configuration assumes that Lightstreamer Server, Kafka cluster, and this client are launched on the same machine. If you need to target a different Lightstreamer Server, please double-check the `LS_HOST` variable in `client/web/src/js/const.js` and change it accordingly.
- Open your browser and point it to http://localhost:8080/airport70.
To simplify the setup, we have also provided two different Docker Compose files to showcase the demo against Apache Kafka and Redpanda Self-Hosted:
- JDK version 17 or later
- Docker Compose
- From the `examples/airport-demo` folder:
  - For running the demo against Apache Kafka:

    ```sh
    $ ./start_demo.sh
    ...
     ✔ Network airport-demo-kafka_default  Created
     ✔ Container broker                    Started
     ✔ Container init-broker               Started
     ✔ Container producer                  Started
     ✔ Container kafka-connector           Started
    Services started. Now you can point your browser to http://localhost:8080/AirportDemo to see real-time data.
    ```

  - For running the demo against Redpanda Self-Hosted:

    ```sh
    $ ./start_demo_redpanda.sh
    ...
     ✔ Network airport-demo-redpanda_default  Created
     ✔ Container redpanda                     Started
     ✔ Container redpanda-console             Started
     ✔ Container producer                     Started
     ✔ Container kafka-connector              Started
    Services started. Now you can point your browser to http://localhost:8080/AirportDemo to see real-time data.
    ```
- Once all containers are ready, point your browser to http://localhost:8080/AirportDemo.
- After a few moments, the user interface starts displaying the real-time flights data.
- To shut down Docker Compose and clean up all temporary resources:
  - For Apache Kafka, execute:

    ```sh
    $ ./stop_demo.sh
     ✔ Container kafka-connector           Removed
     ✔ Container init-broker               Removed
     ✔ Container producer                  Removed
     ✔ Container broker                    Removed
     ✔ Network airport-demo-kafka_default  Removed
    ```

  - For Redpanda Self-Hosted, execute:

    ```sh
    $ ./stop_demo_redpanda.sh
    ...
     ✔ Container redpanda-console             Removed
     ✔ Container kafka-connector              Removed
     ✔ Container producer                     Removed
     ✔ Container redpanda                     Removed
     ✔ Network airport-demo-redpanda_default  Removed
    ```