Node.js wrapper for the Java Kafka 0.8 client API.
- Motivation
- Installation
- Example
- Performance and stability
- API
- Adding your own jars to classpath
- Java Tier Logging
- Troubleshooting
- Sources
- License
We needed a production-quality Kafka 0.8 client implementation for Node.js. See Performance and stability for detailed information.
- Make sure you have Java 7 or higher installed.
- Run:
npm install kafka-java-bridge
var HLConsumer = require("kafka-java-bridge").HLConsumer;
var consumerOptions = {
zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
groupId: "example-consumer-group-id",
topics: ["example-topic1","example-topic2"],
getMetadata: true
};
var consumer = new HLConsumer(consumerOptions);
consumer.start(function (err) {
if (err) {
console.log("Error occurred when starting consumer. err:", err);
} else {
console.log("Started consumer successfully");
}
});
consumer.on("message", function (msg, metadata) {
console.log("On message. message:", msg);
console.log("On message. metadata:", JSON.stringify(metadata));
});
consumer.on("error", function (err) {
console.log("On error. err:", err);
});
process.on('SIGINT', function() {
consumer.stop(function(){
console.log("consumer stopped");
// Timeout to allow logs to print
setTimeout(function(){
process.exit();
} , 300);
});
});
var StringProducer = require('kafka-java-bridge').StringProducer;
var BinaryProducer = require('kafka-java-bridge').BinaryProducer;
var stringProducer = new StringProducer({bootstrapServers: "broker1:9092, broker2:9092"});
var binaryProducer = new BinaryProducer({zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka"});
const buf = Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4]); // new Buffer() is deprecated in current Node.js
binaryProducer.send("myBinaryTopic", buf, function(err, msgMetadata){
console.log('send msg cb. err = ' + err + '. metadata = ' + JSON.stringify(msgMetadata));
});
stringProducer.send("myStringTopic", "testString", function(err, msgMetadata){
console.log('send msg cb. err = ' + err + '. metadata = ' + JSON.stringify(msgMetadata));
});
process.on('SIGINT', function() {
stringProducer.close(function(err){
binaryProducer.close(function(err) {
process.exit();
});
});
});
Libraries compared:
- kafka-java-bridge, this package.
- kafka-node, an existing High Level Consumer for Kafka 0.8.
- The table below shows representative CPU consumption (lower is better) while processing the same message rate (~11K messages per second).
Library name | CPU% average |
---|---|
kafka-java-bridge | 11.76 |
kafka-node | 73 |
- Consumer comparison (number of messages). Tested on an Amazon AWS EC2 instance with 16 GB RAM and 4 cores. (Metrics measured with New Relic.)
Library name | Rpm Avg | Network Avg | Cpu/System Avg |
---|---|---|---|
kafka-java-bridge | 947K | 300 Mb/s | 6.2% |
kafka-node | 87.5K | 75 Mb/s | 11.2% |
Kafka-java-bridge wraps Confluent's official Java High Level Consumer.
While testing kafka-node we encountered multiple issues such as:
Those issues, alongside the inadequate performance results, were the trigger for developing this library.
The Consumer object fetches messages from Kafka topics. Each consumer can consume messages from multiple topics.
var consumerOptions = {
zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
groupId: "example-consumer-group-id",
topic: "example-topic",
serverPort: 3042,// Optional
threadCount: 1,// Optional
properties: {"rebalance.max.retries": "3"}// Optional
};
Option name | Mandatory | Type | Default value | Description |
---|---|---|---|---|
zookeeperUrl | Yes | String | undefined | Zookeeper connection string. |
groupId | Yes | String | undefined | Kafka consumer groupId. From kafka documentation: groupId is a string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group. |
topic | No | String | undefined | Kafka topic name. |
getMetadata | No | boolean | false | If true, message metadata (topic, partition, offset) will be provided with each message. Use false for better performance. |
topics | Yes | Array of String | undefined | Kafka topics names array. |
serverPort | No | Number | 3042 | Internal server port used to transfer the messages from the Java thread to the Node.js thread. |
threadCount | No | Number | 1 | The threading model revolves around the number of partitions in your topic and there are some very specific rules. For more information: kafka consumer groups |
properties | No | Object | undefined | Properties names can be found in the following table: high level consumer properties. |
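
The options table above can be turned into a small pre-flight check. The following is only a sketch: `normalizeConsumerOptions` and `CONSUMER_DEFAULTS` are hypothetical names that are not part of kafka-java-bridge, and the rule that either `topics` or `topic` must be present is an assumption drawn from the examples in this document rather than from the library itself.

```javascript
// Hypothetical helper, not part of kafka-java-bridge.
// Defaults mirror the "Default value" column of the options table.
var CONSUMER_DEFAULTS = { getMetadata: false, serverPort: 3042, threadCount: 1 };

function normalizeConsumerOptions(options) {
    if (typeof options.zookeeperUrl !== "string" || options.zookeeperUrl === "") {
        throw new Error("zookeeperUrl is mandatory and must be a non-empty string");
    }
    if (typeof options.groupId !== "string" || options.groupId === "") {
        throw new Error("groupId is mandatory and must be a non-empty string");
    }
    // Assumption: either `topics` (array) or `topic` (string) must be supplied.
    var hasTopics = Array.isArray(options.topics) && options.topics.length > 0;
    var hasTopic = typeof options.topic === "string" && options.topic !== "";
    if (!hasTopics && !hasTopic) {
        throw new Error("either topics or topic must be provided");
    }
    // Copy so the caller's object is not mutated; explicit values override defaults.
    return Object.assign({}, CONSUMER_DEFAULTS, options);
}

var normalized = normalizeConsumerOptions({
    zookeeperUrl: "zookeeper1:2181/kafka",
    groupId: "example-consumer-group-id",
    topics: ["example-topic1"]
});
console.log(normalized.serverPort, normalized.threadCount, normalized.getMetadata);
```

The returned object could then be passed to `new HLConsumer(...)`; failing early in plain JavaScript tends to give clearer errors than a failure surfacing from the Java tier.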
- message: this event is emitted when a message is consumed from kafka.
- error: this event is emitted when an error occurs while consuming messages.
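
As an illustration of the two events, the sketch below tallies consumed messages per topic and counts errors. It makes several assumptions: `attachCounters` is a hypothetical helper, a plain `EventEmitter` stands in for an `HLConsumer` so the code runs without Kafka or Java, and the metadata property names (`topic`, `partition`, `offset`) are inferred from the `getMetadata` description rather than confirmed.

```javascript
var EventEmitter = require("events").EventEmitter;

// Hypothetical helper: tallies consumed messages per topic and counts errors.
function attachCounters(consumer) {
    var stats = { perTopic: {}, errors: 0 };
    consumer.on("message", function (msg, metadata) {
        // metadata is only expected when the consumer was created with getMetadata: true.
        var topic = metadata && metadata.topic ? metadata.topic : "unknown";
        stats.perTopic[topic] = (stats.perTopic[topic] || 0) + 1;
    });
    consumer.on("error", function () {
        stats.errors += 1;
    });
    return stats;
}

// Stand-in for an HLConsumer so the sketch runs without Kafka or Java.
var fakeConsumer = new EventEmitter();
var stats = attachCounters(fakeConsumer);
fakeConsumer.emit("message", "payload-1", { topic: "example-topic1", partition: 0, offset: 10 });
fakeConsumer.emit("message", "payload-2", { topic: "example-topic1", partition: 0, offset: 11 });
fakeConsumer.emit("message", "payload-3"); // no metadata, as with getMetadata: false
fakeConsumer.emit("error", new Error("simulated"));
console.log(JSON.stringify(stats));
```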
Start consuming messages from the Kafka topic(s).
cb - callback called once the consumer has started.
If the callback is called with err, the consumer failed to start.
Stop consuming messages.
cb - callback is called when the consumer is stopped.
message/error events can still be emitted until stop callback is called.
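
Because events can still arrive until the stop callback fires, a process running several consumers may want to wait for every stop callback before exiting. A minimal sketch follows; `stopAll` and `makeFakeConsumer` are hypothetical names, and the fake consumers expose only `stop(cb)` so the code runs without Kafka or Java.

```javascript
// Hypothetical helper: stops every consumer, then calls done once all stop callbacks fired.
function stopAll(consumers, done) {
    var remaining = consumers.length;
    if (remaining === 0) {
        return done();
    }
    consumers.forEach(function (consumer) {
        consumer.stop(function () {
            remaining -= 1;
            if (remaining === 0) {
                done();
            }
        });
    });
}

// Stand-ins exposing only stop(cb), so the sketch runs without Kafka or Java.
var stopped = [];
function makeFakeConsumer(name) {
    return {
        stop: function (cb) {
            stopped.push(name);
            cb();
        }
    };
}

var allStopped = false;
stopAll([makeFakeConsumer("a"), makeFakeConsumer("b")], function () {
    allStopped = true;
});
console.log("all stopped:", allStopped);
```

In a real application, `stopAll` would typically be called from a `SIGINT` handler, with `process.exit()` invoked inside the `done` callback.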
The Producer object produces messages to Kafka. The topic is specified with each message, so one producer can produce messages to multiple topics.
StringProducer should be used to send string messages. BinaryProducer should be used to send binary messages.
var producerOptions = {
zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
properties: {"client.id": "kafka-java-bridge"}// Optional
};
OR
var producerOptions = {
bootstrapServers: "broker1:9092,broker2:9092,broker3:9092",
properties: {"client.id": "kafka-java-bridge"}// Optional
};
Option name | Mandatory | Type | Default value | Description |
---|---|---|---|---|
bootstrapServers | No | String | undefined | Kafka broker connection string. |
zookeeperUrl | No | String | undefined | Zookeeper connection string. If provided, broker list will be retrieved from standard path. |
properties | No | Object | undefined | Properties names can be found in the following table: high level producer properties. |
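
Since the connection can be given either as a broker list or as a Zookeeper URL, application config can be mapped onto producer options with a small helper. This is a sketch only: `buildProducerOptions` and the `brokers` config field are hypothetical, and the rule that at least one of the two connection settings must be present is an assumption (the table marks both as optional).

```javascript
// Hypothetical helper, not part of kafka-java-bridge.
function buildProducerOptions(config) {
    var options = {};
    if (Array.isArray(config.brokers) && config.brokers.length > 0) {
        // Join host:port pairs into a comma-separated connection string.
        options.bootstrapServers = config.brokers.join(",");
    } else if (typeof config.zookeeperUrl === "string" && config.zookeeperUrl !== "") {
        options.zookeeperUrl = config.zookeeperUrl;
    } else {
        // Assumption: a producer needs one of the two connection settings.
        throw new Error("provide either brokers or zookeeperUrl");
    }
    if (config.properties) {
        options.properties = config.properties;
    }
    return options;
}

var fromBrokers = buildProducerOptions({
    brokers: ["broker1:9092", "broker2:9092"],
    properties: { "client.id": "kafka-java-bridge" }
});
var fromZookeeper = buildProducerOptions({ zookeeperUrl: "zookeeper1:2181/kafka" });
console.log(JSON.stringify(fromBrokers), JSON.stringify(fromZookeeper));
```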
topic - target topic name (String).
msg - message to be sent to Kafka (String or Buffer).
cb - callback called when the message is sent, with err in case of failure or msg metadata in case of success.
topic - target topic name (String).
msg - message to be sent to Kafka (String or Buffer).
key - Kafka message key (String or Buffer).
cb - callback called when the message is sent, with err in case of failure or msg metadata in case of success.
topic - target topic name (String).
msg - message to be sent to Kafka (String or Buffer).
key - Kafka message key (String or Buffer).
partition - target partition (Integer).
cb - callback called when the message is sent, with err in case of failure or msg metadata in case of success.
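
Since string messages go through a StringProducer and binary messages through a BinaryProducer, code that handles both kinds may want a small dispatcher. The sketch below assumes only the `send(topic, msg, cb)` form shown in the example section; `sendByType` and `makeFakeProducer` are hypothetical names, and the fake producers stand in for real ones so the code runs without Kafka or Java.

```javascript
// Hypothetical helper: routes a message to the producer matching its type.
function sendByType(producers, topic, msg, cb) {
    if (Buffer.isBuffer(msg)) {
        return producers.binary.send(topic, msg, cb);
    }
    if (typeof msg === "string") {
        return producers.string.send(topic, msg, cb);
    }
    cb(new TypeError("msg must be a String or a Buffer"));
}

// Stand-ins that record calls instead of talking to Kafka.
var calls = [];
function makeFakeProducer(kind) {
    return {
        send: function (topic, msg, cb) {
            calls.push(kind + ":" + topic);
            cb(null, {});
        }
    };
}

var producers = { string: makeFakeProducer("string"), binary: makeFakeProducer("binary") };
var lastError = null;
function onSent(err) {
    lastError = err || null;
}

sendByType(producers, "myStringTopic", "testString", onSent);
sendByType(producers, "myBinaryTopic", Buffer.from([0x0, 0x1]), onSent);
sendByType(producers, "myStringTopic", 42, onSent); // unsupported type, reported via callback
console.log(calls.join(", "), lastError && lastError.message);
```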
If you wish to add jars to the classpath, it can be done by placing them at:
{app root path}/kafka-java-bridge/java/lib/yourjar.jar
By default, underlying java tier logging is disabled.
If you wish to enable java tier logging you can place your own log4j.properties file at:
{app root path}/kafka-java-bridge/log4j/log4j.properties
In case of installation failure, you may want to take a look at the installation and troubleshooting sections of our dependency, the java npm package.
If you are working on a Windows machine, you may want to look at windows-build-tools for native code compilation issues.
MIT