(init! connection configuration)
Initializes multiple services in one go. Takes a rabbitmq connection and a configuration map. The configuration map can have the keys `event-exchanges`, `incoming-services`, `external-services`, `incoming-events`, and `outgoing-events`. Each key's value should be a sequence of maps appropriate for its initializer.

Returns a collection of resources that may be closed by `shutdown!`.
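As a rough illustration of the shape such a configuration map might take, the sketch below assumes the keys are keywords (the text above lists them without a leading colon) and uses hypothetical queue, exchange, channel, and handler names throughout. The calls to `init!` and `shutdown!` sit in a `comment` form because they need a live rabbitmq connection.

```clojure
;; Hypothetical configuration; every name below is illustrative only.
(def example-config
  {:event-exchanges   [{:exchange "events"}]
   :incoming-services [{:queue    "my-app.greet"
                        :f        'my-app.handlers/greet
                        :response true}]
   :external-services [{:queue    "other-app.lookup"
                        :channel  'my-app.channels/lookup-requests
                        :response true}]
   :incoming-events   [{:queue       "my-app.events.user-created"
                        :exchange    "events"
                        :routing-key "user.created"
                        :f           'my-app.handlers/on-user-created}]
   :outgoing-events   [{:exchange    "events"
                        :routing-key "user.created"
                        :channel     'my-app.channels/user-created}]})

(comment
  ;; `connection` is assumed to be an open rabbitmq connection.
  (def parts (init! connection example-config))
  (shutdown! parts))
```

Each inner map follows the option keys documented for the matching `init-*!` function.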
(init-exchange! connection
{:keys [exchange type options]
:or {type "topic" options default-exchange-options}})
Initializes an exchange.

Arguments:

* `connection`: A rabbitmq connection
* `exchange-options`: A map defining the exchange

Valid keys for the `exchange-options` are:

* `:exchange`: The name of the exchange (required)
* `:type`: The type of the exchange (optional, default `"topic"`)
* `:options`: The options map passed to `langohr.exchange/declare` (optional, default `{:auto-delete false :durable true}`)

Returns a map with one key, `:rabbit-channel`, the value of which is the channel created as a side-effect of declaring the exchange. The map may be passed to `shutdown-part!` to clean it up.

Example:

```
(init-exchange! connection {:exchange "events"})
```

That will create a topic exchange named "events".
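To show the optional keys in use, here is a hedged sketch of an options map that overrides the default exchange type. The exchange name is hypothetical, `"fanout"` is one of the standard AMQP exchange types, and the call itself is wrapped in `comment` because it requires a live connection.

```clojure
;; Hypothetical options map for a non-default exchange type.
(def fanout-exchange-options
  {:exchange "notifications"                       ; illustrative name
   :type     "fanout"                              ; overrides the default "topic"
   :options  {:auto-delete false :durable true}})  ; same as the documented default

(comment
  ;; `connection` is assumed to be an open rabbitmq connection.
  (let [part (init-exchange! connection fanout-exchange-options)]
    (:rabbit-channel part)   ; the channel opened while declaring the exchange
    (shutdown-part! part)))  ; closes that channel again
```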
(init-external-service! connection
{:keys [response exchange queue queue-options timeout
channel]
:or {exchange default-exchange
queue-options default-queue-options
timeout default-timeout
response nil}})
Initializes an external service to which requests are sent via a queue, and which may reply with results.

Arguments:

* `connection`: A rabbitmq connection
* `external-service-options`: A map defining the service

Valid keys for the `external-service-options` are:

* `:channel`: A fully qualified symbol for the core.async channel that requests to the service are placed on. Separately, this channel should be passed to `wire-up/async->fn` or `wire-up/async->fire-and-forget-fn` to complete the setup for calling the external service. (required)
* `:queue`: The name of the queue used by the service. (required)
* `:queue-options`: The options map passed to `langohr.queue/declare` for the queue. (optional, default `{:auto-delete false :durable true :exclusive false}`)
* `:response`: How responses are received. The value should be `nil`, `:streaming` or `true`. `nil` means no response will be received. `:streaming` means multiple values may come from the response channel before it is closed. `true` means only one value will come from the response channel. Should match the value used on the other side. (optional, default `nil`)
* `:exchange`: The name of the exchange. (optional, default `""`)
* `:timeout`: The number of milliseconds to wait for a response from the service before giving up. (optional, default 1000)
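A minimal sketch of an options map for a request/response service follows. The queue name and channel symbol are hypothetical, and the `:response` value is assumed to match what the service on the other side declares.

```clojure
;; Hypothetical options for calling a single-reply service.
(def lookup-service-options
  {:queue    "other-app.lookup"                ; illustrative queue name
   :channel  'my-app.channels/lookup-requests  ; illustrative channel var
   :response true                              ; expect exactly one reply per request
   :timeout  5000})                            ; wait 5000 ms instead of the default 1000

(comment
  ;; `connection` is assumed to be an open rabbitmq connection.
  (init-external-service! connection lookup-service-options))
```

Keys left out of the map (`:exchange`, `:queue-options`) fall back to the defaults listed above.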
(init-incoming-event! connection
{:keys [queue queue-options exchange routing-key timeout f
prefetch-limit threads]
:or {exchange default-exchange
prefetch-limit default-prefetch-limit
queue-options default-queue-options
threads wire-up/default-thread-count
timeout default-timeout}})
Initializes a handler for incoming messages from a rabbit topic.

Arguments:

* `connection`: A rabbitmq connection
* `incoming-event-options`: A map defining the event handler

Valid keys for the `incoming-event-options` are:

* `:f`: A fully qualified symbol for the function that will be called for every message received for the subscribed topic. It must be a function of one argument, which will be a serializable Clojure value. (required)
* `:routing-key`: The routing key being listened for. (required)
* `:queue`: The name of the queue that will be bound to the routing-key. (required)
* `:queue-options`: The options map passed to `langohr.queue/declare` for the queue. (optional, default `{:auto-delete false :durable true :exclusive false}`)
* `:exchange`: The name of the exchange. (optional, default `""`)
* `:prefetch-limit`: The number of messages to prefetch from the queue. (optional, default 1)
* `:threads`: The number of threads to run, each listening for and handling events. (optional, default 10)
* `:timeout`: The number of milliseconds to wait for a message being accepted for processing before nacking. (optional, default 1000)
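The handler contract described above (one argument, a deserialized Clojure value) might look like the following sketch. The handler, the atom it writes to, and all names in the options map are hypothetical; in a real project the `:f` symbol would name the handler's actual namespace.

```clojure
;; Collects messages so the effect of the handler is visible in this sketch.
(def seen-events (atom []))

(defn on-user-created
  "Hypothetical event handler: receives one deserialized message."
  [message]
  (swap! seen-events conj message))

;; Hypothetical options map pointing at the handler above.
(def user-created-options
  {:queue       "my-app.events.user-created"
   :exchange    "events"
   :routing-key "user.created"
   :f           'my-app.handlers/on-user-created  ; assumed namespace of the handler
   :threads     2})                               ; fewer threads than the default 10

(comment
  ;; `connection` is assumed to be an open rabbitmq connection.
  (init-incoming-event! connection user-created-options))
```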
(init-incoming-job! connection
{:keys [f queue queue-options prefetch-limit threads]
:or {prefetch-limit default-prefetch-limit
queue-options default-queue-options
threads wire-up/default-thread-count}})
Initializes a service that accepts messages on a queue and will send results back asynchronously.

Arguments:

* `connection`: A rabbitmq connection
* `incoming-job-options`: A map defining the service

Valid keys for the `incoming-job-options` are:

* `:f`: A fully qualified symbol for the function that will be called for every message received for the subscribed topic. It must be a function of three arguments, which will be a core.async channel to put results on, the routing-key for the job that may be passed to further job services, and a serializable Clojure value. The function must close the core.async channel when finished. (required)
* `:queue`: The name of the queue that will be bound to the routing-key. (required)
* `:queue-options`: The options map passed to `langohr.queue/declare` for the queue. (optional, default `{:auto-delete false :durable true :exclusive false}`)
* `:prefetch-limit`: The number of messages to prefetch from the queue. (optional, default 1)
* `:threads`: The number of threads to run, each listening for and handling events. (optional, default 10)
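The three-argument handler contract can be sketched as below. This assumes `org.clojure/core.async` is on the classpath; the handler name, message shape (`:text`), and result shape are hypothetical. The one part taken directly from the description is that the handler closes the results channel when it is done.

```clojure
(require '[clojure.core.async :as async])

(defn count-words
  "Hypothetical job handler. Puts one result on `out-chan`, then closes it."
  [out-chan routing-key message]
  (async/go
    (async/>! out-chan {:routing-key routing-key
                        :word-count  (count (re-seq #"\S+" (:text message)))})
    ;; The handler is responsible for closing the channel when finished.
    (async/close! out-chan)))

;; Hypothetical options map pointing at the handler above.
(def count-words-job-options
  {:queue "my-app.jobs.count-words"
   :f     'my-app.jobs/count-words})  ; assumed namespace of the handler

(comment
  ;; `connection` is assumed to be an open rabbitmq connection.
  (init-incoming-job! connection count-words-job-options))
```

Using a `go` block keeps the handler from blocking its calling thread when the results channel is unbuffered.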
(init-incoming-service! connection
{:keys [f threshold exchange queue queue-options threads
prefetch-limit response]
:or {exchange default-exchange
response nil
prefetch-limit default-prefetch-limit
queue-options default-queue-options
threads wire-up/default-thread-count
threshold default-threshold}})
Initializes a service that accepts messages on a queue which may reply with results.

Arguments:

* `connection`: A rabbitmq connection
* `incoming-service-options`: A map defining the service

Valid keys for the `incoming-service-options` are:

* `:f`: A fully qualified symbol for the function that will be called for every message received on the queue. It must be a function of one argument, which will be a serializable Clojure value. For `:streaming` responses, it must return a sequence with the values to stream, or a core.async channel to stream values from. When returning a core.async channel, you must close the channel when there are no more values. For `:streaming` or `true` responses, values must be serializable Clojure values. (required)
* `:queue`: The name of the queue to listen for requests on. (required)
* `:queue-options`: The options map passed to `langohr.queue/declare` for the queue. (optional, default `{:auto-delete false :durable true :exclusive false}`)
* `:response`: How to respond. The value should be `nil`, `:streaming` or `true`. `nil` means don't return the result of the function to the caller. `:streaming` means the function will return a sequence or a core.async channel, the values from which will be returned to the caller one by one. `true` means return the result of the function in one message back to the caller. (optional, default `nil`)
* `:exchange`: The name of the exchange. (optional, default `""`)
* `:prefetch-limit`: The number of messages to prefetch from the queue. (optional, default 1)
* `:threads`: The number of threads to run, each listening for and handling messages from the queue. (optional, default 10)
* `:threshold`: For `:streaming` responses, the number of values at which to switch from replying on the default response queue to replying on a bespoke queue for the caller. (optional, default 10)
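For the `:streaming` case, a handler that returns a plain sequence is the simplest form. The sketch below uses a hypothetical handler and message shape (`:from`); each element of the returned sequence would be sent back to the caller one by one.

```clojure
(defn countdown
  "Hypothetical streaming handler: returns the values to stream as a sequence."
  [{:keys [from]}]
  (range from 0 -1))

;; Hypothetical options map pointing at the handler above.
(def countdown-service-options
  {:queue     "my-app.countdown"
   :f         'my-app.handlers/countdown  ; assumed namespace of the handler
   :response  :streaming                  ; callers must use :streaming as well
   :threshold 10})                        ; the documented default, shown explicitly

(comment
  ;; `connection` is assumed to be an open rabbitmq connection.
  (init-incoming-service! connection countdown-service-options))
```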
(init-outgoing-event! connection
{:keys [exchange routing-key channel]
:or {exchange default-exchange}})
Initializes a core.async channel for sending messages to a topic.

Arguments:

* `connection`: A rabbitmq connection
* `outgoing-event-options`: A map defining the channel and topic.

Valid keys for the `outgoing-event-options` are:

* `:routing-key`: The routing key used for outgoing messages. (required)
* `:channel`: A fully qualified symbol for the core.async channel from which messages will be sent to rabbit. Only serializable values may be placed on that channel. (required)
* `:exchange`: The name of the exchange. (optional, default `""`)
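A possible pairing of a channel definition with its options map is sketched here, assuming `org.clojure/core.async` is available. The channel name, buffer size, exchange, and routing key are all hypothetical.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical channel that application code puts outgoing events on.
(def user-created-events (async/chan 100))

;; Hypothetical options map; :channel names the var defined above.
(def outgoing-user-created-options
  {:exchange    "events"
   :routing-key "user.created"
   :channel     'my-app.channels/user-created-events})  ; assumed namespace

(comment
  ;; `connection` is assumed to be an open rabbitmq connection.
  (init-outgoing-event! connection outgoing-user-created-options)
  ;; Only serializable values should be put on the channel.
  (async/>!! user-created-events {:id 42 :name "Ada"}))
```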
(init-outgoing-job! connection
{:keys [jobs-chan queue queue-options exchange]
:or {queue-options default-queue-options
exchange default-exchange}})
Initializes a queue for sending job requests on.

Arguments:

* `connection`: A rabbitmq connection
* `outgoing-job-options`: A map describing the job channel and queue.

Valid keys for the `outgoing-job-options` are:

* `:jobs-chan`: A fully qualified symbol for the core.async channel from which messages will be sent to rabbit. Only serializable values may be placed on that channel. (required)
* `:queue`: The name of the queue that will be bound to the routing-key. (required)
* `:queue-options`: The options map passed to `langohr.queue/declare` for the queue. (optional, default `{:auto-delete false :durable true :exclusive false}`)
* `:exchange`: The name of the exchange. (optional, default `""`)
(realize-fn x)
(realize-fn x debug?)
(realize-symbol-or-self x)
(realize-symbol-or-self x resolve?)
Given a symbol, gets what it names. Otherwise, returns the argument.

If `resolve?` is true, returns the value in the var; otherwise, returns the var itself. In practice, returning the var allows handler functions to be dynamically reloaded, which is useful during development.

    ((realize-symbol-or-self 'clojure.core/inc) 4) ;;=> 5
    ((realize-symbol-or-self inc) 5) ;;=> 6
    (realize-symbol-or-self 'clojure.core/inc false) ;;=> #'clojure.core/inc
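To make the behaviour concrete, here is an illustrative stand-in written from the description above. It is not the library's source: the name `realize-symbol-or-self*` is hypothetical, and the use of `requiring-resolve` (Clojure 1.10 or later) is an assumption about how the lookup could be done.

```clojure
(defn realize-symbol-or-self*
  "Illustrative stand-in. Resolves a fully qualified symbol to its var,
  or to the var's value when `resolve?` is true. Non-symbols pass through."
  ([x] (realize-symbol-or-self* x true))
  ([x resolve?]
   (if (symbol? x)
     (let [v (requiring-resolve x)]  ; loads the namespace if needed
       (if resolve? @v v))
     x)))

((realize-symbol-or-self* 'clojure.core/inc) 4)         ;;=> 5
((realize-symbol-or-self* inc) 5)                       ;;=> 6
(realize-symbol-or-self* 'clojure.core/inc false)       ;;=> #'clojure.core/inc
```

Because a var is itself callable and looks up its current value on each call, handing the var to a handler loop picks up redefinitions made at the REPL.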
(require-ns sym)
(shutdown! parts)
Closes rabbit channels and core.async channels created by `init!`.
(shutdown-part! {:keys [rabbit-channel async-channels]})
Closes rabbit channels and core.async channels created by any of the `init-*!` functions.
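One way to make sure resources are always released is to pair initialization and shutdown in a `try`/`finally`. The helper below is a hypothetical sketch, not part of the library; it takes the init and shutdown functions as arguments so it can be exercised without a broker.

```clojure
(defn with-parts
  "Hypothetical helper: calls `init-fn`, passes the result to `body-fn`,
  and always calls `shutdown-fn` on that result afterwards."
  [init-fn shutdown-fn body-fn]
  (let [parts (init-fn)]
    (try
      (body-fn parts)
      (finally
        (shutdown-fn parts)))))

(comment
  ;; `connection`, `config`, and `run-app` are assumed to exist.
  (with-parts #(init! connection config)
              shutdown!
              (fn [_parts] (run-app))))
```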