Buniccula is framework for asynchronous messaging with RabbitMQ.
It defines 4 components (based on Stuart Sierra's component lib)
Bunnicula follows the RabbitMQ best practices, inspired by following blog posts.
See the relevant practices implemented in Bunnicula
Bunnicula uses official RabbitMQ Java client for creating connections, consumers etc. As of version 4.0.0 of the Java client, automatic recovery is enabled by default. See more details here
By default we ensure your data will survive server restart.
All queues defined by consumer component are durable. Publisher component publishes persistent message by default.
Connection component represents RabbitMQ connection. When component is started, new RabbitMQ connection is opened. When component is closed, connection is closed!
Connection component is used by publisher & consumers as component dependency!
url
key
or by map with host
, port
, username
and password
keysvhost
is always required to be present in configurationconnection-name
(require '[bunnicula.component.connection :as connection]
'[com.stuartsierra.component :as component])
(def connection (connection/create {:username "rabbit"
:password "password"
:host "127.0.0.1"
:port 5672
:vhost "/main"
:connection-name "connection-test"}))
;; connection specified by url
;; (def connection (connection/create {:url "amqp://rabbit:password@127.0.0.1:5672"
;; :vhost "/main"}))
(component/start connection)
Publisher component is used for publishing messages to the broker. When component is started, new channel is opened. When component is stopped, channel is closed.
The connection component is a required dependency, it has to be present under the rmq-connection key in the system map.
exchange-name
the default exchange messages will be published toserialization-fn
(optional) function used to serialize message (represented as clojure data)
to RabbitMQ message body (bytes). Default function uses json-serialization.Publisher component implements publish method, which has multiple arity
(publish publisher routing-key message)
will publish message to default exchange
with given routing-key(publish publisher routing-key message options)
will publish message to default exchange
with given routing-key and options(publish publisher exchange routing-key message options)
will publish message to specified exchange
with given routing-key and optionsFollowing options are supported for publishing
mandatory
flag to ensure message delivery, default falseexpiration
set message expirationpersistent
whether to persist message on disk, default true!(require '[bunnicula.component.connection :as connection]
'[bunnicula.component.publisher :as publisher]
'[bunnicula.protocol :as protocol]
'[com.stuartsierra.component :as component])
(def connection (connection/create {:url "amqp://rabbit:password@127.0.0.1:5672"
:vhost "/main"}))
(def publisher (publisher/create {:exchange-name "my-exchange"}))
(def system (-> (component/system-map
:publisher (component/using
publisher
[:rmq-connection])
:rmq-connection connection)
component/start-system))
;; publish to 'my-exchange' exchange with routing-key 'some.queue'
(protocol/publish (:publisher system)
"some.queue"
{:integration_id 1 :message_id "123"})
;; publish to 'another-exchange' exchange with routing-key 'some.queue' and options map
(protocol/publish (:publisher system)
"another-exchange"
"some.queue"
{:integration_id 1 :message_id "123"}
{:mandatory true :persistent true})
For testing purposes or in development mode you can use the Mock Component.
Mock publisher component contains queues
atom which represents the RabbitMQ.
It is initiated as empty map and any time you publish message,
its value is added to queues
atom
(require '[bunnicula.component.publisher.mock :as mock]
'[bunnicula.protocol :as protocol]
'[com.stuartsierra.component :as component])
(def p (-> (mock/create) component/start))
(protocol/publish p
"some.queue"
{:integration_id 1 :message_id "123"})
(protocol/publish p
"some.queue"
{:integration_id 1 :message_id "456"})
(-> p :queues deref)
;; => {"some.queue" [{:integration_id 1, :message_id "456"}
;; {:integration_id 1, :message_id "123"}]}
Defines consumer with auto retry functionality!
The component is composed of a message handler (a plain Clojure function), consumer threads and channels. When processing fails it can be retried automatically with a fixed amount of times with a delay between each attempt. If processing still fails - messages will be pushed to an error queue for further inspection and manual retrying.
When the component starts it will create necessary exchanges and queues if they do not exist.
When component stops consumers are destroyed and channels are closed. All exchanges, queues and their messages will remain intact. Messages will be fetched again when the consumer reconnects.
Processing message on consumer can result in one of following results
Assume configured queue-name is some.queue
.
Following exchanges are declared when component is started
some.queue-retry
some.queue-error
some.queue-requeue
Following queues are declared when component is started
some.queue
which is bind to pre-configured exchange via 'some.queue' routing key (routing-key=queue-name)some.queue-retry
which is bind to retry exchange via '#' routing key,
expired messages are sent to dead letter exchange some.queue-requeue
some.queue-error
which is bind to error exchange via '#' routing keyNote all queues are durable and publishing messages between queues ensure they are persisted on disk.
⚠️ Main exchange used by regular work queues (not retry/error) has to be created before starting the consumer. Only retry and error exchanges are created by the consumer component.
The connection component and monitoring component are required dependencies,
it has to be present under :rmq-connection
and :monitoring
key in the system map.
message-handler-fn
handler fn for consumer, holds the application logic,
see more heredeserializer
(optional, default is json deserialization)
function to be used to deserializer messagesoptions
queue-name
queue for consuming messagesexchange-name
exchange which queue will be bind to using queue-name as routing-key
(exchange needs to be already created, you can use default RabbitMQ exchange '')max-retries
(optional, default 3) how many times should be message retriedtimeout-seconds
(optional, default 60s) timeout for message-handler-fnbackoff-interval-seconds
(optional, default 60s) how long should message wait on retry queueconsumer-threads
(optional, default 4) how many consumers should be created, allows parallel processingprefetch-count
(optional, default 10) how many messages should be prefetched by each consumerhandler-fn takes 4 arguments
body
raw message dataparsed
parsed message (most likely you just need to used this data in your handler fn)envelope
message envelopecomponents
components which are specified as component dependencies for consumerhandler-fn is required to return one of following values
:ack
message was processed successfully:retry
recoverable failure => retry message:error
hard failure => no retry(defn handler-fn [body parsed envelope components]
;; ... some domain specific code ...
;; return supported response value
:ack)
(require '[bunnicula.component.connection :as connection]
'[bunnicula.component.monitoring :as monitoring]
'[bunnicula.component.consumer-with-retry :as consumer]
'[com.stuartsierra.component :as component])
(defn import-conversation-handler
[body parsed envelope components]
(let [{:keys [integration_id message_id]} parsed]
;; ... import intercom conversation for given integration_id & message_id ...
;; need to return :ack, :error, :retry
:ack))
(def connection (connection/create {:url "amqp://rabbit:password@127.0.0.1:5672"
:vhost "/main"}))
(def consumer (consumer/create {:message-handler-fn import-conversation-handler
:options {:queue-name "some.queue"
:exchange-name "my-exchange"
:timeout-seconds 120
:backoff-interval-seconds 60
:consumer-threads 4
:max-retries 3}}))
(def system (-> (component/system-map
:rmq-connection connection
:monitoring monitoring/BaseMonitoring
:consumer (component/using
consumer
[:rmq-connection :monitoring]))
component/start-system))
Monitoring component is a required dependency for the consumer component (it has to be present under the monitoring key in the system map.)
Bunnicula provides a basic monitoring component. If you require more advanced monitoring functionality you can also implement your own.
The component needs to implement all methods from Monitoring protocol and support component lifecycle
You can also use bunnicula.monitoring component, which will track consumer metrics and send those to StatsD and report exceptions to Rollbar.
Provides basic monitoring functionality for consumer component
It logs the result of consumer's message-handler-fn
using clojure.tools.logging
.
You can completely override metrics and error reporting backends and call their APIs directly:
(ns bunnicula.monitoring.custom
(:require [com.stuartsierra.component :as component]
[clojure.tools.logging :as log]
[bunnicula.protocol :as protocol]
[xyz.component.raygun :as raygun]
[xyz.component.graphite :as graphite]))
(defrecord CustomMonitoring [consumer-name raygun graphite]
component/Lifecycle
(start [c]
(log/infof "start consumer-name=%s" consumer-name)
c)
(stop [c]
(log/infof "stop consumer-name=%s" consumer-name)
c)
protocol/Monitoring
(on-success [this args]
(log/infof "consumer=%s success" consumer-name)
(graphite/count graphite :success consumer-name))
(on-error [this args]
(log/errorf "consumer=%s error payload=%s"
consumer-name (log-fn (:message args)))
(graphite/count graphite :error consumer-name))
(on-timeout [this args]
(log/errorf "consumer=%s timeout payload=%s"
consumer-name (log-fn (:message args)))
(graphite/count graphite :timeout consumer-name))
(on-retry [this args]
(log/errorf "consumer=%s retry-attempts=%d payload=%s"
consumer-name (:retry-attempts args) (log-fn (:message args)))
(graphite/count graphite :retry consumer-name))
(on-exception [this args]
(let [{:keys [exception message]} args]
(log/errorf exception "consumer=%s exception payload=%s"
consumer-name (log-fn message))
(when exception-tracker
(tracker/report raygun exception)))
(graphite/count graphite :fail consumer-name)))
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close