(publish! broker msg)
Publishes `msg` to subscribers, who will be notified asynchronously. `msg` must not be `nil`. Returns `true` unless `broker` is stopped. Under high load, this will block the caller to respect back-pressure. As such, this should not be called from `(go ...)` blocks. When blocking is not desired, use `core.async/put!` on the channel returned by [[publish-chan]], or install a windowed buffer using the `:buf-or-n` option of [[start]] to drop messages under high load.
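The two publishing styles described above might look as follows. This is a minimal sketch: the namespace `broker.core`, the `broker` alias, and the example topic and payload are assumptions (this page does not name the namespace); only the function names and arities come from the docs.

```clojure
(require '[clojure.core.async :as a]
         '[broker.core :as broker]) ; assumed namespace, adjust to your dependency

(def b (broker/start))

;; Blocking publish: fine on a regular thread, may wait under back-pressure.
(def accepted? (broker/publish! b [:user/created {:id 1}]))

;; Inside a go block, park on the publish channel instead of calling publish!.
(a/<!! (a/go
         (a/>! (broker/publish-chan b) [:user/created {:id 2}])))

;; Non-blocking publish from any thread.
(a/put! (broker/publish-chan b) [:user/created {:id 3}])

(broker/shutdown! b)
```

`a/put!` never blocks the caller, but pending puts still queue up on the channel, so a dropping or sliding buffer is the safer choice when sustained overload is expected.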
(publish-chan broker)
Returns the channel used for publishing messages to the broker. Intended for publishing from a `(go ...)` block or advanced usage such as bulk publishing / piping into the broker. Closing the returned channel will stop the broker.
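Bulk publishing could be sketched like this, assuming the library lives in a namespace called `broker.core` (an assumption, as are the `:tick` topic and the `received` atom). `onto-chan!` is the standard `core.async` helper for copying a collection onto a channel; older `core.async` releases name it `onto-chan`.

```clojure
(require '[clojure.core.async :as a]
         '[broker.core :as broker]) ; assumed namespace

(def b (broker/start))
(def received (atom []))
(broker/subscribe b :tick (fn [[_ n]] (swap! received conj n)))

(def events (mapv (fn [n] [:tick n]) (range 100)))

;; Pass false as the third argument so the publish channel is NOT closed
;; once the collection is exhausted; closing it would stop the broker.
(a/<!! (a/onto-chan! (broker/publish-chan b) events false))

(broker/shutdown! b)
```

The same caution applies to `a/pipe`: call it as `(a/pipe source (broker/publish-chan b) false)` unless stopping the broker when the source closes is the intended behavior.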
(shutdown! broker)
(shutdown! broker timeout-ms)
Stops the broker and waits for all messages to be processed, or `timeout-ms` to pass. Returns `true` when successfully stopped, or `false` on timeout.
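A graceful shutdown with a timeout might look like the sketch below. The namespace `broker.core`, the `:job` topic, and the simulated work are assumptions made for illustration.

```clojure
(require '[broker.core :as broker]) ; assumed namespace

(def b (broker/start))
(def processed (atom 0))

(broker/subscribe b :job
                  (fn [_msg]
                    (Thread/sleep 20)      ; simulate slow work
                    (swap! processed inc)))

(dotimes [n 5]
  (broker/publish! b [:job n]))

;; Wait up to 5 seconds for the five queued jobs to drain.
(def drained? (broker/shutdown! b 5000))

(when-not drained?
  (binding [*out* *err*]
    (println "broker did not drain in time;" @processed "jobs finished")))
```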
(start)
(start opts)
Starts a broker.

Supported options:

* `:topic-fn` - function used to determine the topic of an incoming message for [[subscribe]]. Defaults to `first`, expecting messages to be vectors `[topic payload...]`.
* `:xform` - a transducer to transform/filter messages as they are published. Should not throw exceptions and must not produce `nil` messages.
* `:buf-or-n` - `core.async` buffer to use for the publish channel. Defaults to a large fixed-size buffer (`1024`).
* `:buf-fn` - function to create `core.async` buffers for subscribing functions. By default, uses small fixed-size buffers (`8`).
* `:error-fn` - when a subscribing function throws an exception, this function will be called with two arguments: the exception and a map with keys `:broker`, `:fn`, and `:msg`. With no `:error-fn` (default), exceptions are passed to the current thread's `UncaughtExceptionHandler`.
* `:exec` - default subscription execution type -- see [[subscribe]].
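As an illustration of how several of these options combine, here is a hedged sketch. The namespace `broker.core` is an assumption, and so are the map-shaped messages, the `:type` and `:debug?` keys, and the buffer size; `:buf-fn` is left out because its exact signature is not stated in the docs.

```clojure
(require '[clojure.core.async :as a]
         '[broker.core :as broker]) ; assumed namespace

(def b
  (broker/start
   {;; Messages are maps here, so read the topic from :type instead of `first`.
    :topic-fn :type
    ;; Drop messages flagged as debug before they reach any subscriber.
    :xform    (remove :debug?)
    ;; Keep only the newest 256 messages when publishers outpace subscribers.
    :buf-or-n (a/sliding-buffer 256)
    ;; Report subscriber failures instead of relying on the thread's handler.
    :error-fn (fn [ex {:keys [msg]}]
                (binding [*out* *err*]
                  (println "subscriber failed on" (pr-str msg)
                           "-" (ex-message ex))))}))

(def seen (atom []))
(broker/subscribe b :order/placed (fn [msg] (swap! seen conj (:id msg))))

(broker/publish! b {:type :order/placed :id 1})
(broker/publish! b {:type :order/placed :id 2 :debug? true}) ; filtered by :xform
(broker/shutdown! b)
```

With a sliding buffer, the oldest unprocessed messages are discarded under load; `a/dropping-buffer` would discard the newest instead.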
(stop! broker)
Stops the broker, closing all internal async channels. After that, the broker will no longer accept messages. Any previously published messages will still be delivered. Can be called multiple times. Returns `nil`.
(stop-chan broker)
Returns a channel that will close when the broker stops and all pending messages are processed by the subscribers. Can be used to block for a graceful shutdown.
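One way to combine `stop!` and `stop-chan` into a bounded wait is sketched below. The namespace `broker.core`, the two-second deadline, and the `outcome` var are assumptions; `a/timeout` and `a/alts!!` are standard `core.async` functions.

```clojure
(require '[clojure.core.async :as a]
         '[broker.core :as broker]) ; assumed namespace

(def b (broker/start))
(broker/subscribe b (fn [msg] (println "handled" msg)))
(broker/publish! b [:greeting "hello"])

;; stop! closes the broker for new messages; per the docs, messages
;; published earlier are still delivered.
(broker/stop! b)

;; Block until the broker has drained, or give up after two seconds.
(def outcome
  (let [deadline (a/timeout 2000)
        [_ port] (a/alts!! [(broker/stop-chan b) deadline])]
    (if (= port deadline) :timed-out :stopped)))
```

Inside a `(go ...)` block, `a/alts!` would be the parking equivalent of `a/alts!!`.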
(subscribe broker f)
(subscribe broker f opts)
(subscribe broker topic f)
(subscribe broker topic f opts)
Subscribes to messages from the broker.

When a `topic` is given, subscribes only to messages of this topic. `topic` can also be a sequence of multiple topics to subscribe to. Without `topic`, subscribes to all messages.

If `f` is a channel, it is the caller's responsibility to read messages asynchronously and to make sure that the JVM process waits for the consumer process to finish.

Options, only when `f` is a function:

* `:exec` - subscription execution type:
  * `:sequential` (default) - call `f` in a separate thread, one message at a time
  * `:go` - call `f` in a `(go ...)` block, one message at a time
  * `:pipeline`, `:blocking` - call `f` in a `core.async` pipeline, with parallelism according to the `:parallel` option
  * `:async` - call `f` in an async pipeline. `f` is expected to start an asynchronous process and return immediately. It must accept a second argument: a unary function to signal that the event has been handled.
* `:parallel` - how many parallel calls to allow when `:exec` is a pipeline type (default: `1`)
* `:chan` - a custom channel to use for the subscription. Useful to fine-tune the buffer or to use a transducer on the channel.
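The subscription variants above could be exercised as in the following sketch. The namespace `broker.core`, the topics, and the `record!` helper are assumptions for illustration. The `:async` execution type is omitted because the docs do not say what value the completion callback expects.

```clojure
(require '[clojure.core.async :as a]
         '[broker.core :as broker]) ; assumed namespace

(def b (broker/start))
(def log (atom []))

;; Hypothetical helper: builds a distinct subscriber fn per tag.
(defn record! [tag]
  (fn [msg] (swap! log conj [tag msg])))

;; No topic: receives every message.
(broker/subscribe b (record! :all))

;; One topic, default :sequential execution.
(broker/subscribe b :order/placed (record! :placed))

;; Several topics at once, handled in a blocking pipeline with 4 workers.
(broker/subscribe b [:order/placed :order/cancelled]
                  (record! :orders)
                  {:exec :blocking :parallel 4})

;; Custom subscription channel: a larger buffer plus a transducer that
;; strips the topic, so the function only sees the payload.
(broker/subscribe b :order/placed
                  (record! :payload)
                  {:chan (a/chan 64 (map second))})

(broker/publish! b [:order/placed {:id 1}])
(broker/publish! b [:order/cancelled {:id 1}])
(broker/shutdown! b)
```

Under these assumptions, `:all` and `:orders` each see both messages while `:placed` and `:payload` see one, so `log` ends up with six entries. Their relative order is not guaranteed, because each subscriber runs independently.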
(unsubscribe broker f)
(unsubscribe broker topic f)
Unsubscribes a function or channel. If a `topic` is given, unsubscribes only from this topic. Otherwise, unsubscribes from all topics, i.e. clears all subscriptions from [[subscribe]]. If `f` is not a subscriber, this is a no-op. Returns `nil`.
(unsubscribe-all broker)
(unsubscribe-all broker topic)
Unsubscribes all subscribers. When a `topic` is given, only subscribers to the given topic will be unsubscribed. Returns `nil`.
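The difference between `unsubscribe` and `unsubscribe-all` might be easiest to see side by side. In this sketch the namespace `broker.core`, the topics `:a` and `:b`, and the two handler functions are assumptions. Note that unsubscribing a function requires a reference to the same function object that was passed to [[subscribe]], which is why the handlers are defined with `defn` rather than inline.

```clojure
(require '[broker.core :as broker]) ; assumed namespace

(def b (broker/start))
(def hits (atom {:audit [] :metrics []}))

;; Named handlers, so the same function objects can be unsubscribed later.
(defn audit   [msg] (swap! hits update :audit conj msg))
(defn metrics [msg] (swap! hits update :metrics conj msg))

(broker/subscribe b [:a :b] audit)
(broker/subscribe b [:a :b] metrics)

(broker/unsubscribe b :a audit)   ; audit now only listens on :b
(broker/unsubscribe-all b :b)     ; removes audit and metrics from :b

(broker/publish! b [:a 1])        ; only metrics is still subscribed to :a
(broker/publish! b [:b 2])        ; nobody is subscribed to :b any more

(broker/shutdown! b)
```

Calling `(broker/unsubscribe b audit)` without a topic would have removed `audit` from both topics in one step, and `(broker/unsubscribe-all b)` clears every subscription on the broker.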