Robust RabbitMQ made easy.
Employ the robustness and configurability of spring-rabbitmq through a simplistic API with sensible defaults.
All functions for publishing and consuming messages can be found under rabid.core
namespace.
All functions for declaring and purging of queues, exhanges and bindings can be found under rabid.admin
namespace.
startup!
(startup! opts)
Open new RabbitMQ connection by using given options. If connection already exists, this function does nothing. Use this function at the beginning of your application.
(ns my-app.main
(:require [rabid.core :refer [startup!]]))
(defn -main []
(startup! {:hosts ["localhost"]})
; ... your app stuff ...
)
startup!
accepts the following options
name | description | default value |
---|---|---|
:hosts | Vector of RabbitMQ server hostsname(s) | localhost |
:port | RabbitMQ server port number | 5672 |
:vhost | RabbitMQ vhost | / |
:user | RabbitMQ username | guest |
:password | RabbitMQ password | guest |
:publisher-confirms | Use message delivery confirmation | true |
:publisher-returns | Return the message to the publisher if it cannot be delivered for any reason. | false |
:request-heartbeat | Heartbeat interval in seconds | 10 |
:connection-timeout | Connection timeout in seconds | 30 |
:batch | A map of properties that configures batch publishing (see below) | {:batch-size 25 :buffer-limit Integer/MAX_VALUE :timeout 50} |
:publish-retry-policy | A map of properties configuring the retry policy for published messages. | {:enabled? false :initial-interval 500 :multiplier 2.0 :max-interval 30000 :max-attempts Integer/MAX_VALUE :on-unconfirmed-message spring/default-unconfirmed-message-handler :on-returned-message spring/default-returned-message-handler} |
shutdown!
(shutdown!)
Closes open RabbitMQ connection and shuts down all registered consumers. Use this function at the end of your application.
Example how to use in your (production) app:
(ns my-app.main
(:require [rabid.core :refer [startup! shutdown!]]))
(defn -main []
(startup! {:hosts ["localhost"]})
; start your async stuff
(-> (Runtime/getRuntime)
(.addShutdownHook (Thread. (fn [] (shutdown!))))))
Example how to use in your tests:
(ns my-test
(:require [clojure.test :refer :all]
[rabid.core :refer [startup! shutdown!]]))
(defn with-rabbitmq [t]
(startup! {:hosts ["localhost"]})
(t)
(shutdown!))
(use-fixtures :once with-rabbitmq)
consume!
(consume! queue handler)
(consume! queue handler opts)
Starts a new consumer in a new thread to the given queue
by using using the given
handler
callback. Consumer is started to its own thread so this function doesn't
block. Instead, it returns a function which stops the started consumer when being
invoked.
handler
is a function with arity of 2: the first parameter is consumed message's
metadata, the second one is message's body bytes.
If the message processing completes successfully, handler must return true
which
indicates that the consumed message can be ack'ed. If return value is not true
,
then the message is rejected and re-queued.
If the consumer throws an unexpected error, then the error and message details
are logged by using clojure.tools.logging
.
consume!
accepts the following options map as a third optional parameter:
name | description | default value |
---|---|---|
:exclusive? | Whether to start consumer in exclusive mode or not | false |
:recovery-interval | Time period after which consumer retries to consume the queue either after a connection error or if the queue is being consumed by another exclusive consumer | 60000 |
:prefetch-count | Consumer's message buffer size (basic_qos ) | 1 |
:concurrent-consumers | Number of consumers to create | 1 |
:batch | Configures handling of batch messages. :parallel-handling? if true, uses pmap , otherwise map | {:parallel-handling? true} |
:requeue-rejected | Should a rejected message be re-queued or not. | true |
RabbitMQ connection must be established before calling this function.
(defn my-handler [meta body]
(let [message (json/parse-string (slurp body))
content-type (get (:headers meta) "Content-Type")]
; do something ...
; returning true indicates that the message can be acked
true))
(let [stop-consumer (consume! "my.queue" my-handler {:prefetch-count 10 :exclusive? true})]
...
(stop-consumer))
consume-and-reply!
(consume-and-reply! queue handler)
(consume-and-reply! queue handler opts)
Starts a new consumer in a new thread on the queue
.
Then handler
is a function (args: [^Map headers ^bytes message]
) that must return a vector ([^Map headers ^String payload]
).
The opts
map may contain the following keys.
name | description | default value |
---|---|---|
:exclusive? | Whether to start consumer in exclusive mode or not | false |
:recovery-interval | Time period after which consumer retries to consume the queue either after a connection error or if the queue is being consumed by another exclusive consumer | 60000 |
:prefetch-count | Consumer's message buffer size (basic_qos ) | 1 |
:concurrent-consumers | Number of consumers to create | 1 |
publish!
(publish! exchange routing-key payload & [headers props])
Publishes a message to the given exchange by using the given routing key. Payload can be either string or array of bytes.
(Optional) headers
is a map of {String String}
or {Keyword String}
.
(Optional) props
are RabbitMQ basic properties. All valid values, their
types and default values can be found here.
The following properties are pre-set (and overridable by the caller):
:timestamp (Date.)
:persistent? true
:content-encoding "UTF-8"
:content-length 0
:priority 0
RabbitMQ connection must be established before calling this function.
(publish! "my.exchange" "my.routing.key" "tsers!")
(publish! "my.exchange" "my.routing.key" "tsers!" {"Content-Type" "text/tsers"})
(publish! "my.exchange" "my.routing.key" "tsers!" {} {:app-id "my-app"})
publish-batch!
(publish-batch! exhange routing-key payload & [headers props])
Combines messages into batches and publishes each batch as a RabbitMQ message. This gives increased throughput with increased risk (Instead of potentially losing 1 message you may lose batch-size
of messages.
If the consumer is using rabid as well, it requires no special action on their part since the Spring message listener knows how to debatch messages.
The behavior of the batching can be configured with the following publish-batch! props object - the one depicted here shows the default values.
{:batch {:batch-size 25
:buffer-limit Integer/MAX_VALUE
:timeout 50}}
To further clarify, here are the batch properties among some of the other props:
{:timestamp (Date.)
:persistent? true
:batch {:batch-size 25
:buffer-limit Integer/MAX_VALUE
:timeout 50}}
:batch-size
defines the size of the batch, i.e. how many messages are combined into a single message.
:buffer-limit
defines the maximum size of the body of the combined message. If it is reached, the batch is published even if the batch-size has not been reached.
:timeout
defines the time, in milliseconds, after which the batch is published, even if the batch-size has not been reached.
The combined message's headers are retrieved from the first message in the batch.
WARNING! If the consumer rejects any of the messages in the batch, the whole batch is rejected - since it is a single message. The best approach to handling this situation is with a custom error-handler that re-publishes the individual messages it cannot handle.
publish-and-receive!
(publish-and-receive! target-queue payload)
(publish-and-receive! target-queue payload headers)
Publishes a message to the target-queue
and waits for a reply.
Payload can be either string or array of bytes.
Returns a vector [^Map response-headers ^String response-payload]
The consumer of target-queue
must publish a reply message to the default exchange with routing-key read from the replyTo
message property.
This is what consume-and-reply!
does.
consume-queued!
(consume-queued! queue)
(consume-queued! queue time-to-wait)
Synchronously consumes all queued messages from the given queue and stops consuming if queue is empty at least 500 milliseconds. The time can be parametrized by using optional second parameter.
Return value is a vector of tuples [meta body]
.
RabbitMQ connection must be established before calling this function.
(let [messages (consume-queued! "my.queue")]
(println (map (comp slurp second) messages)))
declare-queue!
(admin/declare-queue! queue-name & [opts])
Creates a queue with the given name and options.
opts
is an optional map of the queue options:
name | description | default value |
---|---|---|
:exclusive | Should the queue only be accessible to the current connection? | false |
:auto-delete | Should the queue be deleted when no consumers are using it? | false |
:durable | Should the queue persist after a restart? | true |
:arguments | Extra arguments passed to RabbitMQ when creating the queue. | {} |
declare-exchange!
(admin/declare-exchange! exchange-name & [type opts])
Creates an exchange with the given name, type and options.
type
is an optional keyword describing the type of the exchange. Currently supported types are :direct
and :topic
(default).
opts
is an optional map of the exchange options:
name | description | default value |
---|---|---|
:auto-delete | Should the exchange be deleted when no queues are using it? | false |
:durable | Should the exchange persist after a restart? | true |
:arguments | A map of additional exchange arguments | nil |
declare-binding!
(admin/declare-binding! queue-name exchange-name routing-key & [arguments])
Binds a combination of exchange + routing-key to the given queue. Empty string as exchange will use default exchange. Empty string as routing key will use no routing key.
arguments
is a map of additional binding arguments.
purge!
(admin/purge! queue-name)
Purges the given queue.
delete!
(admin/delete! queue-name)
Deletes the given queue.
delete-exchange!
(admin/delete-exchange! exchange-name)
Deletes the given exchange.
unbind!
(admin/unbind! queue-name exchange-name routing-key arguments)
Removes the binding from the given exchange + routing-key to the given queue.
message-count
(admin/message-count queue-name)
Gets the number of messages in the given queue.
Q: All my messages are JSON? How to consume them?
A: Wrap your handler inside another handler that does the parsing. Then create your own consume! implementation that applies this wrapper to the original handler.
(ns my-app.rabbitmq.main
(:require [cheshire.core :as json]
[rabid.core :as rabid]))
(defn- json-handler [handler]
(fn [meta body]
(let [msg (json/parse-string body true)
headers (:headers meta)]
(handler headers msg))))
(defn consume! [queue handler]
(rabid/consume! queue (json-handler handler)))
Q: How to deal with custom exceptions?
A: You can provide an error-handler function as the 4th parameter. Remember to return true to ack the message or false to reject it. If no error-handler is provided, the default-error-handler is used which logs the exception and acks the message.
(defn my-error-handler [msg e]
(cond
(instance? MyNetworkingException e) (do
(error e "Networking exception occurred. Retrying in 10 secs..")
(Thread/sleep 10
false)
(instance? MyDataException e) (do
(error e "Data is corrupt, discarding message " msg)
true)
:else (do
(error e "Unexpected error when handling" msg)
true))))
(defn consume! [queue handler opts]
(rabid/consume! queue handler opts my-error-handler))
Q: How to apply several different behaviors for the consumer?
A: By continuously wrapping your handler, you can create a handler chain to provide the behaviors you need by consumer.
(defn handler [meta body]
(do-all-the-things body)
true)
(defn incoming-logger-handler [handler]
(fn [meta body]
(info "Handling " body)
(handler meta body)))
(defn message-enricher-handler [handler]
(fn [meta body]
(let [meta (assoc meta :received (Date.))]
(handler meta body))))
(defn json-handler [handler]
(fn [meta body]
(let [msg (json/parse-string body true)
headers (:headers meta)]
(handler headers msg))))
(defn consume-queue-a! [queue handler opts]
; handlers are applied in reverse order
(rabid/consume! queue (-> handler
json-handler
message-enricher-handler
incoming-logger-handler)))
(defn consume-queue-b! [queue handler opts]
(rabid/consume! queue (-> handler
incoming-logger-handler)))
You must have the following dependencies installed
docker-toolbox
if running windows or os x, docker ( 1.7.1+ -> ) and docker-compose ( 1.5.2+ release ) if on linuxleiningen
docker
group (Linux only, see this)dockerhost
to /etc/hosts
(ip = localhost
in Linux)docker/start.sh
lein test
If you want to see debug logs when running tests, set log4j.rootLogger=DEBUG
to the test/log4j.properties
file.
MIT
Can you improve this documentation? These fine people already did:
Matti Lankinen, Lauri Siltanen, Lauri Oherd, Samuli Ulmanen, Kristoffer Snabb & Antti SalminenEdit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close