Kafka Connector Airport Demo

This project includes the resources needed to develop the Kafka Connector Airport Demo.

Infrastructure

The demo simulates a basic departures board consisting of ten rows, each representing flight departure information from a hypothetical airport. The simulated data is published to a Kafka cluster, then consumed and injected into Lightstreamer Server by Kafka Connector.

The demo project consists of:

  • a web client designed to visualize the airport departure board from a browser
  • a random flight information generator that acts as a message producer for Kafka
  • files to configure Kafka Connector according to the needs of the demo

The Web Client

The web client, contained in the folder client/web, uses the Web Client SDK API for Lightstreamer to handle communication with Lightstreamer Server. A simple user interface displays the real-time data received from Lightstreamer Server.

Demo ScreenShot

The demo executes a single Subscription with ten items in MERGE mode, feeding a DynaGrid with the current list and status of the next departing flights (according to the simulated time). The ten items to subscribe to are:

itemsList = ["flights-[key=10]", "flights-[key=1]", "flights-[key=2]", "flights-[key=3]", "flights-[key=4]", "flights-[key=5]", "flights-[key=6]", "flights-[key=7]", "flights-[key=8]", "flights-[key=9]" ];

each representing a row on the board. The table is kept sorted by departure time by calling the setSort method of the DynaGrid object.
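As a minimal sketch, the same ten item names could be generated instead of being written out by hand. The helper name `buildItemsList` is hypothetical and not part of the demo source; it lists the items in ascending key order, while the literal above starts with key 10.

```javascript
// Hypothetical helper (not in the demo source): builds the item names
// "flights-[key=1]" ... "flights-[key=N]", one per board row.
function buildItemsList(rows) {
  return Array.from({ length: rows }, (_, i) => `flights-[key=${i + 1}]`);
}

const itemsList = buildItemsList(10);
console.log(itemsList);
```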

As you can see, items have been expressed in a parameterized format to activate the filter routing as per the item template defined as follows:

<param name="item-template.flights">flights-#{key=KEY}</param>

which requires every subscription to include a filtering value for the bind parameter key. Upon consuming an incoming message, Kafka Connector will then route the record if the subscribed item has specified a filtering value that matches the record key.
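The matching rule described above can be modeled roughly as follows. This is only an illustrative sketch of the idea, not Kafka Connector's actual implementation: the functions `parseItem` and `matches`, the shape of the `record` object, and the sample record are all assumptions made for the example.

```javascript
// Parses an item name such as "flights-[key=3]" into its prefix and bind parameters.
// Returns null when the name does not follow the "prefix-[name=value,...]" shape.
function parseItem(itemName) {
  const match = /^([^\[]+)-\[(.*)\]$/.exec(itemName);
  if (!match) return null;
  const params = {};
  for (const pair of match[2].split(",")) {
    const [name, value = ""] = pair.split("=");
    params[name.trim()] = value.trim();
  }
  return { prefix: match[1], params };
}

// Returns true when the value bound to "key" in the item equals the record key.
function matches(itemName, record) {
  const item = parseItem(itemName);
  return item !== null
    && item.prefix === "flights"
    && item.params.key === String(record.key);
}

// Sample record (hypothetical data): only the item bound to key 3 receives it.
const record = { key: "3", value: { destination: "Rome" } };
const subscribed = ["flights-[key=2]", "flights-[key=3]"];
const routed = subscribed.filter((item) => matches(item, record));
console.log(routed);
```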

The Producer

The source code of the producer is contained in the producer package, which generates random flight information and acts as the producer for the Kafka cluster. In particular, the following classes are defined:

  • DemoPublisher.java: implementing the simulator generating and sending flight monitor data to a Kafka topic; the messages sent to Kafka will also have a key composed simply of a number representing the row in the table to which the information refers
  • FlightInfo.java: class that defines all the flight-related information to be displayed on the departure board, and will be serialized into JSON format as a Kafka message
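To make the message shape concrete, here is a rough sketch of a keyed message with a JSON value. The actual producer is written in Java; the function `toKafkaMessage`, the string encoding of the key, and every sample value below are assumptions for illustration. Only the attribute names are taken from the field mapping in adapters.xml.

```javascript
// Hypothetical model of one message: the key is the board row,
// the value is the flight information serialized as JSON.
function toKafkaMessage(row, flightInfo) {
  return { key: String(row), value: JSON.stringify(flightInfo) };
}

// Sample data (invented for the example, not produced by the real simulator).
const message = toKafkaMessage(4, {
  destination: "Rome",
  departure: "12:30",
  flightNo: "LS123",
  terminal: "2",
  status: "Boarding",
  airline: "Example Air",
  currentTime: "12:05",
});
console.log(message.key, message.value);
```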

Connector Configurations

In the connector folder, you will find the files needed to configure Kafka Connector:

  • adapters.xml: this file configures the parameters the connector uses to consume messages from Kafka and defines the mapping between Kafka topics and the Lightstreamer items that the client subscribes to. In this demo, messages are serialized as JSON objects, so the file also defines the mapping from the attributes of the received JSON object to the Lightstreamer item fields sent to clients. The section defining the field mapping is the following:
      <data_provider name="AirpotDemo">
        ...
    
        <!-- Extraction of the record key mapped to the field "key". -->
        <param name="field.key">#{KEY}</param>
    
        <!-- Extraction of the record value attributes mapped to corresponding field names. -->
        <param name="field.destination">#{VALUE.destination}</param>
        <param name="field.departure">#{VALUE.departure}</param>
        <param name="field.flightNo">#{VALUE.flightNo}</param>
        <param name="field.terminal">#{VALUE.terminal}</param>
        <param name="field.status">#{VALUE.status}</param>
        <param name="field.airline">#{VALUE.airline}</param>
        <param name="field.currentTime">#{VALUE.currentTime}</param>
    
        ...
      </data_provider>
  • log4j.properties: in this file, you'll find the specific configuration for the Kafka Connector log, to obtain details about all interactions with the Kafka cluster and the message retrieval operations, along with their routing to the subscribed items in the Lightstreamer server. In this demo, a specific log file named airport.log is configured, destined for the same logs folder as the other Lightstreamer logs.
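The field mapping in adapters.xml can be pictured as a small lookup applied to each record. The sketch below is a simplified model, not the connector's real expression evaluator: the functions `evaluate` and `mapRecord`, the `record` shape, and the sample values are assumptions, and only the `#{KEY}` and `#{VALUE.attribute}` forms shown above are handled.

```javascript
// Field name -> extraction expression, copied from the adapters.xml excerpt.
const fieldMappings = {
  key: "#{KEY}",
  destination: "#{VALUE.destination}",
  departure: "#{VALUE.departure}",
  flightNo: "#{VALUE.flightNo}",
  terminal: "#{VALUE.terminal}",
  status: "#{VALUE.status}",
  airline: "#{VALUE.airline}",
  currentTime: "#{VALUE.currentTime}",
};

// Evaluates "#{KEY}" or "#{VALUE.name}" against a record with a parsed JSON value.
function evaluate(expression, record) {
  const match = /^#\{(KEY|VALUE)(?:\.(\w+))?\}$/.exec(expression);
  if (!match) throw new Error(`Unsupported expression: ${expression}`);
  const root = match[1] === "KEY" ? record.key : record.value;
  return match[2] === undefined ? root : root[match[2]];
}

// Builds the Lightstreamer item fields for one record.
function mapRecord(record) {
  const fields = {};
  for (const [field, expression] of Object.entries(fieldMappings)) {
    fields[field] = String(evaluate(expression, record));
  }
  return fields;
}

// Sample record (hypothetical values).
const fields = mapRecord({
  key: "7",
  value: {
    destination: "Oslo", departure: "09:15", flightNo: "LS777", terminal: "1",
    status: "Scheduled", airline: "Example Air", currentTime: "08:40",
  },
});
console.log(fields);
```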

Setting up the Demo

Kafka Cluster

The demo needs a Kafka cluster where a topic named Flights is created. You can use either a locally installed instance of Kafka, starting for example from the latest release of Apache Kafka as explained here, or an installation of Confluent Platform (you can find a quickstart here). Alternatively, you can use one of the cloud providers that offer fully managed services, such as Confluent Cloud or AWS MSK. Based on this choice, you will need to modify the adapters.xml file accordingly, particularly the bootstrap.servers parameter. The proposed configuration assumes a local Kafka installation that requires neither authentication nor TLS communication:

<data_provider name="AirpotDemo">
    <!-- ##### GENERAL PARAMETERS ##### -->

    <adapter_class>com.lightstreamer.kafka_connector.adapters.KafkaConnectorDataAdapter</adapter_class>

    <!-- The Kafka cluster address -->
    <param name="bootstrap.servers">localhost:9092</param>

  ...

</data_provider>

However, in more complex scenarios where authentication and TLS need to be set up, please refer to the Kafka Connector guide here and here.

The demo leverages a particular data retention mechanism to ensure simplified snapshot management. The mechanism is compaction, which takes advantage of the fact that the demo uses key-based messages, allowing the Kafka cluster to maintain only one value per key, the most recent one, in the message history. Further details on this mechanism can be found here.
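The effect of compaction on a keyed topic can be illustrated with a toy model. This is a simplification under stated assumptions: the function `compact` and the sample log are hypothetical, and real Kafka compaction runs asynchronously in the background, so older values may remain visible for some time.

```javascript
// Toy model of log compaction: keeps only the most recent value for each key,
// preserving the relative order of the surviving records.
function compact(log) {
  const latest = new Map();
  for (const { key, value } of log) {
    latest.delete(key); // re-insert so the survivor sits at its latest position
    latest.set(key, value);
  }
  return [...latest].map(([key, value]) => ({ key, value }));
}

// Sample log (invented data): row 1 is updated twice, row 2 once.
const log = [
  { key: "1", value: "Scheduled" },
  { key: "2", value: "Scheduled" },
  { key: "1", value: "Boarding" },
];
const compacted = compact(log);
console.log(compacted);
```

With one surviving record per row, a client that subscribes late can still be given the current state of each row without replaying every historical update.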

To configure our Flights topic to be managed in a compacted manner, the following steps are necessary:

  1. set up the Kafka cluster to support this mode, ensuring that the server.properties file contains this setting:

    log.cleanup.policy=compact, delete
  2. create the topic with the following configurations:

    $ ./bin/kafka-topics.sh --create --topic Flights --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --config cleanup.policy=compact
    $ ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name Flights --describe

Lightstreamer Server

  • Download Lightstreamer Server version 7.4.2 or later (Lightstreamer Server comes with a free non-expiring demo license for 20 connected users) from the Lightstreamer Download page, and install it as explained in the GETTING_STARTED.TXT file in the installation home directory.
  • Make sure that Lightstreamer Server is not running.
  • Deploy a fresh installation of Lightstreamer Kafka Connector following the instructions provided here.
  • Replace the adapters.xml file with the one from this project and, if needed, update the settings as discussed in the previous section.
  • [Optional] Customize the logging settings in the log4j configuration file log4j.properties.
  • Launch Lightstreamer Server.

Simulator Producer loop

To build the simulator you have two options: either use Gradle (or other build tools) to take care of dependencies and build (recommended) or gather the necessary jars yourself and build it manually. For the sake of simplicity, only the Gradle case is detailed here.

Gradle

You can easily build the producer by running the following command from the producer folder:

$ ./gradlew clean build

which generates the uber jar. Then, you can start the simulator producer loop with this command:

$ java -jar build/libs/example-kafka-connector-demo-publisher-all-1.0.0.jar localhost:9092 Flights 1000

where:

  • localhost:9092 is the bootstrap server address for connecting to Kafka, to which the same considerations made above for adapters.xml apply
  • Flights is the topic name used to produce the messages with simulated flights info
  • 1000 is the interval in milliseconds between the generation of one simulated event and the next
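The three positional arguments could be validated along these lines. This is a hypothetical sketch only: the real argument handling lives in the Java producer, and the function `parseProducerArgs` and its error messages are assumptions.

```javascript
// Hypothetical parser for the three positional arguments described above:
// bootstrap servers, topic name, and interval in milliseconds.
function parseProducerArgs(args) {
  if (args.length !== 3) {
    throw new Error("Usage: <bootstrap-servers> <topic> <interval-ms>");
  }
  const [bootstrapServers, topic, interval] = args;
  const intervalMs = Number.parseInt(interval, 10);
  if (!Number.isInteger(intervalMs) || intervalMs <= 0) {
    throw new Error(`Invalid interval: ${interval}`);
  }
  return { bootstrapServers, topic, intervalMs };
}

const config = parseProducerArgs(["localhost:9092", "Flights", "1000"]);
console.log(config);
```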

Web Client

In order to install a web client for this demo pointing to your local Lightstreamer Server, follow these steps:

  • deploy this demo on the Lightstreamer Server (used as Web server) or on any external Web server. If you choose the former, create the folder <LS_HOME>/pages/demos/airport70 (you can customize the last two digits based on your favorite movie in the series) and copy the contents of the client/web/src folder of this project into it

Important

The client demo configuration assumes that Lightstreamer Server, Kafka Cluster, and this client are launched on the same machine. If you need to target a different Lightstreamer server, please double check the LS_HOST variable in client/web/src/js/const.js and change it accordingly.

Setting Up on Docker Compose

To simplify the setup, we have also provided two different Docker Compose files to showcase the demo against Apache Kafka and Redpanda Self-Hosted.

Prerequisites

  • JDK version 17 or later.
  • Docker Compose

Run

  1. From the examples/airport-demo folder:

    • for running the demo against Apache Kafka:

      $ ./start_demo.sh
      ...
       ✔ Network airport-demo-kafka_default  Created
       ✔ Container broker                    Started
       ✔ Container init-broker               Started
       ✔ Container producer                  Started
       ✔ Container kafka-connector           Started
      Services started. Now you can point your browser to http://localhost:8080/AirportDemo to see real-time data.
    • for running the demo against Redpanda Self-Hosted:

      $ ./start_demo_redpanda.sh
      ...
       ✔ Network airport-demo-redpanda_default  Created
       ✔ Container redpanda                     Started
       ✔ Container redpanda-console             Started
       ✔ Container producer                     Started
       ✔ Container kafka-connector              Started
      Services started. Now you can point your browser to http://localhost:8080/AirportDemo to see real-time data.
  2. Once all containers are ready, point your browser to http://localhost:8080/AirportDemo.

  3. After a few moments, the user interface starts displaying the real-time flights data.

  4. To shut down Docker Compose and clean up all temporary resources:

    • for Apache Kafka, execute:

      $ ./stop_demo.sh
       ✔ Container kafka-connector           Removed
       ✔ Container init-broker               Removed
       ✔ Container producer                  Removed
       ✔ Container broker                    Removed
       ✔ Network airport-demo-kafka_default  Removed
    • for Redpanda Self-Hosted, execute:

      $ ./stop_demo_redpanda.sh
      ...
       ✔ Container redpanda-console             Removed
       ✔ Container kafka-connector              Removed
       ✔ Container producer                     Removed
       ✔ Container redpanda                     Removed
       ✔ Network airport-demo-redpanda_default  Removed