A Clojure library designed to pass messages between RabbitMQ and core.async.
Add [democracyworks/kehaar "1.0.0"] to your dependencies.
There are a few namespaces available for connecting core.async
channels to RabbitMQ. kehaar.core is a low-level interface.
kehaar.wire-up is a level above. And finally, kehaar.configured is
above that.
In most cases, the kehaar.configured namespace,
kehaar.wire-up/async->fn, and kehaar.wire-up/async->fire-and-forget-fn
should be all you need to set up services and events.
See the example project for working examples. See the function docstrings for all available options.
You can use kehaar.configured/init! to set up all the below examples
in one go by passing a map with the keys :event-exchanges,
:incoming-services, :external-services, :incoming-events,
:outgoing-events, :incoming-jobs, and :outgoing-jobs, each with a
sequence of the maps for each kind of thing. The return value is a
collection of all the shutdownable resources created in the process,
and it may be passed to kehaar.configured/shutdown! to clean them all
up.
Anywhere a kehaar.configured initialization function expects a
function or channel, you may pass instead a fully qualified symbol
naming the function or channel you wish to use. If you do, the
appropriate namespace will be required and the function or channel
value will be used. This is useful, for example, if you wish to keep
your configuration in an EDN file.
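As a rough sketch of what such a configuration file might contain, the map below uses the top-level keys listed above and the per-item keys from the examples later in this document. The file name, the my-app.* namespaces, and the second queue name are hypothetical; only the keyword names come from this README.

```clojure
;; config.edn (hypothetical file name)
;; Symbols such as my-app.handlers/process are assumptions: they stand in
;; for fully qualified names of your own handler functions and channels.
{:event-exchanges   [{:exchange "events"}]

 :incoming-services [{:queue "service-works.service.process"
                      :f     my-app.handlers/process}]

 :external-services [{:queue   "other-service.service.lookup" ; hypothetical queue
                      :channel my-app.channels/lookup-channel}]

 :incoming-events   [{:queue       "my-service.events.create-something"
                      :exchange    "events"
                      :routing-key "create-something"
                      :f           my-app.handlers/on-create-something}]

 :outgoing-events   [{:exchange    "events"
                      :routing-key "create-something"
                      :channel     my-app.channels/created-event-channel}]}
```

Because the handlers and channels are named by symbol, the file stays plain data and can be read with an EDN reader before being handed to kehaar.configured/init!.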
All examples are exercised by the example project. See example/README.md for details.
The following assumes that a RabbitMQ connection is available as connection.
Some typical patterns:
To use an event exchange, you'll need to declare it:
(kehaar.configured/init-exchange!
connection
{:exchange "events"})
To call an external service, set it up with a queue and a channel:
(kehaar.configured/init-external-service!
connection
{:queue "service-works.service.process"
:channel process-channel})
Then you can create a function that "calls" that service, like so:
(def process (wire-up/async->fire-and-forget-fn process-channel))
The returned function takes a single argument, which must be some
EDN-izable value (including nil), which will be passed to the
external service via the configured queue. The returned function's
return value is unspecified and should not be used.
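If the caller needs a reply, the same wiring can ask for one. The following is only a sketch: it assumes connection is an open RabbitMQ connection (as elsewhere in this document), the namespace aliases are chosen for illustration, and the message shape is made up.

```clojure
(require '[clojure.core.async :as async]
         '[kehaar.configured :as configured]
         '[kehaar.wire-up :as wire-up])

;; Assumption: `connection` is already defined and open.
(def process-channel (async/chan))

(configured/init-external-service!
 connection
 {:queue    "service-works.service.process"
  :channel  process-channel
  :response true})              ; request a single reply

;; async->fn builds a function that returns a core.async channel
;; which will eventually contain the service's reply.
(def process (wire-up/async->fn process-channel))

;; Hypothetical message; any EDN-izable value would do.
(let [response-channel (process {:id 42})]
  (println "reply:" (async/<!! response-channel)))
```

The blocking take (<!!) is used here for brevity; inside a go block the parking take (<!) would be the usual choice.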
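Some notes:
- To get a response, set :response to true in the options map and use
  wire-up/async->fn instead, which will return a core.async channel
  that will eventually contain the result.
- For a streaming response, set :response to :streaming and use
  wire-up/async->fn.
To handle requests for a service, set up an incoming service with a
handler function:
(kehaar.configured/init-incoming-service!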
connection
{:queue "service-works.service.process"
:f handler-function})
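The handler-function passed as :f above is an ordinary one-argument Clojure function. As a minimal sketch, assuming a made-up request shape of {:numbers [...]}, a plain handler and a streaming handler might look like this. Neither function is part of kehaar, and the response maps are illustrative only.

```clojure
;; Hypothetical request shape: {:numbers [1 2 3]}.

(defn handler-function
  "Plain handler: takes the EDN-decoded message (which may be nil)
  and returns a single EDN-izable value."
  [message]
  {:status :ok
   :sum    (reduce + 0 (:numbers message))})

(defn streaming-handler-function
  "Streaming handler: returns a lazy sequence with one element per
  value to stream back."
  [message]
  (map (fn [n] {:number n :square (* n n)})
       (:numbers message)))
```

Both functions tolerate a nil message: the plain handler reports a sum of 0 and the streaming handler returns an empty sequence.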
Some notes:
- handler-function should be a function of a single argument. It
  accepts messages from the queue, which have been serialized and
  deserialized as EDN. So expect any kind of serializable value,
  including nil.
- If :response is :streaming, then handler-function should return a
  sequence, which probably should be lazy, or a core.async channel,
  which must be closed when there are no more values. Each value in
  the sequence or on the channel will be returned to the client in
  order.
- If :response is nil, kehaar will not log error messages when the
  incoming messages lack a reply-to queue. This can reduce log noise
  if :response is set to nil on the request side.
To send work to a job service, set up an outgoing job:
(kehaar.configured/init-outgoing-job!
connection
{:jobs-chan job-request-channel
:queue "service.works.service.perform-job"})
Then you can create a function that "calls" that job service, like so:
(def kick-off-job (jobs/async->job job-request-channel))
The returned function takes two arguments: the message to send to the job service, which must be some EDN-izable value, and a function to handle results from the service(s) producing them for the job. Its return value is the routing key created for the job, but you are unlikely to need it.
To handle jobs sent to your service, set up an incoming job:
(kehaar.configured/init-incoming-job!
connection
{:queue "service.works.service.perform-job"
:f handler-function})
Some notes:
(kehaar.configured/init-outgoing-job!
connection
{:jobs-chan subcontract-channel
:queue "service.works.service.subcontract-part-of-job"})
Then you create a function that calls that job service as a subcontractor, like so:
(def subcontract-job (jobs/async->subjob subcontract-channel))
The returned function takes two arguments: the routing key for the current job, and the message to send.
To handle events from an exchange, set up an incoming event:
(kehaar.configured/init-incoming-event!
connection
{:queue "my-service.events.create-something"
:exchange "events"
:routing-key "create-something"
:f handler-function})
Some notes:
- handler-function is a function of exactly one argument, which is
  the message that was passed down the rabbit hole. It's serialized on
  the way to a ByteString using EDN, so only expect data that can be
  EDN-ized.
To publish events to an exchange, set up an outgoing event:
(kehaar.configured/init-outgoing-event!
connection
{:exchange "events"
:routing-key "create-something"
:channel created-event-channel})
The event messages you send on the channel must be EDN-izable.
Each of the kehaar.configured/init- functions returns a map of
resources that you'll want to clean up. Those maps may be passed to
kehaar.configured/shutdown-part! to do that.
Kehaar implements backpressure using RabbitMQ nacks and requeuing. Messages that don't parse will be nacked without requeuing.
Here's the thing to know: each call to wire-up/start-responder! or
wire-up/start-event-handler! starts a new thread. When the in-channel
of either of those is full (meaning it takes more than 100ms to add to
the core.async channel), the incoming message is nacked and requeued.
You can start multiple threads with the same handler by calling
wire-up/start-responder! and wire-up/start-event-handler! multiple
times.
While it is perfectly acceptable to connect to RabbitMQ using langohr
directly, there is also kehaar.rabbitmq/connect-with-retries. This
fn takes a RabbitMQ config map just like langohr.core/connect and,
optionally, a max-retry count (which defaults to 5 if omitted).
It then attempts to connect to the RabbitMQ broker up to the max-retry
count with backoff of (* attempts 1000) milliseconds.
If it fails to connect to the broker after hitting the maximum number
of retries, it will re-throw the final java.net.ConnectException
from langohr.core/connect. If it succeeds, it will return the
connection just like langohr.core/connect.
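To make the retry behaviour concrete, here is a small stand-alone sketch of the same idea in plain Clojure. It is not kehaar's source. The helper names are hypothetical, and two details are assumptions: that attempts are counted from 1, and that the limit counts total attempts rather than retries after the first.

```clojure
(defn call-with-retries
  "Calls (connect-fn) up to max-attempts times. After failed attempt n
  it sleeps (* n backoff-ms) milliseconds, then tries again. If the
  final attempt also throws, that exception propagates to the caller."
  [connect-fn max-attempts backoff-ms]
  (loop [attempt 1]
    ;; recur is not allowed inside try/catch, so capture the outcome first.
    (let [outcome (try
                    {:connection (connect-fn)}
                    (catch java.net.ConnectException e
                      (if (< attempt max-attempts)
                        {:failed e}
                        (throw e))))]
      (if (contains? outcome :connection)
        (:connection outcome)
        (do
          (Thread/sleep (long (* attempt backoff-ms)))
          (recur (inc attempt)))))))

(defn flaky-connect-fn
  "Returns a stand-in connect function that throws ConnectException for
  its first `failures` calls and returns :connected afterwards."
  [failures]
  (let [calls (atom 0)]
    (fn []
      (if (<= (swap! calls inc) failures)
        (throw (java.net.ConnectException. "broker not ready"))
        :connected))))

;; Fails twice, then succeeds on the third attempt
;; (sleeping 10 ms after the first failure and 20 ms after the second).
(call-with-retries (flaky-connect-fn 2) 5 10)

;; With kehaar itself, the corresponding call would be along the lines of:
;; (kehaar.rabbitmq/connect-with-retries {:host "localhost"} 10)
```

Passing the connect function in as an argument keeps the sketch testable without a broker; the linearly growing sleep mirrors the (* attempts 1000) backoff described above, with the multiplier made configurable.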
Copyright © 2016-2018 Democracy Works, Inc.
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.