(<? ch)
Like `<!`, but throws the exception if the value read from `ch` is an exception.
(<?? ch)
Like `<!!`, but throws the exception if the value read from `ch` is an exception.
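The idea behind these two forms can be sketched with plain core.async. This is only an illustration of the behavior described above, not the library's implementation; the names `throw-if-throwable`, `<?-sketch` and `<??-sketch` are hypothetical, and the code assumes JVM Clojure (it checks for `Throwable`).

```clojure
(require '[clojure.core.async :as a])

(defn throw-if-throwable
  "Returns v unchanged, unless v is a Throwable, in which case it is thrown."
  [v]
  (if (instance? Throwable v)
    (throw v)
    v))

;; Hypothetical parking variant. It has to be a macro so that <! is still
;; lexically inside the caller's go block after macroexpansion.
(defmacro <?-sketch [ch]
  `(throw-if-throwable (a/<! ~ch)))

;; Hypothetical blocking variant. A plain fn is enough because <!! can be
;; called anywhere outside a go block.
(defn <??-sketch [ch]
  (throw-if-throwable (a/<!! ch)))
```

A producer would typically catch its own exceptions and put them on the channel as values, so that the consumer can rethrow them with one of these forms.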
(<t! ms ch)
Like `<!` with a `ms` timeout as first argument. Returns `::timeout` if `ms` milliseconds elapsed without successfully reading from `ch`.
(<t!! ms ch)
Like `<!!` with a `ms` timeout as first argument. Returns `::timeout` if `ms` milliseconds elapsed without successfully reading from `ch`.
(>t! ms ch val)
Like `>!` with a `ms` timeout as first argument. Returns `::timeout` if `ms` milliseconds elapsed without successfully putting into `ch`.
(>t!! ms ch val)
Like `>!!` with a `ms` timeout as first argument. Returns `::timeout` if `ms` milliseconds elapsed without successfully putting into `ch`.
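The timeout variants behave like a race between the channel operation and a timeout channel. A minimal sketch of the blocking versions, assuming only core.async, is shown here; `take-with-timeout!!` and `put-with-timeout!!` are hypothetical names, and `::timeout` in this sketch resolves to the current namespace rather than to the library's keyword.

```clojure
(require '[clojure.core.async :as a])

(defn take-with-timeout!!
  "Blocks until a value is read from ch or ms milliseconds pass."
  [ms ch]
  (let [t        (a/timeout ms)
        [v port] (a/alts!! [ch t])]
    ;; alts!! reports which port completed; the timeout channel winning
    ;; means the read did not succeed in time.
    (if (identical? port t) ::timeout v)))

(defn put-with-timeout!!
  "Blocks until val is put into ch or ms milliseconds pass."
  [ms ch val]
  (let [t        (a/timeout ms)
        ;; A [channel value] pair asks alts!! to attempt a put.
        [v port] (a/alts!! [[ch val] t])]
    (if (identical? port t) ::timeout v)))
```

The parking variants would follow the same shape with `alts!` inside a go block.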
(arrange-by keyfn chs out)
(arrange-by keyfn
chs
out
{:keys [close? comparator] :or {close? true comparator compare}})
Starts a process that reads values from all `chs` and puts them into `out` in the order determined by comparing `(keyfn item)` using the default comparator, or a custom one supplied as `opts.comparator`.
The process waits until it has a value from every ch before emitting one (i.e. when the value from a chan is emitted, the process waits until that same `ch` emits a new value to compare again with the cached vals). If multiple items have the same `keyfn` result, the first one that was taken is emitted.
When all chs are closed, the process finishes. `opts.close?` can be used to close `out` when this happens.
Example use case: reorder different historical data sources by `:timestamp`.
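The ordering rule described above (cache one value per channel, emit the smallest, then refill from the channel that was emitted) can be sketched with blocking operations. This is an illustration under simplifying assumptions, not the library's code: `arrange-by-sketch` is a hypothetical name, it always uses the default `compare`, always closes `out`, and runs on a thread instead of a go-loop.

```clojure
(require '[clojure.core.async :as a])

(defn arrange-by-sketch
  [keyfn chs out]
  (a/thread
    ;; heads holds one cached [ch value] pair per channel that is still open.
    (loop [heads (into [] (keep (fn [ch] (when-some [v (a/<!! ch)] [ch v]))) chs)]
      (if (empty? heads)
        (a/close! out)
        ;; sort-by is stable, so among equal keys the pair cached first wins.
        (let [[ch v :as head] (first (sort-by (comp keyfn second) heads))
              others          (filterv #(not (identical? % head)) heads)]
          (a/>!! out v)
          ;; Refill from the channel whose value was just emitted.
          (let [nxt (a/<!! ch)]
            (recur (if (some? nxt) (conj others [ch nxt]) others)))))))
  out)
```

With two sources carrying timestamps 1, 4, 6 and 2, 3, 5, reading `out` yields the items in timestamp order 1 through 6.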
(broadcast val chs)
Puts `val` into all `chs` in parallel. Returns a ch that, once all chans have accepted the item, will contain a collection of the `chs` that were found closed while trying to put `val` into them.
Easier than creating a mult for just distributing one value.
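One way to picture this behavior with plain core.async is to start one go block per channel and collect the puts that returned false. The sketch below is an assumption about how such a function could be written, not the library's implementation; `broadcast-sketch` is a hypothetical name.

```clojure
(require '[clojure.core.async :as a])

(defn broadcast-sketch
  [val chs]
  ;; One go block per channel, so a slow channel does not delay the others.
  ;; >! returns false when the channel is already closed.
  (let [attempts (mapv (fn [ch] (a/go [ch (a/>! ch val)])) chs)]
    (a/go
      (->> (a/<! (a/into [] (a/merge attempts)))
           (remove second)   ; keep only the puts that failed
           (mapv first)))))  ; return the closed channels themselves
```

The returned channel delivers its value only after every put has either succeeded or been rejected by a closed channel.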
(bundle in
out
{:keys [init add-item ready? full? close?]
:or {full? (constantly false) close? true}})
Creates a process that adds items taken from the input chan `in` into a `bundle` using the `opts.add-item` fn. The initial value of `bundle` can be specified with `opts.init`.
When the bundle is ready (`opts.ready?` fn) and the `out` chan can accept new values, the bundle is put into `out` and its value is reset to `opts.init`. If the bundle is ready but `out` can't accept new values, the process keeps taking items from `in` and adding them to the bundle, unless the bundle is full (`opts.full?` fn), in which case it only tries to put the bundle into `out`.
When the `in` ch closes, the process no longer tries to take values from it, and instead only tries to put the current bundle into `out` iff it's ready. If it is not ready, the process finishes.
If the `out` ch is found closed when trying to output the bundle, the process finishes.
The `ready?`, `full?` and `add-item` fns are called inside a go-loop and should behave accordingly (no I/O, no expensive computations, etc.).
Returns `out`.
opts:
- `init` the initial value of the bundle. It is used as a starting point, and the bundle is reset to this value when it gets sent over the `out` chan.
- `add-item` 2-arity fn (`[bundle new-item]`) that returns the bundle with the new item added. When the `in` ch closes, `add-item` is also called with `nil` as new-item, so the user can mark the bundle as ready, allowing the remaining contents of the bundle to be sent before the process finishes.
- `ready?` 1-arity fn (`[bundle]`) that returns logical true if the bundle is ready to be sent to `out`.
- `full?` 1-arity fn (`[bundle]`) that returns logical true if no new values can be added to the bundle. Note that `full?` is only checked after `ready?` returns true. Defaults to `(constantly false)`, creating an unbounded bundle.
- `close?` if true, the `out` ch will be closed after the process finishes. Defaults to true.
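As an illustration of how the option fns fit together, here is a sketch of an opts map for batching items into groups of at least 3 and at most 10. The map name `batch-opts`, the bundle shape (`:items`, `:done?`) and the thresholds are assumptions made for this example; the snippet only defines and exercises the pure fns and does not call `bundle` itself.

```clojure
(def batch-opts
  {:init     {:items [] :done? false}
   ;; A nil item signals that `in` has closed: flag the bundle so the
   ;; remaining items can be flushed.
   :add-item (fn [bundle item]
               (if (nil? item)
                 (assoc bundle :done? true)
                 (update bundle :items conj item)))
   ;; Ready once there are 3 or more items, or when flushing a non-empty rest.
   :ready?   (fn [{:keys [items done?]}]
               (and (seq items)
                    (or done? (>= (count items) 3))))
   ;; Stop accumulating at 10 items if `out` is not keeping up.
   :full?    (fn [{:keys [items]}]
               (>= (count items) 10))})
```

All three fns are cheap and side-effect free, which matches the requirement that they be safe to call inside a go-loop.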
(consume in f)
(consume in
f
{:keys [thread? ex-handler]
:or {thread? false ex-handler default-ex-handler}})
Creates a process that takes items from `in` and executes `f` with each item. If `f` returns a ch, the internal process parks/blocks waiting for that ch to emit/close, ignoring its value. Returns a chan that closes when `in` is exhausted.
Accepts an optional third argument `opts`:
- `thread?` Whether the internal process should use a thread. Defaults to false (uses go-loop).
- `ex-handler` Exception handler called if `f` throws an exception. Defaults to `default-ex-handler`.
(consumers n in f opts)
Creates `n` consumers that each execute `consume`. `opts` are the same as for `consume`. Returns a chan that will be closed when all workers have finished.
(debounce ms in out)
(debounce ms
in
out
{:keys [close? add-item] :or {close? true add-item (fn [_ v] v)}})
Creates a process that accumulates items read from `in` (with `opts.add-item`) and puts the accumulated value into `out` iff `ms` milliseconds have elapsed without any new value from `in`.
When `in` is closed, the process stops. If `opts.close?` is true, the process closes the `out` ch when it finishes. If `out` is closed from the outside, the process stops reading from `in`.
Returns `out`.
opts:
- `add-item` 2-arity fn that takes the current value and the new item and returns the next value. Defaults to `(fn [_ v] v)` (only keeps the latest value). After the current accumulated value has been put into `out`, the next `add-item` call receives `nil` as the current value. It is not called when reading nil due to `in` being closed.
- `close?` if true, closes `out` when the process finishes.
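The debouncing rule can be sketched with `alts!` and a fresh timeout per incoming value. This is a simplified illustration, not the library's implementation: `debounce-sketch` is a hypothetical name, it always keeps only the latest value (no `add-item` option), always closes `out`, and drops any pending value when `in` closes, which is an assumption about the stop behavior.

```clojure
(require '[clojure.core.async :as a])

(defn debounce-sketch
  [ms in out]
  (a/go-loop [pending ::none]
    ;; A timeout is only raced against `in` while a value is waiting.
    (let [t        (when (not= pending ::none) (a/timeout ms))
          [v port] (a/alts! (if t [in t] [in]))]
      (cond
        ;; Quiet period elapsed: emit, and stop if `out` was closed.
        (and t (identical? port t)) (when (a/>! out pending)
                                      (recur ::none))
        ;; `in` closed: finish the process.
        (nil? v)                    (a/close! out)
        ;; New value: it replaces the pending one and restarts the timer.
        :else                       (recur v))))
  out)
```

Putting 1, 2 and 3 into `in` in quick succession results in a single 3 on `out` once `ms` milliseconds pass without further input.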
(fworker in)
(fworker in
{:keys [thread? ex-handler]
:or {thread? false ex-handler default-ex-handler}})
Creates a worker that reads nullary fns from `in` and executes them. If a fn returns a chan, the worker waits for that ch to emit/close before trying to take more fns from `in`.
Creates a `go-loop` by default, but can be changed to a `thread` loop with `opts.thread?`.
When `in` is closed, the process finishes. Returns a `ch` that closes when the worker has finished.
opts:
- `thread?` Creates the worker using async/thread.
- `ex-handler` Exception handler to be used if an incoming `fn` throws.
(fworkers n in)
(fworkers n in opts)
Creates `n` workers that run in parallel. See [[fworker]]. Returns a `ch` that closes when all workers have finished.
(interval f ms out)
Creates a process that puts the result of calling `f` into `out` every `ms` milliseconds. The counter starts again once the value has been put into the output chan. Closing the output chan will asynchronously stop the process. Returns `out`.
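A minimal sketch of this loop with plain core.async follows. It is an illustration of the described behavior rather than the library's code; `interval-sketch` is a hypothetical name, and it assumes `f` never returns nil, since nil cannot be put on a channel.

```clojure
(require '[clojure.core.async :as a])

(defn interval-sketch
  [f ms out]
  (a/go-loop []
    ;; Wait first, then produce; the next wait only starts after the put
    ;; has been accepted, so a slow consumer stretches the interval.
    (a/<! (a/timeout ms))
    ;; >! returns false once `out` is closed, which ends the loop.
    (when (a/>! out (f))
      (recur)))
  out)
```

Because the put parks until a consumer takes the value, closing `out` is enough to stop the process on its next attempt.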
(mult ch)
(mult ch {:keys [events-ch finished-close?] :or {finished-close? true}})
Alternative implementation of core.async/mult with extra `opts`.
opts:
- `events-ch` optional ch on which to receive events related to the mult. Current events are:
  - `:on-fill` Sent when the mult goes from having 0 taps to at least one tap.
  - `:on-empty` Sent when the mult goes from having at least one tap to 0 taps.
  The ch will be closed when the mult process finishes. An internal sliding-buffer is created that pipes to this channel, meaning that older events might get dropped in favor of later events.
- `finished-close?` If tapping into a finished mult with `close?` true, then it automatically closes the tap ch. Defaults to true. If set to false, it behaves like core.async/mult (doesn't close taps added after the main go-loop has finished).
(pipe-process from to)
(pipe-process from to close?)
Same as core.async/pipe, but returns the internal go-loop ch instead of `to`.
(profile-buf buf pfn)
Wraps `buf` into a new buffer that works exactly like `buf` (calls the protocol fns of the wrapped buf), with the addition of calling `pfn` with the remaining number of items in the buffer when taking values out of the buffer.
Depends on internal implementation details of core.async, use at your own risk!
(pub ch topic-fn)
(pub ch
topic-fn
{:keys [buf-fn events-ch finished-close?]
:or {buf-fn (constantly nil) finished-close? true}})
Alternative implementation of core.async/pub with extra `opts`.
opts:
- `buf-fn` Like `buf-fn` in core.async/pub. Fn accepting a topic that returns the buffer to be used per topic.
- `events-ch` chan that contains events related to each topic (coming from the internal mult of each topic). Supported events are:
  - `[:on-fill <topic>]` Sent when the topic goes from having no subscribers to at least one subscriber.
  - `[:on-empty <topic>]` Sent when the topic goes from having subscribers to not having any subscribers.
  The ch will be closed when the pub has finished and all internal topic mults are finished. Each topic uses a sliding-buffer of events, so some events might get dropped in favor of a later status of the topic.
- `finished-close?` See `finished-close?` in [[mult]].
Notes:
- Once the ch for a topic is created, it will remain in the pub and not be removed/closed. For this reason, the number of topics should be bounded.
(pub-layer {:keys [attach-fn detach-fn src-fn mux-ch-fn]})
Similar to, but more powerful than, [[spread-pub]].
Creates a new pub that is not attached to any internal source or channel. `attach-fn` and `detach-fn` control how this pub is attached to its underlying sources (can be other pubs, taps, ...). `src-fn` creates the sources to be attached/detached for each topic (could be a ch or more than one), and `mux-ch-fn` returns the 'output' chan for that topic. In simple cases, `mux-ch-fn` can return `src` if it's a single channel and no intermediary processes are needed.
Opts:
- `src-fn` 1-arity fn `(topic)` that creates the sources that will be attached/detached for that topic.
- `attach-fn` 2-arity fn `(topic, src)` called when a topic is filled with subscribers, to attach to underlying resources.
- `detach-fn` 2-arity fn `(topic, src)` called when a topic is emptied, to detach from underlying resources.
- `mux-ch-fn` 2-arity fn `(topic, src)` that returns the mux ch to use on the internal mult. Used to set up any intermediary async pipeline between `src` and the returned ch if needed. If no pipeline is needed and `src` is a chan, `src` can be used as the mux ch.
(put-close! ch val)
Shortcut for `(do (a/put! ch val) (a/close! ch))`.
(spread-pub p)
(spread-pub p
{:keys [chan-fn finished-close?]
:or {chan-fn (fn [_topic] (a/chan)) finished-close? true}})
DEPRECATED in favor of [[pub-layer]].
Creates a new pub wrapping pub `p`. You can specify `opts.chan-fn` to create a source ch for a given topic, allowing you to specify a custom xf that will be applied only once for all subscribed chs of a given topic.
Additionally, this internal mult will sub automatically to the underlying `p` when there are any taps on a given topic, and will automatically unsub from `p` when there aren't taps on it. This is done asynchronously, on an internal go-loop that reads from the events emitted by each [[mult]].
Compatible with both core.async/pub and [[pub]].