(consume! queue handler-fn & [opts error-handler])

Consume messages from a queue.

The function has 4 parameters, with the first 2 being mandatory.

  1. The name of the queue.
  2. The function that consumes the message.
  3. An optional options map.
  4. An optional error-handler function.

The options map has the following structure (defaults depicted):

{:prefetch-count 1 :exclusive? false :recovery-interval 60000 :concurrent-consumers 1 :requeue-rejected true}

(consume-and-reply! queue handler-fn & [options])

Consumes messages from a queue and produces a reply in the form of a vector [^Map headers ^String message]. All messages are auto-acked.

The function has 3 parameters, the first 2 being mandatory.

  1. The name of the queue to consume (must be declared before).
  2. The function (fn [^Map headers ^bytes message]) that consumes the message and produces a reply.
  3. An optional options map

The options map has the following structure (defaults depicted): {:prefetch-count 1 :exclusive? false :recovery-interval 60000 :concurrent-consumers 1}

(consume-queued! queue)
(consume-queued! queue timeout)

Consumes all messages from a queue. Should only be used for testing purposes. All the messages are read in memory. Returns an array of messages where each message is a two-element array: [meta body]

(get-rabbit-batch exchange routing-key opts)


(publish! exchange
          [headers {:keys [correlation-id] :as props}])

Publishes a message to an exchange with a routing key. The function has 5 parameters, with the first three being mandatory:

  1. The name of the exchange.
  2. The routing key. Blank string for no routing key.
  3. The message payload as string.
  4. An optional map of message headers.
  5. An optional map of the message properties.

The message properties map has the following structure (defaults depicted):

{:timestamp (java.util.Date.) :persistent? true :content-encoding "UTF-8" :content-length 0 :redelivered? false :priority 0 :app-id nil :cluster-id nil :correlation-id nil :message-id nil :user-id nil :reply-to nil :type nil :expiration nil}

(publish-and-receive! target-queue
                       {:keys [expiration] :or {expiration 1000} :as opts}])

Publish message to target-queue and returns the reply (in the form of a vector: [^Map headers ^String message]. The opts map is the same as in publish! with the following exceptions:

:expiration is set 1000ms by default :persistent? is set to false and cannot be overridden :correlation-id is generated and cannot be overridden

If the message expires or the consumer cannot respond in expiration, an ExceptionInfo is thrown with the following contents:

message: "Timeout when waiting for reply from target-queue". map: {:type :rabid/receive-timeout :queuetarget-queue:timeoutexpiration:payloadpayload}

(publish-async! & args)

Publishes messages asynchronously, i.e. the actual publishing is done in another thread.

The function has the same parameters as the regular publish! function.

(publish-batch! exchange routing-key payload & [headers props])

Publishes messages in batches, i.e. by combining the payloads into a single message as per provided batch properties.

The function has the same parameters as the regular publish! function. Additionally, the message properties map may contain the following batch properties structure (defaults depicted):

{:batch {:batch-size 25 :buffer-limit Integer/MAX_VALUE :timeout 50}}

Destroys the RabbitMQ connection and stops all consumers.

(startup! opts)

Opens a new connection to RabbitMQ, using the given options.

The following depicts the entire opts map with the default values:

{:hosts "localhost" :port 5432 :vhost "/" :user "guest" :password "guest" :publisher-confirms true :publisher-confirm-type :async :publisher-returns false :request-heartbeat 10 :connection-timeout 60 :cache-mode :channel :on-connection-closed spring/default-on-connection-closed :on-connection-opened spring/default-on-connection-opened :publish-retry-policy {:enabled? false :initial-interval 500 :multiplier 2.0 :max-interval 30000 :max-attempts Integer/MAX_VALUE :on-unconfirmed-message spring/default-unconfirmed-message-handler :on-returned-message spring/default-returned-message-handler}}

