Hello all! This repository contains a set of short exercises to get familiar with Apache Kafka. You'll need to do a couple of setup steps, and then you can run the examples of producers and consumers that I've prepared for you.
- You'll need an Apache Kafka cluster. Apache Kafka is an open-source platform, so you can either set it up and run it from source code or use a fully managed option. For these experiments you can use a free trial of Aiven for Apache Kafka (disclaimer for transparency: I work at Aiven 🙂). I'll be using the latter option.
- Clone this repository and install the dependencies from pom.xml.
- To connect to the remote Apache Kafka cluster, we need to set up the SSL configuration. Follow these steps to create a keystore and truststore based on the Access Key, Access Certificate and CA Certificate.
- Copy .env.example, rename it to .env and update it with the locations of the truststore and keystore files and their passwords. You'll end up with something similar to the content below (just don't use 'password' as the password 😉):
```
server="name-of-my-server.aivencloud.com:port"
ssl.truststore.location="../keys/client.truststore.jks"
ssl.truststore.password="password"
ssl.keystore.type="PKCS12"
ssl.keystore.location="../keys/client.keystore.p12"
ssl.keystore.password="password"
ssl.key.password="password"
```
- In your cluster, create a topic named customer-activity with 3 partitions. For example, in Aiven's managed version you can use the UI and create the topic directly from the console; a programmatic sketch follows this list.
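If you prefer to create the topic programmatically, here's a minimal sketch using the Kafka `AdminClient`. The class name and the replication factor are my assumptions, and the connection values echo the .env example above:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // These values mirror the .env example above; replace them with your cluster details.
        props.put("bootstrap.servers", "name-of-my-server.aivencloud.com:port");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "../keys/client.truststore.jks");
        props.put("ssl.truststore.password", "password");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "../keys/client.keystore.p12");
        props.put("ssl.keystore.password", "password");
        props.put("ssl.key.password", "password");

        try (AdminClient admin = AdminClient.create(props)) {
            // "customer-activity" with 3 partitions, as in the setup step;
            // the replication factor of 3 is an assumption, adjust for your cluster.
            admin.createTopics(List.of(new NewTopic("customer-activity", 3, (short) 3)))
                 .all().get();
        }
    }
}
```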
Now you're ready for the demo exercises. In these demos we'll focus on a single topic that contains events based on customer activity in an online shop.
In this demo we'll look at a simple producer, which will send messages to the Kafka cluster, and a simple consumer, which will read those messages and print out their content.
- Open the Java file `Shopitopia.simpleProducer`. This is an example of a very simple producer: it generates a random message every second and sends it to the cluster. Run its `main` method to start the producer.
- If the configuration is set up correctly, you'll see output similar to this:
```
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441999
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1648141938679
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: 7iPfsgbgGAqgwQ5XsIL9ng
[main] INFO Shopitopia.simpleProducer - Sent: {"product":"Dress 👗","operation":"searched 🔍","customer":"Chief Bogo🐃"}
[main] INFO Shopitopia.simpleProducer - Sent: {"product":"Tie 👔","operation":"searched 🔍","customer":"Officer Clawhauser😼"}
[main] INFO Shopitopia.simpleProducer - Sent: {"product":"Tie 👔","operation":"bought ✅","customer":"Fru Fru💐"}
[main] INFO Shopitopia.simpleProducer - Sent: {"product":"Donut 🍩 ","operation":"searched 🔍","customer":"Mr. Big 🪑"}
[main] INFO Shopitopia.simpleProducer - Sent: {"product":"Donut 🍩 ","operation":"searched 🔍","customer":"Nick Wilde🦊"}
```
- While the producer creates new messages, open the Java file `Shopitopia.simpleConsumer` and run its `main` method to start the consumer. The consumer will connect to the cluster and read the messages added by the producer. You will see detailed information about the connection to the cluster and, once the connection is established, the received messages:
```
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-first-1, groupId=first] Resetting offset for partition customer-activity-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[35.228.93.149:12693 (id: 29 rack: null)], epoch=0}}.
[main] INFO Shopitopia.simpleConsumer - message {"product":"Carrot 🥕","operation":"bought ✅","customer":"Mr. Big 🪑"}
[main] INFO Shopitopia.simpleConsumer - message {"product":"Dress 👗","operation":"bought ✅","customer":"Mr. Big 🪑"}
```
- Observe the results. Once you're done, terminate the consumer `Shopitopia.simpleConsumer`, but leave the producer running; we'll use it in the next demo.
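For reference, here is a minimal sketch of what a consumer like this looks like. It's illustrative rather than the repository's actual code, and the SSL settings from the setup section are omitted for brevity:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Add the SSL settings from the setup section here as well.
        props.put("bootstrap.servers", "name-of-my-server.aivencloud.com:port");
        props.put("group.id", "first"); // the group id seen in the log output above
        props.put("auto.offset.reset", "earliest"); // read each partition from the beginning
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("customer-activity"));
            while (true) {
                // Wait up to one second for new records, then log each message body.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("message " + record.value());
                }
            }
        }
    }
}
```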
In this demo we'll look at partitions and offsets.
- You should have the producer `Shopitopia.simpleProducer` already running.
- Start `Shopitopia.consPartitions`. This is a consumer that, in addition to the message body, outputs partition and offset information for every message (see the snippet after this list for the relevant part of the loop).
- Also try out the consumer `Shopitopia.consFiltered`, which outputs results for a single customer. You can see that currently the messages related to a single customer are spread across all partitions.
- Terminate the producers and consumers that are running.
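The interesting part of a consumer like `Shopitopia.consPartitions` is the per-record logging. Dropped into the poll loop of the earlier consumer sketch, it might look like this (`partition()` and `offset()` are the real `ConsumerRecord` accessors; the log format is my guess):

```java
for (ConsumerRecord<String, String> record : records) {
    // Each record carries its partition and offset alongside the payload.
    System.out.printf("partition=%d offset=%d message=%s%n",
            record.partition(), record.offset(), record.value());
}
```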
When looking at the consumer output, you can see that messages are spread across partitions in a seemingly random way. It is important to understand that Apache Kafka guarantees order only within a partition. This means that if we want to preserve the order of messages coming from each customer, we need to write all messages related to a single customer into the same partition. This can be done by assigning keys to the messages: all messages with the same key will be added to the same partition.
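Here's a minimal sketch of a keyed send, similar in spirit to what `Shopitopia.prodKeys` in the next demo does. The three-argument `ProducerRecord` constructor is the standard client API; the message body and class name are illustrative:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Add the SSL settings from the setup section here as well.
        props.put("bootstrap.servers", "name-of-my-server.aivencloud.com:port");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String customer = "Nick Wilde";
            String message = "{\"product\":\"Tie\",\"operation\":\"bought\",\"customer\":\"" + customer + "\"}";
            // Using the customer name as the record key: Kafka hashes the key to pick the
            // partition, so all messages for this customer land in the same partition.
            producer.send(new ProducerRecord<>("customer-activity", customer, message));
        }
    }
}
```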
- Run `Shopitopia.prodKeys`. This producer uses the customer name as the key for each message.
- Run `Shopitopia.consPartitions` and `Shopitopia.consFiltered`. Observe that messages related to specific customers consistently fall into the same partitions.
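With keyed messages, a `consFiltered`-style consumer can simply compare the record key. Inside the same poll loop as before, the check might look like this (the filtering logic in the repository may differ):

```java
for (ConsumerRecord<String, String> record : records) {
    // With keyed messages the key is the customer name, so filtering is a key comparison.
    if ("Nick Wilde".equals(record.key())) {
        System.out.printf("partition=%d offset=%d message=%s%n",
                record.partition(), record.offset(), record.value());
    }
}
```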
In the last demo we'll demonstrate how you can use Apache Kafka connectors to move data from an Apache Kafka topic into another data store. In this example we'll sink the data into an OpenSearch cluster.
- You'll need an OpenSearch cluster. Either set it up from source code or use a managed option, such as Aiven for OpenSearch.
- You'll also need Apache Kafka Connect: either set it up manually or enable it in the managed version. For example, this is how to do so in Aiven for Apache Kafka.
- You'll need to create a configuration file to be used by the connector. Follow this article on how to use the OpenSearch connector; in particular, look at the example that uses a JSON schema to transfer data.
- To help you, I've created an example configuration. The file connector.json, located in the root folder of this repository, contains the parameters necessary to connect to OpenSearch and send data. You only need to update the OpenSearch connection information:
"connection.url": "https://opensearch-cluster-address:port", "connection.username": "your OpenSearch cluster login name", "connection.password": "your OpenSearch cluster password",
- Official docs for Apache Kafka.
- Official docs for Apache Kafka Connect API.
- Official docs for Apache Kafka Streams.
- A ready-made fake data generator to source data into an Apache Kafka cluster.
- How to use kcat, a very handy utility for working with Apache Kafka from the command line.
- How to use the Karapace schema registry to align the structure of data coming into the Kafka cluster.
- How to use Apache Kafka Connect as a streaming bridge between different database technologies.
This work is licensed under the Apache License, Version 2.0. The full license text is available in the LICENSE file and at http://www.apache.org/licenses/LICENSE-2.0.txt