Liking cljdoc? Tell your friends :D

{:title "Messaging" :sequence 2 :base-ns immutant.messaging :description "Simple creation and usage of distributed queues and topics"}

If you're coming from Immutant 1.x, you may notice that the messaging artifact has been renamed (org.immutant/immutant-messaging is now org.immutant/messaging), and the API has changed a bit. We'll point out the notable API changes as we go.

The API

The messaging API is backed by HornetQ, which is an implementation of JMS. JMS provides two primary destination types: queues and topics. Queues represent point-to-point destinations, and topics publish/subscribe.

To use a destination, we need to get a reference to one via the [[queue]] or [[topic]] functions, depending on the type required. This will create the destination if it does not already exist. This is a bit different than the 1.x API, which provided a single start function for this, and determined the type of destination based on conventions around the provided name. In 2.x, we've removed those naming conventions.

Once we have a reference to a destination, we can operate on it with the following functions:

  • [[publish]] - sends a message to the destination
  • [[receive]] - receives a single message from the destination
  • [[listen]] - registers a function to be called each time a message arrives at the destination

If the destination is a queue, we can do synchronous messaging (request-response):

  • [[respond]] - registers a function that receives each request, and the returned value will be sent back to the requester
  • [[request]] - sends a message to the responder

Finally, to deregister listeners, responders, and destinations, we provide a single [[stop]] function. This is another difference from 1.x - the unlisten and stop functions have been collapsed to stop.

Some Examples

You should follow the instructions in the installation guide to set up a project using Immutant 2.x, and in addition to org.immutant/messaging add [cheshire "5.3.1"] to the project dependencies (we'll be encoding some messages as JSON in our examples below, so we'll go ahead and add cheshire while we're at it). Then, fire up a REPL, and require the immutant.messaging namespace to follow along:

(require '[immutant.messaging :refer :all])

First, let's create a queue:

(queue "my-queue")

That will create the queue in the HornetQ broker for us. We'll need a reference to that queue to operate on it. Let's go ahead and store that reference in a var:

(def q (queue "my-queue"))

We can call queue any number of times - if the queue already exists, we're just grabbing a reference to it.

Now, let's register a listener on our queue. Let's just print every message we get:

(def listener (listen q println))

We can publish to that queue, and see that the listener gets called:

(publish q {:hi :there})

You'll notice that we're publishing a map there - we can publish pretty much any data structure as a message. By default, that message will be encoded using edn. We also support other encodings, namely: :fressian, :json, and :none. We can choose a different encoding by passing an :encoding option to publish:

(publish q {:hi :there} :encoding :json)

If you want to use :fressian or :json, you'll need to add org.clojure/data.fressian or cheshire to your dependencies to enable them, respectively.

We passed our options to publish as keyword arguments, but they can also be passed as a map:

(publish q {:hi :there} {:encoding :json})

This holds true for any of the messaging functions that take options.

We're also passing the destination reference to publish instead of the destination name. That's a departure from 1.x, where you could just pass the destination name. Since we no longer have conventions about how queues and topics should be named, we can no longer determine the type of the destination from the name alone.

We can deregister the listener by either passing it to stop or calling .close on it:

(stop listener)
;; identical to
(.close listener)

Now let's take a look at synchronous messaging. Let's create a new queue for this (you'll want to use a dedicated queue for each responder, and not use this queue for publish/listen) and register a responder that just increments the request:

(def sync-q (queue "sync"))

(def responder (respond sync-q inc))

Then, we make a request, which returns a Future that we can dereference:

@(request sync-q 1)

The responder is just a fancy listener, and can be deregistered the same way as a listener.

Concurrency

Listeners can have multiple threads invoking their handler as messages are consumed. You control how many with the :concurrency option provided by [[listen]] and [[respond]]. By default, it is set to 1 for topics and the number of available processors for queues, but for IO-bound handlers, you may see better performance as you increase the number. It very much depends on what your handler is doing and how many messages it needs to process concurrently.

Durable Topic Subscriptions

Typically, messages published to a topic are only delivered to listeners connected to the topic at that time. But it's possible to [[subscribe]] to a topic with a unique name, so that the broker will accumulate messages for that client when it's disconnected and deliver them in the proper order when the client reconnects.

Use the [[subscribe]] function to create a "durable topic subscriber". Like [[listen]] it expects a callback function. Unlike [[listen]], the destination must be a topic, and a unique subscription-name is required. If the resulting client gets disconnected for any reason, simply call [[subscribe]] again with the same subscription-name and any messages published to the topic in the client's absence will be mapped to the callback function.

Some additional server-side resources are required to track each subscriber, of course, so [[unsubscribe]] is provided to tear down a durable topic subscription when no longer needed.

Contexts

Immutant borrows the Context abstraction introduced in JMS 2.0, which is essentially a mashup of Connection and Session.

Most of the messaging functions accept a :context option. If omitted, one is automatically created on the caller's behalf, used, and then disposed of. This is fine for most use cases, but some will require you to manage the lifecycle of one or more Contexts yourself. Two cases, in particular:

  • Remote destinations, discussed in the next section
  • Publishing or receiving a batch of messages

When publishing a batch of messages, it's more efficient to create a single [[context]] and pass it to each [[publish]] or [[request]] call. Otherwise, a new one is created and torn down for every message in the batch. Of course, you're responsible for closing any Context you create so with-open is your friend:

(with-open [ctx (context)]
  (let [q (queue "foo")]
    (dotimes [n 10000]
      (publish q n :context ctx))))

This is not a problem for [[listen]], [[subscribe]] or [[respond]] since each only requires a single Context no matter how many times their callback function is invoked. It is potentially an issue for [[receive]], but if you're receiving a batch of messages, you should consider using [[listen]] instead.

Remote Destinations

To connect to a remote HornetQ instance, you'll need to create a remote context (via the [[context]] function), and use it when getting a reference to the destination:

(with-open [context (context :host "some-host" :port 5445)]
  (publish
    (queue "foo" :context context)
    "a message"))

A few things to note about the above example:

  • We're using with-open because you need to close any contexts you make.
  • We don't need to pass the context to [[publish]], since it will reuse the context that was used to create the queue reference.

Most importantly, since we're passing the context to [[queue]], a queue with that name must already exist on the remote host. When [[queue]] is passed a remote context, it will return a reference to the remote queue without asking HornetQ to create it.

Destination creation in WildFly

Outside of WildFly, i.e. embedded in your standalone app, creating destinations is as simple as calling either [[queue]] (or [[topic]]) without a remote context.

But creating destinations dynamically like that is not something WildFly is built for. JEE apps expect their queues/topics to already exist; creating them is the job of the server administrator.

Calling [[queue]] or [[topic]] within WildFly might work, but it could also result in deadlock depending on the CPU resources available and the number of apps being deployed simultaneously. You can avoid that uncertainty by configuring the destinations yourself using the normal WildFly mechanisms. You still have to call [[queue]] or [[topic]] in your Clojure code, but because the named destinations already exist, no deadlock will occur.

As this guide explains, there are 3 common ways to create your destinations with WildFly: the main config file, one or more -jms.xml files, or the CLI.

Here's a snippet from the main config file, e.g. standalone-full.xml with comments showing the Clojure code required to reference each one:

...
  <hornetq-server>
    ...
    <jms-destinations>

      <!-- (queue "foo") -->
      <jms-queue name="foo">
        <entry name="java:/jms/queue/foo"/>
      </jms-queue>

      <!-- (topic "/foo/bar") -->
      <jms-topic name="/foo/bar">
        <entry name="java:/jms/topic/_/foo/bar"/>
      </jms-topic>

    </jms-destinations>
  </hornetq-server>
</subsystem>

Note that Immutant looks the destination up via JNDI, so you must have an entry of the same form as those given above. Also note that if the name of the destination starts with /, you must include a _ segment in the JNDI names, as JNDI does not allow blank segments. Immutant will insert the _ when looking up or creating destination entries where the name starts with /.

Context modes

When creating a context, you can pass a :mode option that controls how messages will be acknowledged and delivered.

Immutant provides three modes:

  • :auto-ack - the default for contexts, when this mode is active, receipt of a message is automatically acknowledged when a receive call completes. This mode doesn't affect publication - publish calls will complete immediately.

  • :client-ack - when this mode is active, you are responsible for acknowledging the message manually by calling .acknowledge on the Message object. This means you need to get the raw message (by passing :decode? false to receive). This mode doesn't affect publication - publish calls will complete immediately.

  • :transacted - the default for listeners, when this mode is active, you are responsible for committing or rolling back (by calling .commit or .rollback on the context, respectively) any actions performed on the context. This applies to publishes and receives.

If a context is created with :xa? true, the :mode option is ignored. See the Transactions Guide for more details.

Context modes for listeners

The [[listen]] and [[respond]] functions take a :mode option, which is used instead of the mode of any :context option (listeners need a context per thread, so create and manage their own contexts). The :mode option to these functions takes the same modes as above, with the following behavioral differences (note that :transacted is the default for listeners, not :auto-ack):

  • :auto-ack - when active, the receipt of the message will automatically be acknowledged when the handler function completes.

  • :client-ack - when active, you are responsible for acknowledging the message manually by calling .acknowledge on the Message object. This means you need to get the raw message (by passing :decode? false to listen).

  • :transacted - the default for listeners! when active, .commit is called on the context automatically if the handler function completes successfully. If it throws an exception, .rollback is called on the context. Any messaging operations that take a context will use the context that is active for the listener itself (if not passed one explicitly). This means that any messaging operations within the handler function become participants in the listener's transaction, by default. This can lead to deadlock when a transacted handler calls publish or request since those calls can't complete until the handler completes. In such a case, either set :mode to :auto-ack or use distributed transactions.

If you need to use distributed transactions (XA) within a listener function, you are responsible for demarcating the transaction. See the Transactions Guide for more details.

HornetQ configuration

When used outside of WildFly, we configure HornetQ via a pair of xml files. If you need to adjust any of the HornetQ configuration options, you can provide a copy of one (or both) of those files (hornetq-configuration.xml and hornetq-jms.xml, which should be based off of the default versions) on your application's classpath and your copies will be used instead of the default ones. When making changes to these files, be careful about changing existing settings, as Immutant relies on some of them.

We've also exposed a few HornetQ settings as system properties, namely:

PropertyDescriptionDefault
hornetq.data.dirThe base directory for HornetQ to store its data files./hornetq-data/
hornetq.netty.portThe port that HornetQ will listen on for remote connections5445
hornetq.netty.hostThe host that HornetQ will listen on for remote connectionslocalhost

Note that any custom xml or system properties will be ignored when running inside WildFly - you'll need to make adjustments to the WildFly configuration to achieve similar effects.

In addition, it is possible to override many HornetQ configuration settings at runtime using [[immutant.messaging.hornetq/set-address-options]].

Transactions

When the messaging operations are left to create their own Context, they check to see whether an XA transaction is active. If so, an XA context is created and automatically enlisted as a resource in the active transaction. Otherwise, a more efficient non-XA Context is used.

So you only pay for transactions if you need them.

However, the default value for the [[context]] function's :xa? option is false, so if you're managing Context instances yourself, you must set :xa? to true if you need that Context to be part of a distributed XA transaction.

Listeners

In Immutant 1.x, message listeners were automatically enlisted participants in an XA transaction, but that is not the case with Immutant 2.x. Within the listener function, you must now explicitly define a transaction using one of the macros in [[immutant.transactions]]. If an exception escapes its body, the transaction will be rolled back, and if the exception bubbles out of the listener function, the message will be queued for redelivery.

But the rollback of the transaction has no relationship to message redelivery, which is only triggered by the exception. The transaction could be rolled back as a result of calling [[immutant.transactions/set-rollback-only]], in which case no exception would be thrown. Hence, rollback would occur, but not redelivery.

See the Transactions Guide for more details.

Can you improve this documentation? These fine people already did:
Toby Crawley & Jim Crossley
Edit on GitHub

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close