clj-nats is a Clojure client library for NATS. It's based on the official Java SDK jnats.
Install:
no.cjohansen/clj-nats {:git/url "https://github.com/cjohansen/clj-nats"
                       :sha "LATEST_SHA"}
The current jnats version is 2.17.4.
The API is still under development and subject to change. Once it has settled, it will remain backwards compatible. An official release is expected in April 2024. Until then, use it at the risk of breaking changes.
clj-nats aims to be an easy-to-use Clojure client for NATS. It does this in a few ways.
All functions take native Clojure data as arguments instead of instances of jnats option classes. Almost all functions return Clojure data instead of instances of jnats data classes.
The only exceptions are Java time classes, as they are immutable values and not
easily represented without their wrappers. Specifically, clj-nats uses
java.time.Duration (timeouts etc.) and java.time.Instant (timestamps etc.).
With java-time-literals, even these will be represented as data literals.
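To illustrate why these classes can be left unwrapped, the following sketch uses only JDK classes (no clj-nats calls) to show that both behave like plain values. The names timeout and timestamp, and their values, are made up for this example.

```clojure
(import '(java.time Duration Instant))

;; A timeout expressed as a Duration (hypothetical value)
(def timeout (Duration/ofMillis 500))

;; A timestamp expressed as an Instant (hypothetical value)
(def timestamp (Instant/parse "2024-03-01T12:00:00Z"))

;; Both are immutable and compare by value, like Clojure data
(= timeout (Duration/ofMillis 500))
(= timestamp (Instant/parse "2024-03-01T12:00:00Z"))

;; Arithmetic returns a new value and leaves the originals untouched
(.plus timestamp timeout)
```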
clj-nats automatically encodes and decodes EDN messages when appropriate. No need to convert messages to binary by hand. clj-nats sets a header on your messages to do this safely and transparently.
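For a sense of the manual work this saves, here is a minimal sketch of an EDN round trip through bytes using only clojure.edn. It is not clj-nats's actual implementation; the helper names edn->bytes and bytes->edn are hypothetical, and the header handling is left out entirely.

```clojure
(require '[clojure.edn :as edn])

(defn edn->bytes
  "Serializes Clojure data to UTF-8 encoded EDN bytes."
  [data]
  (.getBytes (pr-str data) "UTF-8"))

(defn bytes->edn
  "Parses UTF-8 encoded EDN bytes back into Clojure data."
  [^bytes bs]
  (edn/read-string (String. bs "UTF-8")))

(def payload {:message "Hello world!"})

;; The data survives the trip to bytes and back unchanged
(= payload (bytes->edn (edn->bytes payload)))
```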
clj-nats wraps a subset of jnats. jnats offers many ways to solve most tasks.
clj-nats only provides the most flexible approach, leaving you with enough
leverage to cater to your own preferences. As an example, jnats has publish
and publishAsync, while clj-nats only provides publish. If you want async,
you can wrap it in a future, create a virtual thread, or choose any number of
other ways to publish off the main thread.
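One possible way to wrap a synchronous publish in a future is sketched below. The helper publish-async and the stand-in fake-publish are hypothetical and exist only so the example runs without a NATS server; with clj-nats you would pass nats/publish and a real connection instead.

```clojure
(defn publish-async
  "Calls (publish-fn conn message) on another thread and returns a future."
  [publish-fn conn message]
  (future (publish-fn conn message)))

;; With clj-nats, this would presumably be called as:
;; (publish-async nats/publish conn message)

;; Stand-in publish function (an assumption, not part of clj-nats)
(defn fake-publish [_conn message]
  {:published (:nats.message/subject message)})

(def result
  (publish-async fake-publish nil
                 {:nats.message/subject "chat.general.christian"
                  :nats.message/data {:message "Hello world!"}}))

;; Dereferencing blocks until the publish call has returned
@result
```

Keeping the publish function as an argument means the same wrapper works for any blocking call, which is the kind of flexibility the single publish function is meant to leave open.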
jnats uses java.time.ZonedDateTime with a GMT timezone for all timestamps.
Because timestamps are instants, this is an unnecessary detour, so clj-nats
operates strictly with java.time.Instant, both for inputs and outputs.
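The equivalence can be checked with plain JDK interop. This sketch does not call clj-nats or jnats; it only shows that a GMT ZonedDateTime and an Instant describe the same moment, using a made-up timestamp.

```clojure
(import '(java.time Instant ZoneId ZonedDateTime))

(def instant (Instant/parse "2024-03-01T12:00:00Z"))

;; The same moment as a ZonedDateTime in the GMT zone
(def zoned (ZonedDateTime/ofInstant instant (ZoneId/of "GMT")))

;; Converting back yields the original instant, so the zone adds nothing
(= instant (.toInstant zoned))
```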
clj-nats is organized in a flat structure loosely inspired by the nats CLI.
The goal is to make it easy to translate CLI examples to clj-nats usage.
Create a connection:
(require '[nats.core :as nats])
(def conn (nats/connect "nats://localhost:4222"))
Publish a message (see below for publishing to streams):
(require '[nats.core :as nats])
(def conn (nats/connect "nats://localhost:4222"))
(nats/publish conn
  {:nats.message/subject "chat.general.christian"
   :nats.message/data {:message "Hello world!"}
   :nats.message/headers {:user "Christian"} ;; Optional
   :nats.message/reply-to "chat.general.replies"}) ;; Optional
Subscribing to messages (see below for consuming streams):
(require '[nats.core :as nats])
(def conn (nats/connect "nats://localhost:4222"))
(def subscription (nats/subscribe conn "chat.>"))
(def msg1 (nats/pull-message subscription 500)) ;; Wait up to 500ms
(def msg2 (nats/pull-message subscription 500)) ;; Wait up to 500ms
(nats/unsubscribe subscription)
Request/response:
(require '[nats.core :as nats])
(def conn (nats/connect "nats://localhost:4222"))
(def subscription (nats/subscribe conn "chat.>"))
(.start
 (Thread.
  (fn []
    (let [msg (nats/pull-message subscription 500)]
      (prn 'Request msg)
      (nats/publish conn
        {:nats.message/subject (:nats.message/reply-to msg)
         :nats.message/data {:message "Hi there, fella"}})))))
(def response (nats/request conn
                {:nats.message/subject "chat.general"
                 :nats.message/data {:message "Hello world!"}}))
(prn 'Response @response)
(nats/unsubscribe subscription)
(nats/close conn)
Create a stream:
(require '[nats.core :as nats]
         '[nats.stream :as stream])
(def conn (nats/connect "nats://localhost:4222"))
(stream/create-stream conn
  {:nats.stream/name "test-stream"
   :nats.stream/subjects ["test.work.>"]
   :nats.stream/allow-direct? true
   :nats.stream/retention-policy :nats.retention-policy/work-queue})
Publish to a stream subject:
(require '[nats.core :as nats]
         '[nats.stream :as stream])
(def conn (nats/connect "nats://localhost:4222"))
(stream/publish conn
  {:nats.message/subject "test.work.email.ed281046-938e-4096-8901-8bd6be6869ed"
   :nats.message/data {:email/to "christian@cjohansen.no"
                       :email/subject "Hello, world!"}})
Create a consumer:
(require '[nats.core :as nats]
         '[nats.consumer :as consumer])
(def conn (nats/connect "nats://localhost:4222"))
(consumer/create-consumer conn
  {:nats.consumer/stream-name "test-stream"
   :nats.consumer/name "test-consumer"
   :nats.consumer/durable? true
   :nats.consumer/filter-subject "test.work.>"})
;; Review its configuration
(consumer/get-consumer-info conn "test-stream" "test-consumer")
Consume messages:
(require '[nats.core :as nats]
         '[nats.consumer :as consumer])
(def conn (nats/connect "nats://localhost:4222"))
(with-open [subscription (consumer/subscribe conn "test-stream" "test-consumer")]
  (let [message (consumer/pull-message subscription 1000)] ;; Wait for up to 1000ms
    (consumer/ack conn message)
    (prn message)))
Review stream configuration and state:
(require '[nats.stream :as stream])
(stream/get-stream-config conn "test-stream")
(stream/get-stream-info conn "test-stream")
(stream/get-stream-state conn "test-stream")
(stream/get-mirror-info conn "test-stream")
(stream/get-consumers conn "test-stream")
Peek at some messages from the stream:
(stream/get-first-message conn "test-stream" "test.work.email.*")
(stream/get-last-message conn "test-stream" "test.work.email.*")
(stream/get-message conn "test-stream" 3)
(stream/get-next-message conn "test-stream" 2 "test.work.email.*")
Get information from the server:
(require '[nats.stream :as stream])
(stream/get-streams conn)
(stream/get-account-statistics conn)
To run the tests, you must run a NATS server on port 4222. The tests will publish messages, create and delete streams and consumers, etc. Tests clean up after themselves.
make test