Liking cljdoc? Tell your friends :D

Rabid

Travis build GitHub issues

Robust RabbitMQ made easy.

Clojars Project

Motivation

Employ the robustness and configurability of spring-rabbitmq through a simplistic API with sensible defaults.

API

All functions for publishing and consuming messages can be found under rabid.core namespace. All functions for declaring and purging of queues, exhanges and bindings can be found under rabid.admin namespace.

startup!

(startup! opts)

Open new RabbitMQ connection by using given options. If connection already exists, this function does nothing. Use this function at the beginning of your application.

(ns my-app.main
  (:require [rabid.core :refer [startup!]]))

(defn -main []
  (startup! {:hosts ["localhost"]})
  ; ... your app stuff ...
  )

startup! accepts the following options

namedescriptiondefault value
:hostsVector of RabbitMQ server hostsname(s)localhost
:portRabbitMQ server port number5672
:vhostRabbitMQ vhost/
:userRabbitMQ usernameguest
:passwordRabbitMQ passwordguest
:publisher-confirmsUse message delivery confirmationtrue
:publisher-confirm-typeDelivery confirmation type (:async :sync :none):async
:publisher-returnsReturn the message to the publisher if it cannot be delivered for any reason.false
:request-heartbeatHeartbeat interval in seconds10
:connection-timeoutConnection timeout in seconds30
:batchA map of properties that configures batch publishing (see below){:batch-size 25 :buffer-limit Integer/MAX_VALUE :timeout 50}
:publish-retry-policyA map of properties configuring the retry policy for published messages.{:enabled? false :initial-interval 500 :multiplier 2.0 :max-interval 30000 :max-attempts Integer/MAX_VALUE :on-unconfirmed-message spring/default-unconfirmed-message-handler :on-returned-message spring/default-returned-message-handler}

shutdown!

(shutdown!)

Closes open RabbitMQ connection and shuts down all registered consumers. Use this function at the end of your application.

Example how to use in your (production) app:

(ns my-app.main
  (:require [rabid.core :refer [startup! shutdown!]]))

(defn -main []
  (startup! {:hosts ["localhost"]})
  ; start your async stuff
  (-> (Runtime/getRuntime)
      (.addShutdownHook (Thread. (fn [] (shutdown!))))))

Example how to use in your tests:

(ns my-test
  (:require [clojure.test :refer :all]
            [rabid.core :refer [startup! shutdown!]]))

(defn with-rabbitmq [t]
  (startup! {:hosts ["localhost"]})
  (t)
  (shutdown!))

(use-fixtures :once with-rabbitmq)

consume!

(consume! queue handler)
(consume! queue handler opts)

Starts a new consumer in a new thread to the given queue by using using the given handler callback. Consumer is started to its own thread so this function doesn't block. Instead, it returns a function which stops the started consumer when being invoked.

handler is a function with arity of 2: the first parameter is consumed message's metadata, the second one is message's body bytes.

If the message processing completes successfully, handler must return true which indicates that the consumed message can be ack'ed. If return value is not true, then the message is rejected and re-queued.

If the consumer throws an unexpected error, then the error and message details are logged by using clojure.tools.logging.

consume! accepts the following options map as a third optional parameter:

namedescriptiondefault value
:exclusive?Whether to start consumer in exclusive mode or notfalse
:recovery-intervalTime period after which consumer retries to consume the queue either after a connection error or if the queue is being consumed by another exclusive consumer60000
:prefetch-countConsumer's message buffer size (basic_qos)1
:concurrent-consumersNumber of consumers to create1
:batchConfigures handling of batch messages. :parallel-handling? if true, uses pmap, otherwise map{:parallel-handling? true}
:requeue-rejectedShould a rejected message be re-queued or not.true

RabbitMQ connection must be established before calling this function.

(defn my-handler [meta body]
  (let [message (json/parse-string (slurp body))
        content-type (get (:headers meta) "Content-Type")]
    ; do something ...
    ; returning true indicates that the message can be acked
    true))

(let [stop-consumer (consume! "my.queue" my-handler {:prefetch-count 10 :exclusive? true})]
  ...
  (stop-consumer))

consume-and-reply!

(consume-and-reply! queue handler)
(consume-and-reply! queue handler opts)

Starts a new consumer in a new thread on the queue.

Then handler is a function (args: [^Map headers ^bytes message]) that must return a vector ([^Map headers ^String payload]).

The opts map may contain the following keys.

namedescriptiondefault value
:exclusive?Whether to start consumer in exclusive mode or notfalse
:recovery-intervalTime period after which consumer retries to consume the queue either after a connection error or if the queue is being consumed by another exclusive consumer60000
:prefetch-countConsumer's message buffer size (basic_qos)1
:concurrent-consumersNumber of consumers to create1

publish!

(publish! exchange routing-key payload & [headers props])

Publishes a message to the given exchange by using the given routing key. Payload can be either string or array of bytes.

(Optional) headers is a map of {String String} or {Keyword String}.

(Optional) props are RabbitMQ basic properties. All valid values, their types and default values can be found here. The following properties are pre-set (and overridable by the caller):

  • :timestamp (Date.)
  • :persistent? true
  • :content-encoding "UTF-8"
  • :content-length 0
  • :priority 0

RabbitMQ connection must be established before calling this function.

(publish! "my.exchange" "my.routing.key" "tsers!")
(publish! "my.exchange" "my.routing.key" "tsers!" {"Content-Type" "text/tsers"})
(publish! "my.exchange" "my.routing.key" "tsers!" {} {:app-id "my-app"})

publish-batch!

(publish-batch! exhange routing-key payload & [headers props])

Combines messages into batches and publishes each batch as a RabbitMQ message. This gives increased throughput with increased risk (Instead of potentially losing 1 message you may lose batch-size of messages. If the consumer is using rabid as well, it requires no special action on their part since the Spring message listener knows how to debatch messages.

The behavior of the batching can be configured with the following publish-batch! props object - the one depicted here shows the default values.

{:batch {:batch-size 25
         :buffer-limit Integer/MAX_VALUE
         :timeout 50}}

To further clarify, here are the batch properties among some of the other props:

{:timestamp (Date.)
 :persistent? true
 :batch {:batch-size 25
         :buffer-limit Integer/MAX_VALUE
         :timeout 50}}

:batch-size defines the size of the batch, i.e. how many messages are combined into a single message. :buffer-limit defines the maximum size of the body of the combined message. If it is reached, the batch is published even if the batch-size has not been reached. :timeout defines the time, in milliseconds, after which the batch is published, even if the batch-size has not been reached.

The combined message's headers are retrieved from the first message in the batch.

WARNING! If the consumer rejects any of the messages in the batch, the whole batch is rejected - since it is a single message. The best approach to handling this situation is with a custom error-handler that re-publishes the individual messages it cannot handle.

publish-and-receive!

(publish-and-receive! target-queue payload)
(publish-and-receive! target-queue payload headers)
(publish-and-receive! target-queue payload headers opts)

Publishes a message to the target-queue and waits for a reply.
Payload can be either string or array of bytes. Returns a vector [^Map response-headers ^String response-payload] or throws an ExceptionInfo with :type :rabid/receive-timeout if timeout occurs.

The consumer of target-queue must publish a reply message to the default exchange with routing-key read from the replyTo message property. This is what consume-and-reply! does.

consume-queued!

(consume-queued! queue)
(consume-queued! queue time-to-wait)

Synchronously consumes all queued messages from the given queue and stops consuming if queue is empty at least 500 milliseconds. The time can be parametrized by using optional second parameter.

Return value is a vector of tuples [meta body].

RabbitMQ connection must be established before calling this function.

(let [messages (consume-queued! "my.queue")]
  (println (map (comp slurp second) messages)))

declare-queue!

(admin/declare-queue! queue-name & [opts])

Creates a queue with the given name and options.

opts is an optional map of the queue options:

namedescriptiondefault value
:exclusiveShould the queue only be accessible to the current connection?false
:auto-deleteShould the queue be deleted when no consumers are using it?false
:durableShould the queue persist after a restart?true
:queue-typeQueue type, :quorum for a quorum queue:classic
:argumentsExtra arguments passed to RabbitMQ when creating the queue.{}

declare-exchange!

(admin/declare-exchange! exchange-name & [type opts])

Creates an exchange with the given name, type and options.

type is an optional keyword describing the type of the exchange. Currently supported types are :direct and :topic (default). opts is an optional map of the exchange options:

namedescriptiondefault value
:auto-deleteShould the exchange be deleted when no queues are using it?false
:durableShould the exchange persist after a restart?true
:argumentsA map of additional exchange argumentsnil

declare-binding!

(admin/declare-binding! queue-name exchange-name routing-key & [arguments])

Binds a combination of exchange + routing-key to the given queue. Empty string as exchange will use default exchange. Empty string as routing key will use no routing key.

arguments is a map of additional binding arguments.

purge!

(admin/purge! queue-name)

Purges the given queue.

delete!

(admin/delete! queue-name)

Deletes the given queue.

delete-exchange!

(admin/delete-exchange! exchange-name)

Deletes the given exchange.

unbind!

(admin/unbind! queue-name exchange-name routing-key arguments)

Removes the binding from the given exchange + routing-key to the given queue.

message-count

(admin/message-count queue-name)

Gets the number of messages in the given queue.

FAQ

Q: All my messages are JSON? How to consume them?

A: Wrap your handler inside another handler that does the parsing. Then create your own consume! implementation that applies this wrapper to the original handler.

(ns my-app.rabbitmq.main
  (:require [cheshire.core :as json]
            [rabid.core :as rabid]))

(defn- json-handler [handler]
  (fn [meta body]
    (let [msg (json/parse-string body true)
          headers (:headers meta)]
      (handler headers msg))))

(defn consume! [queue handler]
    (rabid/consume! queue (json-handler handler)))

Q: How to deal with custom exceptions?

A: You can provide an error-handler function as the 4th parameter. Remember to return true to ack the message or false to reject it. If no error-handler is provided, the default-error-handler is used which logs the exception and acks the message.

(defn my-error-handler [msg e]
  (cond
    (instance? MyNetworkingException e) (do
                                          (error e "Networking exception occurred. Retrying in 10 secs..")
                                          (Thread/sleep 10
                                          false)
    (instance? MyDataException e) (do
                                    (error e "Data is corrupt, discarding message " msg)
                                    true)
    :else (do
            (error e "Unexpected error when handling" msg)
            true))))

(defn consume! [queue handler opts]
  (rabid/consume! queue handler opts my-error-handler))

Q: How to apply several different behaviors for the consumer?

A: By continuously wrapping your handler, you can create a handler chain to provide the behaviors you need by consumer.

(defn handler [meta body]
  (do-all-the-things body)
  true)

(defn incoming-logger-handler [handler]
  (fn [meta body]
    (info "Handling " body)
    (handler meta body)))

(defn message-enricher-handler [handler]
  (fn [meta body]
    (let [meta (assoc meta :received (Date.))]
      (handler meta body))))

(defn json-handler [handler]
  (fn [meta body]
    (let [msg (json/parse-string body true)
          headers (:headers meta)]
      (handler headers msg))))

(defn consume-queue-a! [queue handler opts]
  ; handlers are applied in reverse order
  (rabid/consume! queue (-> handler
                            json-handler
                            message-enricher-handler
                            incoming-logger-handler)))

(defn consume-queue-b! [queue handler opts]
  (rabid/consume! queue (-> handler
                            incoming-logger-handler)))

Development

Pre-requirements

You must have the following dependencies installed

  • docker-toolbox if running windows or os x, docker ( 1.7.1+ -> ) and docker-compose ( 1.5.2+ release ) if on linux
  • leiningen
  • Your user added to docker group (Linux only, see this)
  • Added your docker machine's ip as dockerhost to /etc/hosts (ip = localhost in Linux)

Running tests

docker/start.sh
lein test

If you want to see debug logs when running tests, set log4j.rootLogger=DEBUG to the test/log4j.properties file.

License

MIT

Can you improve this documentation? These fine people already did:
Matti Lankinen, Lauri Siltanen, Lauri Oherd, Samuli Ulmanen, Heikki Honkanen, Kristoffer Snabb & Antti Salminen
Edit on GitHub

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close