
middleware for langohr providing simple RabbitMQ message retrial and dead letter handling

raskig/carrot

carrot:\

Library to keep your rabbits running...


A Clojure library that implements the following simple RabbitMQ delayed retry mechanism:

[Diagram: simple delayed retry]

...or a more sophisticated exponential backoff:

[Diagram: exponential backoff]

The idea is the following:

  1. The provider sends a message to the message queue.
  2. The consumer tries to process it.
  3. While processing, an exception is thrown.
  4. The failed message goes into the retry-queue for a period of time, which is either configured directly (simple delayed retry) or calculated from your configuration (exponential backoff).
  5. After the ttl expires, the message is put back on the message queue to be processed again.
  6. Steps 3-5 are repeated until the message is processed successfully or the number of retries reaches the max-retry you are running carrot with.
  7. When max-retry is exceeded, the message is put in the corresponding dead-letter queue.
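
The routing decision in steps 4-7 can be sketched as a pure function. This is only an illustration of the idea, not carrot's implementation: `next-destination` and `simulate-failures` are hypothetical names, and the exact boundary condition carrot applies to the retry count is an assumption.

```clojure
;; Hypothetical sketch: none of these names are part of carrot's API.
(defn next-destination
  "Decides where a failed message goes next.
  retry-count is how many times the message has already been retried."
  [retry-count max-retry-count]
  (if (< retry-count max-retry-count)
    :retry-queue
    :dead-letter-queue))

(defn simulate-failures
  "Returns the destinations visited by a message whose processing always fails."
  [max-retry-count]
  (mapv #(next-destination % max-retry-count)
        (range (inc max-retry-count))))

(simulate-failures 3)
;; => [:retry-queue :retry-queue :retry-queue :dead-letter-queue]
```

With a max retry count of 3, the message is retried three times and dead-lettered on the fourth failure.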

Releases and Dependency Information

Leiningen dependency information:

[carrot "2.0.0"]

Usage

Don't worry if the diagram above looks complicated. You give custom names to the queues and exchanges shown in the diagram, and carrot provides the retry mechanism and creates the architecture for you.

Main steps:

  • Require carrot in your code:
:require [carrot.core :as carrot]
  • Define your exchange and queue names in a carrot config map along with the retry configuration strategy (for details see architecture)

simple backoff example:

(def carrot-system {:retry-config {:strategy :simple-backoff
                                   :message-ttl 3000
                                   :max-retry-count 3}
                    :retry-exchange "retry-exchange"
                    :dead-letter-exchange "dead-letter-exchange"
                    :retry-queue "retry-queue"
                    :message-exchange "message-exchange"
                    :exchange-type "topic"
                    :exchange-config {:durable true}
                    :retry-queue-config {:arguments {"x-max-length" 1000}}})

exponential backoff example:

(def carrot-system {:retry-config {:strategy :exp-backoff
                                   :initial-ttl 30
                                   :max-ttl 360000
                                   :max-retry-count 3
                                   :next-ttl-function exp-backoff-carrot/next-ttl}
                    :retry-exchange "retry-exchange"
                    :dead-letter-exchange "dead-letter-exchange"
                    :retry-queue "retry-queue"
                    :message-exchange "message-exchange"
                    :exchange-type "topic"
                    :exchange-config {:durable true}
                    :retry-queue-config {:arguments {"x-max-length" 1000}}})

In case of exponential backoff, you can define your own function to determine the next ttl value after a retry.
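
A custom next-ttl function might look roughly like the sketch below. The argument list is an assumption: the sketch supposes the function receives the retry attempt number and the :retry-config map, which should be checked against exp-backoff-carrot/next-ttl before use. `doubling-ttl` is a hypothetical name.

```clojure
;; Assumption: the function is called with the retry attempt number and the
;; :retry-config map. Verify the real argument list against
;; exp-backoff-carrot/next-ttl before plugging this in.
(defn doubling-ttl
  "Doubles the ttl on every attempt, starting at :initial-ttl and capped at :max-ttl."
  [retry-attempt {:keys [initial-ttl max-ttl]}]
  (min max-ttl
       (reduce * initial-ttl (repeat retry-attempt 2))))

(def retry-config {:initial-ttl 30 :max-ttl 360000})

(map #(doubling-ttl % retry-config) [0 1 2 3])
;; => (30 60 120 240)

(doubling-ttl 20 retry-config)
;; => 360000
```

Capping at :max-ttl keeps a message that fails many times from waiting in the retry queue indefinitely.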

  • Declare your carrot system which will declare exchanges and queues with the given configuration:
(carrot/declare-system channel carrot-system)

Here "channel" is an open langohr channel.

  • Subscribe to your message queues with carrot's subscribe function:
(carrot/subscribe channel
                  carrot-system
                  qname
                  (carrot/crate-message-handler-function
                   (comp
                    message-handler-01
                    message-handler-02
                    ;; list more handler functions here; comp combines them
                    ;; into a single message handler function
                    ;; (comp applies the rightmost function first)
                    )
                   qname
                   carrot-system
                   println)
                  {:auto-ack false}
                  dead-queue-config-function)

Here the dead-queue-config-function is a function you define which returns the config map for dead letter queues. Example:

(defn dead-queue-config-function [queue-name]
  {:arguments {"x-max-length" 1000}})
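
The handler passed to subscribe above is built with comp, so the order in which the handlers run is worth spelling out. The sketch below uses hypothetical handlers that take a message map and return an updated one. The actual shape of the value carrot passes to handlers is an assumption here.

```clojure
;; Hypothetical handlers: each takes a message map and returns an updated one.
(defn message-handler-01 [message]
  (update message :trace conj :handler-01))

(defn message-handler-02 [message]
  (update message :trace conj :handler-02))

;; comp applies its arguments right to left, so message-handler-02 runs first.
(def composed-handler
  (comp message-handler-01 message-handler-02))

(:trace (composed-handler {:trace []}))
;; => [:handler-02 :handler-01]
```

If a handler depends on the result of another, list the dependent one further to the left.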

Full example code

Exception handling

  • You can throw an exception to prevent a retry. Throwing it tells carrot not to retry the message because the message can never be processed, so there is no reason to retry.
(carrot/throw-do-not-retry-exception "Something fatal happened." {:cause "The message can not be processed."})

Example code for this
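
The distinction between failures worth retrying and failures that are not can be sketched without a broker. In this illustration a plain ex-info with a `:retry?` key stands in for carrot/throw-do-not-retry-exception. `parse-amount`, `classify-failure` and the `:retry?` key are hypothetical and say nothing about how carrot marks such exceptions internally.

```clojure
;; Stand-in sketch: plain ex-info replaces carrot/throw-do-not-retry-exception
;; so the snippet runs without RabbitMQ. All names here are hypothetical.
(defn parse-amount [^String payload]
  (try
    (Long/parseLong payload)
    (catch NumberFormatException _
      ;; A malformed payload will never parse, so retrying is pointless.
      (throw (ex-info "Something fatal happened."
                      {:retry? false
                       :cause "The message can not be processed."})))))

(defn classify-failure
  "Runs f on payload and reports :ok, :retry or :do-not-retry."
  [f payload]
  (try
    (f payload)
    :ok
    (catch clojure.lang.ExceptionInfo e
      (if (false? (:retry? (ex-data e))) :do-not-retry :retry))
    (catch Exception _
      ;; Any other exception is treated as transient.
      :retry)))

(classify-failure parse-amount "42")        ;; => :ok
(classify-failure parse-amount "forty-two") ;; => :do-not-retry
(classify-failure (fn [_] (throw (java.io.IOException. "broker timeout"))) "42")
;; => :retry
```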

  • Optionally, you can declare functions whose exceptions should never trigger a retry. List here the functions for which any exception means the message can never be processed, so there is no reason to retry.
(carrot/do-not-retry! [#'my-namespace/message-handler-02
                       #'my-namespace/message-handler-04])

Example code for this

Exponential backoff

  • You can configure carrot to run with exponential backoff by defining the system with the :exp-backoff strategy:
(def carrot-system {:retry-config {:strategy :exp-backoff
                                   :initial-ttl 30
                                   :max-ttl 360000
                                   :max-retry-count 3
                                   :next-ttl-function exp-backoff-carrot/next-ttl}
                    :retry-exchange "retry-exchange"
                    :dead-letter-exchange "dead-letter-exchange"
                    :retry-queue "retry-queue"
                    :message-exchange "message-exchange"
                    :exchange-type "topic"
                    :exchange-config {:durable true}
                    :retry-queue-config {:arguments {"x-max-length" 1000}}})

TODOS

  • support function for replaying messages that ended up in a dead letter queue
  • possibly: provide a function to handle dead letters; one strategy could be to put them in dead letter queues

License

Copyright © 2017 Gabor Raski

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.
