(consume! queue handler-fn & [opts error-handler])
Consume messages from a queue.
The function has 4 parameters, with the first 2 being mandatory.
The options map has the following structure (defaults depicted):
{:prefetch-count 1 :exclusive? false :recovery-interval 60000 :concurrent-consumers 1 :requeue-rejected true}
Consume messages from a queue. The function has 4 parameters, with the first 2 being mandatory. 1. The name of the queue. 2. The function that consumes the message. 3. An optional options map. 4. An optional error-handler function. The options map has the following structure (defaults depicted): {:prefetch-count 1 :exclusive? false :recovery-interval 60000 :concurrent-consumers 1 :requeue-rejected true}
(consume-and-reply! queue handler-fn & [options])
Consumes messages from a queue and produces a reply in the form of a vector [^Map headers ^String message]. All messages are auto-acked.
The function has 3 parameters, the first 2 being mandatory.
The options map has the following structure (defaults depicted): {:prefetch-count 1 :exclusive? false :recovery-interval 60000 :concurrent-consumers 1}
Consumes messages from a queue and produces a reply in the form of a vector [^Map headers ^String message]. All messages are auto-acked. The function has 3 parameters, the first 2 being mandatory. 1. The name of the queue to consume (must be declared before). 2. The function (fn [^Map headers ^bytes message]) that consumes the message and produces a reply. 3. An optional options map The options map has the following structure (defaults depicted): {:prefetch-count 1 :exclusive? false :recovery-interval 60000 :concurrent-consumers 1}
(consume-queued! queue)
(consume-queued! queue timeout)
Consumes all messages from a queue. Should only be used for testing purposes. All the messages are read in memory.
Returns an array of messages where each message is a two-element array: [meta body]
Consumes all messages from a queue. Should only be used for testing purposes. All the messages are read in memory. Returns an array of messages where each message is a two-element array: `[meta body]`
(publish! exchange
routing-key
payload
&
[headers {:keys [correlation-id] :as props}])
Publishes a message to an exchange with a routing key. The function has 5 parameters, with the first three being mandatory:
The message properties map has the following structure (defaults depicted):
{:timestamp (java.util.Date.) :persistent? true :content-encoding "UTF-8" :content-length 0 :redelivered? false :priority 0 :app-id nil :cluster-id nil :correlation-id nil :message-id nil :user-id nil :reply-to nil :type nil :expiration nil}
Publishes a message to an exchange with a routing key. The function has 5 parameters, with the first three being mandatory: 1. The name of the exchange. 2. The routing key. Blank string for no routing key. 3. The message payload as string. 4. An optional map of message headers. 5. An optional map of the message properties. The message properties map has the following structure (defaults depicted): `{:timestamp (java.util.Date.) :persistent? true :content-encoding "UTF-8" :content-length 0 :redelivered? false :priority 0 :app-id nil :cluster-id nil :correlation-id nil :message-id nil :user-id nil :reply-to nil :type nil :expiration nil}`
(publish-and-receive! target-queue
payload
&
[headers
{:keys [expiration] :or {expiration 1000} :as opts}])
Publish message to target-queue
and returns the reply (in the form of a vector: [^Map headers ^String message].
The opts map is the same as in publish!
with the following exceptions:
:expiration is set 1000ms by default
:persistent? is set to false
and cannot be overridden
:correlation-id is generated and cannot be overridden
If the message expires or the consumer cannot respond in expiration
,
an ExceptionInfo is thrown with the following contents:
message: "Timeout when waiting for reply from target-queue
".
map:
{:type :rabid/receive-timeout :queue
target-queue:timeout
expiration:payload
payload}
Publish message to `target-queue` and returns the reply (in the form of a vector: [^Map headers ^String message]. The opts map is the same as in `publish!` with the following exceptions: :expiration is set 1000ms by default :persistent? is set to `false` and cannot be overridden :correlation-id is generated and cannot be overridden If the message expires or the consumer cannot respond in `expiration`, an ExceptionInfo is thrown with the following contents: message: "Timeout when waiting for reply from `target-queue`". map: `{:type :rabid/receive-timeout :queue `target-queue` :timeout `expiration` :payload `payload`}`
(publish-async! & args)
Publishes messages asynchronously, i.e. the actual publishing is done in another thread.
The function has the same parameters as the regular publish!
function.
Publishes messages asynchronously, i.e. the actual publishing is done in another thread. The function has the same parameters as the regular `publish!` function.
(publish-batch! exchange routing-key payload & [headers props])
Publishes messages in batches, i.e. by combining the payloads into a single message as per provided batch properties.
The function has the same parameters as the regular publish!
function.
Additionally, the message properties map may contain the following batch properties structure (defaults depicted):
{:batch {:batch-size 25 :buffer-limit Integer/MAX_VALUE :timeout 50}}
Publishes messages in batches, i.e. by combining the payloads into a single message as per provided batch properties. The function has the same parameters as the regular `publish!` function. Additionally, the message properties map may contain the following batch properties structure (defaults depicted): `{:batch {:batch-size 25 :buffer-limit Integer/MAX_VALUE :timeout 50}}`
(shutdown!)
Destroys the RabbitMQ connection and stops all consumers.
Destroys the RabbitMQ connection and stops all consumers.
(startup! opts)
Opens a new connection to RabbitMQ, using the given options.
The following depicts the entire opts map with the default values:
{:hosts "localhost" :port 5432 :vhost "/" :user "guest" :password "guest" :publisher-confirms true :publisher-confirm-type :async :publisher-returns false :request-heartbeat 10 :connection-timeout 60 :cache-mode :channel :on-connection-closed spring/default-on-connection-closed :on-connection-opened spring/default-on-connection-opened :publish-retry-policy {:enabled? false :initial-interval 500 :multiplier 2.0 :max-interval 30000 :max-attempts Integer/MAX_VALUE :on-unconfirmed-message spring/default-unconfirmed-message-handler :on-returned-message spring/default-returned-message-handler}}
Opens a new connection to RabbitMQ, using the given options. The following depicts the entire opts map with the default values: `{:hosts "localhost" :port 5432 :vhost "/" :user "guest" :password "guest" :publisher-confirms true :publisher-confirm-type :async :publisher-returns false :request-heartbeat 10 :connection-timeout 60 :cache-mode :channel :on-connection-closed spring/default-on-connection-closed :on-connection-opened spring/default-on-connection-opened :publish-retry-policy {:enabled? false :initial-interval 500 :multiplier 2.0 :max-interval 30000 :max-attempts Integer/MAX_VALUE :on-unconfirmed-message spring/default-unconfirmed-message-handler :on-returned-message spring/default-returned-message-handler}}`
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close