The Kafka inbound endpoint of WSO2 ESB acts as a message consumer. It creates a connection to ZooKeeper and requests messages for one or more topics or for topic filters.
Follow the steps below to configure the Kafka inbound endpoint to work with the ESB Profile of WSO2 EI:
- Download and install Apache Kafka.
The recommended version of Kafka for the Kafka inbound endpoint is kafka_2.12-1.0.0. For all available versions of Kafka, see https://kafka.apache.org/downloads.
-
Download the kafka_2.12-1.0.0.tgz from here and extract it. Let's call this directory <KAFKA_HOME>.
-
Go to https://store.wso2.com/store/assets/esbconnector/details/kafka, click Download Inbound Endpoint to download the inbound JAR file, and add the downloaded .jar file to the <EI_HOME>/dropins directory.
-
Copy the following client libraries from <KAFKA_HOME>/libs to <EI_HOME>/lib.
** kafka_2.12-1.0.0.jar
** kafka-clients-1.0.0.jar
** metrics-core-2.2.0.jar
** scala-library-2.12.3.jar
** zkclient-0.10.jar
** zookeeper-3.4.10.jar
Run the following command to start the ZooKeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties
Run the following command to start the Kafka server:
bin/kafka-server-start.sh config/server.properties
-
Given below is a sample Kafka configuration that can consume messages using a given topic/s:
Note: This configuration does not include security.
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="kafka" sequence="request" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false">
  <parameters>
    <parameter name="sequential">true</parameter>
    <parameter name="interval">10</parameter>
    <parameter name="coordination">true</parameter>
    <parameter name="inbound.behavior">polling</parameter>
    <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    <parameter name="topic.name">test</parameter>
    <parameter name="poll.timeout">100</parameter>
    <parameter name="bootstrap.servers">localhost:9092</parameter>
    <parameter name="group.id">hello</parameter>
    <parameter name="contentType">application/json</parameter>
    <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
  </parameters>
</inboundEndpoint>
You can add the above inbound configuration via the WSO2 ESB Management Console as well:
- Click the Main tab on the Management Console and then go to Manage -> Service Bus and click Inbound Endpoints to open the Inbound Endpoints page.
- On the Inbound Endpoints page, click Add Inbound Endpoint to open the New Inbound Endpoint page.
- Specify a name for the inbound endpoint, select custom as the inbound protocol type, and click Next.
-
Given below is a sample Kafka configuration that can consume messages using a given topic/s:
Note: This configuration includes security.
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="kafka" sequence="request" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false">
  <parameters>
    <parameter name="interval">10</parameter>
    <parameter name="coordination">true</parameter>
    <parameter name="sequential">true</parameter>
    <parameter name="inbound.behavior">polling</parameter>
    <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    <parameter name="topic.name">test</parameter>
    <parameter name="poll.timeout">100</parameter>
    <parameter name="bootstrap.servers">localhost:9092</parameter>
    <parameter name="group.id">hello</parameter>
    <parameter name="contentType">application/json</parameter>
    <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    <parameter name="security.protocol">SSL</parameter>
    <parameter name="ssl.keystore.location">/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.client.keystore.jks</parameter>
    <parameter name="ssl.keystore.password">test1234</parameter>
    <parameter name="ssl.truststore.location">/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.client.truststore.jks</parameter>
    <parameter name="ssl.truststore.password">test1234</parameter>
  </parameters>
</inboundEndpoint>
Note: Make sure you provide the sequential and coordination parameters as shown in the above configuration.
-
Given below is a sample Kafka configuration that can consume messages using a given topic/s :
Note: This configuration disables auto commit.
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="kafka" sequence="request" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false">
  <parameters>
    <parameter name="interval">10</parameter>
    <parameter name="coordination">true</parameter>
    <parameter name="sequential">true</parameter>
    <parameter name="inbound.behavior">polling</parameter>
    <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    <parameter name="topic.name">test</parameter>
    <parameter name="poll.timeout">100</parameter>
    <parameter name="bootstrap.servers">localhost:9092</parameter>
    <parameter name="group.id">hello</parameter>
    <parameter name="contentType">application/json</parameter>
    <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    <parameter name="enable.auto.commit">false</parameter>
    <parameter name="failure.retry.count">5</parameter>
  </parameters>
</inboundEndpoint>
Note: Set <property name="SET_ROLLBACK_ONLY" value="true"/> in the fault sequence to poll the same record again after a failure. When this property is set, the internal logic does not commit, and the offset is set back to the current record. The failure.retry.count parameter controls how many times the same record is polled in a failure scenario. Once the retry count is exceeded, the current record is discarded and the offset moves to the next record. The default value is -1, which means that the same record is polled indefinitely in failure cases. The SET_ROLLBACK_ONLY property and the failure.retry.count parameter take effect only when enable.auto.commit is set to false.
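As a minimal sketch of the note above, a fault sequence that requests a re-poll of the failed record could look like the following. It reuses the fault sequence name from the sample endpoint configuration. The log mediator is illustrative only, and the position of the property within the sequence is an assumption rather than a documented requirement.

```xml
<sequence xmlns="http://ws.apache.org/ns/synapse" name="fault">
  <!-- Log the failure details (illustrative only). -->
  <log level="custom">
    <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
  </log>
  <!-- Signal that the record must not be committed, so that it is polled again. -->
  <property name="SET_ROLLBACK_ONLY" value="true"/>
  <drop/>
</sequence>
```

With failure.retry.count set to 5, as in the sample configuration above, a record that keeps failing would be retried up to that count and then skipped.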
Given below are the descriptions of all possible parameters that you can set in a Kafka configuration:
Parameter | Description | Required | Possible values |
---|---|---|---|
bootstrap.servers | A list of host:port pairs that you can use to establish the initial connection to the Kafka cluster | Yes | localhost:9092, localhost:9093 |
key.deserializer | The deserializer class for the key that implements the Deserializer interface | Yes | class |
value.deserializer | The deserializer class for the value that implements the Deserializer interface | Yes | class |
topic.name | Topic name to consume the messages from | Yes | String |
group.id | The unique string that identifies the consumer group that a consumer belongs to | Yes | String |
contentType | The message content type | Yes | application/json, application/xml, text/plain |
poll.timeout | The amount of time that the consumer blocks while waiting for messages | Yes | Long |
ssl.keystore.location | The location of the keystore file. Specifying this is optional for the client and can be used in two-way authentication for the client | Required for security enabled configurations | String |
ssl.keystore.password | The password for the keystore file. Specifying this is optional for the client and is only required if the ssl.keystore.location parameter is configured | Required for security enabled configurations | Password |
ssl.truststore.location | The location of the truststore file | Required for security enabled configurations | String |
ssl.truststore.password | The password for the truststore file. Note : If you do not set a password, access to the truststore will still be available but integrity checking will be disabled | Required for security enabled configurations | Password |
security.protocol | The protocol used to communicate with brokers. Possible values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | Required for security enabled configurations | SSL, PLAINTEXT |
max.poll.records | The maximum number of records returned in a single call to poll. | Required for throttling | Integer |
enable.auto.commit | Whether offsets are committed automatically. Auto commit is enabled by default | Required to disable auto commit | true or false |
failure.retry.count | The number of times the offset is set back to the same record after a failure. Once this count is exceeded, the record is discarded | Optional. The default value is -1 | Positive Integer |
topic.partitions | A comma separated list of partition numbers of the topic to consume from | Required for manual partition assignment | String |
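The max.poll.records and topic.partitions parameters are not used in the sample configurations above. As a hedged sketch, they could be added to the non-secured sample as follows. The values 50 and 0,1 are placeholders chosen for illustration, not recommended settings.

```xml
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="kafka" sequence="request" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false">
  <parameters>
    <parameter name="interval">10</parameter>
    <parameter name="coordination">true</parameter>
    <parameter name="sequential">true</parameter>
    <parameter name="inbound.behavior">polling</parameter>
    <parameter name="bootstrap.servers">localhost:9092</parameter>
    <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
    <parameter name="topic.name">test</parameter>
    <parameter name="group.id">hello</parameter>
    <parameter name="contentType">application/json</parameter>
    <parameter name="poll.timeout">100</parameter>
    <!-- Placeholder value: return at most 50 records from a single poll. -->
    <parameter name="max.poll.records">50</parameter>
    <!-- Placeholder value: consume only from partitions 0 and 1 of the topic. -->
    <parameter name="topic.partitions">0,1</parameter>
  </parameters>
</inboundEndpoint>
```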
For more information on Kafka configuration parameters, see the Kafka Documentation.
For information on how to enable TLS authentication for Kafka brokers, producers, and consumers, see Enabling Security.
Kafka versions 0.9.0.0 and above support TLS. Enabling security for Kafka producers and consumers is a matter of configuration. It does not require any code changes.
The parameters required to support TLS are the same for both producers and consumers. As you are using mutual authentication, you should give the security protocol as well as the truststore and keystore information.
Let's see how to use a Kafka producer to start producing messages. First, either start the console producer or use the Kafka connector to produce the message. You can start the producer with or without security.
To start the console producer without security, execute the following command. Alternatively, you can use the Kafka connector without security.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
To start the console producer with security, execute the following command. Alternatively, you can use the Kafka connector with security.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config {file-path}/producer_ssl.properties
Use the following configuration to enable security for the console producer:
security.protocol=SSL
ssl.truststore.location={file-path}/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location={file-path}/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Note: If the passwords are stored in the client configuration, it is important to restrict access to the file using filesystem permissions.
Send the following message using the console producer or the Kafka connector:
{"test":"wso2"}
Create a sample sequence as follows:
<sequence xmlns="http://ws.apache.org/ns/synapse" name="request" onError="fault">
<log level="full"/>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd"
name="partitionNo"
expression="get-property('partitionNo')"/>
</log>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd"
name="messageValue"
expression="get-property('messageValue')"/>
</log>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd"
name="offset"
expression="get-property('offset')"/>
</log>
</sequence>
Create a sample fault sequence as follows:
<sequence xmlns="http://ws.apache.org/ns/synapse" name="fault">
<log level="full">
<property name="MESSAGE" value="Executing default 'fault' sequence"/>
<property xmlns:ns="http://org.apache.synapse/xsd"
name="ERROR_CODE"
expression="get-property('ERROR_CODE')"/>
<property xmlns:ns="http://org.apache.synapse/xsd"
name="ERROR_MESSAGE"
expression="get-property('ERROR_MESSAGE')"/>
</log>
<drop/>
</sequence>
After a message is produced using the console producer or the connector, the debug log of the ESB displays an INFO message as follows:
First, you need to pass a Kafka record header with your Kafka message. The following sample assumes that the header name is [KAFKA_HEADER_NAME] and the Kafka message topic is [KAFKA_TOPIC_NAME].
Create a sample sequence as follows:
<sequence xmlns="http://ws.apache.org/ns/synapse" name="request" onError="fault">
<log level="full"/>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd"
name="Topic"
expression="get-property('topic')"/>
</log>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd"
name="messageValue"
expression="get-property('messageValue')"/>
</log>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd"
name="Content Type(Header Value)"
expression="get-property('[KAFKA_TOPIC_NAME].[KAFKA_HEADER_NAME]')"/>
</log>
</sequence>
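For illustration only, suppose the topic is named test, as in the sample endpoint configurations, and the producer attaches a record header named contentType. The header name is a hypothetical choice made for this example. Substituting both into the placeholder pattern, the last log mediator of the sequence would read:

```xml
<log level="custom">
  <!-- 'test' is the topic name; 'contentType' is a hypothetical header name. -->
  <property name="Content Type(Header Value)"
            expression="get-property('test.contentType')"/>
</log>
```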
Now you can view the header as a log in the terminal as follows.