Skip to content

Fork of the RabbitMQ Source/Sink for Kafka Connect

License

Notifications You must be signed in to change notification settings

depop/kafka-connect-rabbitmq

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

44 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Introduction

Source Connectors

RabbitMQSourceConnector

Connector is used to read from a RabbitMQ Queue or Topic.

Configuration

kafka.topic

Importance: High

Type: String

Kafka topic to write the messages to.

rabbitmq.queue

Importance: High

Type: List

rabbitmq.queue

rabbitmq.host

Importance: High

Type: String

Default Value: localhost

The RabbitMQ host to connect to. See ConnectionFactory.setHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHost-java.lang.String->_

rabbitmq.password

Importance: High

Type: String

Default Value: guest

The password to authenticate to RabbitMQ with. See ConnectionFactory.setPassword(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPassword-java.lang.String->_

rabbitmq.username

Importance: High

Type: String

Default Value: guest

The username to authenticate to RabbitMQ with. See ConnectionFactory.setUsername(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setUsername-java.lang.String->_

rabbitmq.virtual.host

Importance: High

Type: String

Default Value: /

The virtual host to use when connecting to the broker. See ConnectionFactory.setVirtualHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setVirtualHost-java.lang.String->_

message.converter

Importance: Medium

Type: String

Default Value: com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter

Other allowed values:

  • com.github.themeetgroup.kafka.connect.rabbitmq.source.data.BytesSourceMessageConverter
  • com.github.themeetgroup.kafka.connect.rabbitmq.source.data.StringSourceMessageConverter

Converter to compose the Kafka message.

rabbitmq.port

Importance: Medium

Type: Int

Default Value: 5672

The RabbitMQ port to connect to. See ConnectionFactory.setPort(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPort-int->_

rabbitmq.prefetch.count

Importance: Medium

Type: Int

Default Value: 0

Maximum number of messages that the server will deliver, 0 if unlimited. See Channel.basicQos(int, boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html#basicQos-int-boolean->_

rabbitmq.prefetch.global

Importance: Medium

Type: Boolean

Default Value: false

True if the settings should be applied to the entire channel rather than each consumer. See Channel.basicQos(int, boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html#basicQos-int-boolean->_

rabbitmq.automatic.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables automatic connection recovery. See ConnectionFactory.setAutomaticRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setAutomaticRecoveryEnabled-boolean->_

rabbitmq.connection.timeout.ms

Importance: Low

Type: Int

Default Value: 60000

Connection TCP establishment timeout in milliseconds. zero for infinite. See ConnectionFactory.setConnectionTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setConnectionTimeout-int->_

rabbitmq.handshake.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

The AMQP0-9-1 protocol handshake timeout, in milliseconds. See ConnectionFactory.setHandshakeTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHandshakeTimeout-int->_

rabbitmq.network.recovery.interval.ms

Importance: Low

Type: Int

Default Value: 10000

See ConnectionFactory.setNetworkRecoveryInterval(long) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setNetworkRecoveryInterval-long->_

rabbitmq.requested.channel.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum channel number. Zero for unlimited. See ConnectionFactory.setRequestedChannelMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedChannelMax-int->_

rabbitmq.requested.frame.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum frame size, in octets. Zero for unlimited. See ConnectionFactory.setRequestedFrameMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedFrameMax-int->_

rabbitmq.requested.heartbeat.seconds

Importance: Low

Type: Int

Default Value: 60

Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval. If server heartbeat timeout is configured to a non-zero value, this method can only be used to lower the value; otherwise any value provided by the client will be used. See ConnectionFactory.setRequestedHeartbeat(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedHeartbeat-int->_

rabbitmq.shutdown.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

Set the shutdown timeout. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost. See ConnectionFactory.setShutdownTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setShutdownTimeout-int->_

rabbitmq.topology.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables topology recovery. See ConnectionFactory.setTopologyRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setTopologyRecoveryEnabled-boolean->_

Examples

Standalone Example

This configuration is used typically along with standalone mode.

name=RabbitMQSourceConnector1
connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
rabbitmq.queue=< Required Configuration >
Distributed Example

This configuration is used typically along with distributed mode. Write the following json to connector.json, configure all of the required values, and use the command below to post the configuration to one the distributed connect worker(s).

{
  "config" : {
    "name" : "RabbitMQSourceConnector1",
    "connector.class" : "com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "< Required Configuration >",
    "rabbitmq.queue" : "< Required Configuration >"
  }
}

Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ the the endpoint of one of your Kafka Connect worker(s).

Create a new instance.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing instance.

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config

Sink Connectors

RabbitMQSinkConnector

Connector is used to read data from a Kafka topic and publish it on a RabbitMQ exchange and routing key pair.

Configuration

rabbitmq.exchange

Importance: High

Type: String

exchange to publish the messages on.

rabbitmq.routing.key

Importance: High

Type: String

routing key used for publishing the messages.

rabbitmq.format

Importance: High

Type: String

Default Value: bytes

Other allowed values:

  • json
  • avro (non Confluent avro)

The format type to use when writing data to RabbitMQ

topics

Importance: High

Type: String

Kafka topic to read the messages from.

rabbitmq.host

Importance: High

Type: String

Default Value: localhost

The RabbitMQ host to connect to. See ConnectionFactory.setHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHost-java.lang.String->_

rabbitmq.password

Importance: High

Type: String

Default Value: guest

The password to authenticate to RabbitMQ with. See ConnectionFactory.setPassword(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPassword-java.lang.String->_

rabbitmq.username

Importance: High

Type: String

Default Value: guest

The username to authenticate to RabbitMQ with. See ConnectionFactory.setUsername(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setUsername-java.lang.String->_

rabbitmq.virtual.host

Importance: High

Type: String

Default Value: /

The virtual host to use when connecting to the broker. See ConnectionFactory.setVirtualHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setVirtualHost-java.lang.String->_

rabbitmq.port

Importance: Medium

Type: Int

Default Value: 5672

The RabbitMQ port to connect to. See ConnectionFactory.setPort(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPort-int->_

rabbitmq.automatic.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables automatic connection recovery. See ConnectionFactory.setAutomaticRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setAutomaticRecoveryEnabled-boolean->_

rabbitmq.connection.timeout.ms

Importance: Low

Type: Int

Default Value: 60000

Connection TCP establishment timeout in milliseconds. zero for infinite. See ConnectionFactory.setConnectionTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setConnectionTimeout-int->_

rabbitmq.handshake.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

The AMQP0-9-1 protocol handshake timeout, in milliseconds. See ConnectionFactory.setHandshakeTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHandshakeTimeout-int->_

rabbitmq.network.recovery.interval.ms

Importance: Low

Type: Int

Default Value: 10000

See ConnectionFactory.setNetworkRecoveryInterval(long) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setNetworkRecoveryInterval-long->_

rabbitmq.requested.channel.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum channel number. Zero for unlimited. See ConnectionFactory.setRequestedChannelMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedChannelMax-int->_

rabbitmq.requested.frame.max

Importance: Low

Type: Int

Default Value: 0

Initially requested maximum frame size, in octets. Zero for unlimited. See ConnectionFactory.setRequestedFrameMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedFrameMax-int->_

rabbitmq.requested.heartbeat.seconds

Importance: Low

Type: Int

Default Value: 60

Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval. If server heartbeat timeout is configured to a non-zero value, this method can only be used to lower the value; otherwise any value provided by the client will be used. See ConnectionFactory.setRequestedHeartbeat(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedHeartbeat-int->_

rabbitmq.shutdown.timeout.ms

Importance: Low

Type: Int

Default Value: 10000

Set the shutdown timeout. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost. See ConnectionFactory.setShutdownTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setShutdownTimeout-int->_

rabbitmq.topology.recovery.enabled

Importance: Low

Type: Boolean

Default Value: true

Enables or disables topology recovery. See ConnectionFactory.setTopologyRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setTopologyRecoveryEnabled-boolean->_

Examples

Standalone Example

This configuration is used typically along with standalone mode.

name=RabbitMQSinkConnector1
connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnector
tasks.max=1
topics=< Required Configuration >
rabbitmq.exchange=< Required Configuration >
rabbitmq.routing.key=< Required Configuration >
topics=< Required Configuration >
Distributed Example

This configuration is used typically along with distributed mode. Write the following json to connector.json, configure all of the required values, and use the command below to post the configuration to one the distributed connect worker(s).

{
  "config" : {
    "name" : "RabbitMQSinkConnector1",
    "connector.class" : "com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnector",
    "tasks.max" : "1",
    "topics" : "< Required Configuration >",
    "rabbitmq.exchange" : "< Required Configuration >",
    "rabbitmq.routing.key" : "< Required Configuration >"
  }
}

Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ the the endpoint of one of your Kafka Connect worker(s).

Create a new instance.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing instance.

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config

About

Fork of the RabbitMQ Source/Sink for Kafka Connect

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Java 94.1%
  • Shell 5.6%
  • Dockerfile 0.3%