(batch! in out size timeout)
(batch! in out size timeout rf)
(batch! in out size timeout rf init)
(batch! in out size timeout rf init close?)
(batch! in out size timeout rf init final close?)
Takes messages from `in` and batches them until reaching `size` or `timeout` ms, then puts them to `out`. Batches with reducing function `rf` into an initial value derived from calling `init`. If `init` is not supplied, `rf` is called with zero args. By default uses `conj` and a vector. Optionally takes a `final` function (default `identity`) to call on each batch, which is useful when building up transient collections.
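The way `rf`, `init`, and `final` combine can be sketched without channels. The snippet below is only a model: it assumes each batch is folded roughly as `(final (reduce rf (init) messages))`, which is an interpretation of the docstring rather than the library's code. `fold-batch` is a hypothetical helper, `partition-all` stands in for the `size` limit, and timeouts are not modeled.

```clojure
;; Hypothetical model of how one batch is built; not the library's code.
(defn fold-batch
  "Folds `messages` with `rf` starting from `(init)`, then applies `final`."
  [rf init final messages]
  (final (reduce rf (init) messages)))

;; Default-like behaviour: conj into a vector and leave the batch untouched.
(def default-batches
  (mapv #(fold-batch conj vector identity %)
        (partition-all 3 (range 7))))

;; Transient variant: conj! into a transient vector, then freeze it
;; with persistent! as the `final` step.
(def transient-batches
  (mapv #(fold-batch conj! (fn [] (transient [])) persistent! %)
        (partition-all 3 (range 7))))
```

Both vars hold the same three batches; the transient variant only changes how each batch is accumulated before `final` turns it back into a persistent vector.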
(batch!! in out size timeout)
(batch!! in out size timeout rf)
(batch!! in out size timeout rf init)
(batch!! in out size timeout rf init close?)
(batch!! in out size timeout rf init final close?)
Takes messages from `in` and batches them until reaching `size` or `timeout` ms, then puts them to `out`. Batches with reducing function `rf` into an initial value derived from calling `init`. If `init` is not supplied, `rf` is called with zero args. By default uses `conj` and a vector. Optionally takes a `final` function (default `identity`) to call on each batch, which is useful when building up transient collections. Like [[batch!]] but blocking.
(break-pipe f from to)
Like [[clojure.core.async/pipe]] but takes a function `f` which returns a timeout channel or `nil`. `f` always takes one argument, the item taken from `from`, but it can be ignored.
(consume! ch v & body)
Takes values repeatedly from `ch` as `v` and evaluates `body` with it. The opposite of produce. Stops consuming values when the channel is closed.
(consume!! ch v & body)
Takes values repeatedly from `ch` as `v` and evaluates `body` with it. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume!]] but blocking.
(consume-call! ch f)
Takes values repeatedly from `ch` and applies `f` to them. The opposite of produce. Stops consuming values when the channel is closed.
(consume-call!! ch f)
Takes values repeatedly from `ch` and applies `f` to them. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume-call!]] but blocking.
(consume-checked! ch v & body)
Takes values repeatedly from `ch` as `v` and runs `body`. The opposite of produce. Stops consuming values when the channel is closed or `body` evaluates to a falsey value.
(consume-checked!! ch v & body)
Takes values repeatedly from `ch` as `v` and evaluates `body`. Recurs only when `body` evaluates to a truthy value. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume-checked!]] but blocking.
(consume-checked-call! ch f)
Takes values repeatedly from `ch` and applies `f` to them. Recurs only when `f` returns a truthy value. The opposite of produce. Stops consuming values when the channel is closed.
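The stopping rule of the checked consumers can be illustrated on a plain sequence. This is a channel-free model under the assumption that `f` is applied to every value up to and including the first one for which it returns a falsey value; `consume-checked-model` and `seen` are hypothetical names used only for this sketch.

```clojure
;; Hypothetical, channel-free model of the "checked" consume loop:
;; a sequence stands in for the channel, and the end of the sequence
;; stands in for the channel being closed.
(defn consume-checked-model
  [f vs]
  (loop [vs (seq vs)]
    (when (and vs (f (first vs)))
      (recur (next vs)))))

;; Record every value that `f` gets to see.
(def seen (atom []))

(consume-checked-model
 (fn [v]
   (swap! seen conj v)
   (< v 3))            ; falsey from 3 onwards, so the loop stops there
 [1 2 3 4 5])
```

After the call, `@seen` contains the values handed to `f` before the loop stopped, including the one that made `f` return a falsey value.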
(consume-checked-call!! ch f)
Takes values repeatedly from `ch` and applies `f` to them. Recurs only when `f` returns a truthy value. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume-checked-call!]] but blocking.
(control* f cf ctl)
Wraps `f` with control function `cf` such that `f` will be invoked when `cf` returns a truthy value for a value taken from the `ctl` channel, or when there is no signal on the `ctl` channel to take immediately.
(delay-pipe size from to timeout-fn)
(delay-pipe size from to timeout-fn close?)
Delays inputs from `from` by the value returned by `timeout-fn` applied to each input, and pipes them to `to`. Can delay up to `size` inputs in parallel. If `size` is greater than 1024, there are no ordering guarantees, even for constant timeouts. `timeout-fn` must return a number.
(fan! f xf from to)
Partitions values from `from` by `f` and applies `xf` to each partition. Useful for stateful transducers operating on streaming partitioned data when the partitions aren't known a priori. Warnings and caveats: creates a new channel and go block per new partition. Very bad for unbounded inputs.
(group-by! from to kfn timeout f)
Partitions values from `from` by `kfn` and calls `f` on an input channel onto which values will be written and an output channel to which results should be produced. The values put onto the input channel will be `[(kfn v) v]`. To handle unbounded inputs, a partition will be removed after `timeout` milliseconds.
(interrupt-control f ctl)
(interrupt-control f cf ctl)
(interrupt-control f cf ctl & es-ehs)
Like `control` but only checks the `ctl` channel if `f` throws. Takes an optional seq of exceptions and exception handlers to handle different exceptions which can be thrown by `f`.
(merge from to)
(merge from to close?)
Like [[clojure.core.async/merge]] but pipes all channels to `to`.
(merge! from to)
(merge! from to close?)
Like [[merge]] but less fair and probably faster for small `from` counts.
(mux to & from)
Put into `to` a map of takes by all pairs in `from` from key to channel.
(ooo-pipeline n to xf from)
(ooo-pipeline n to xf from close?)
(ooo-pipeline n to xf from close? ex-handler)
Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n. Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input. Outputs will be returned OUT OF ORDER. By default, the to channel will be closed when the from channel closes, but can be determined by the close? parameter. Will stop consuming the from channel if the to channel closes. Note this should be used for computational parallelism. If you have multiple blocking operations to put in flight, use ooo-pipeline-blocking instead. If you have multiple asynchronous operations to put in flight, use ooo-pipeline-async instead.
(ooo-pipeline-async n to af from)
(ooo-pipeline-async n to af from close?)
Takes elements from the from channel and supplies them to the to channel, subject to the async function af, with parallelism n. af must be a function of two arguments, the first an input value and the second a channel on which to place the result(s). af must close! the channel before returning. The presumption is that af will return immediately, having launched some asynchronous operation (i.e. in another thread) whose completion/callback will manipulate the result channel. Outputs will be returned OUT OF ORDER. By default, the to channel will be closed when the from channel closes, but can be determined by the close? parameter. Will stop consuming the from channel if the to channel closes. See also ooo-pipeline, ooo-pipeline-blocking.
(ooo-pipeline-blocking n to xf from)
(ooo-pipeline-blocking n to xf from close?)
(ooo-pipeline-blocking n to xf from close? ex-handler)
Like ooo-pipeline, for blocking operations.
(periodically! f t)
(periodically! f t buf-or-n)
(periodically! f t buf-or-n exh)
Invokes `f` periodically with period `t` and puts the results into the returned channel. Optionally takes a buffer or buffer size and an exception handler for `f`.
(produce! ch & body)
Execute `body` repeatedly in a go-loop and put its results into output `ch`.
(produce!! ch & body)
Execute `body` repeatedly in a loop and put its results into output `ch`. Like [[produce!]] but blocking. Should be called inside a thread or a future.
(produce-call! ch f)
(produce-call! ch f close?)
Puts the results of repeatedly calling `f` into the supplied channel `ch`. By default the channel will be closed if `f` returns `nil`.
(produce-call!! ch f)
(produce-call!! ch f close?)
Puts the results of repeatedly calling `f` into the supplied channel `ch`. By default the channel will be closed if `f` returns `nil`. Like [[produce-call!]] but blocking. Should be called inside a thread or a future.
(put-recur! ch & body)
Repeatedly [[a/put!]] into `ch` the results of running `body`. All limitations which apply to [[a/put!]] apply here as well. Uses the core.async fixed size dispatch thread pool.
(put-recur!* ch f)
Repeatedly [[a/put!]] into `ch` the results of invoking `f`. All limitations which apply to [[a/put!]] apply here as well. Uses the core.async fixed size dispatch thread pool.
(rate->ms items-per-second)
Convert an items/second rate to millisecond intervals. Rounds the rate *down* (interval is rounded up).
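The rounding direction can be checked with a small stand-in. `rate->ms-sketch` below is a hypothetical re-implementation written from the docstring alone (divide 1000 ms by the rate and round the interval up); it is not the library's own definition and may differ from it in edge cases.

```clojure
;; Hypothetical stand-in for rate->ms, derived only from the docstring:
;; the interval is rounded up, so the effective rate is rounded down.
(defn rate->ms-sketch
  [items-per-second]
  (long (Math/ceil (/ 1000.0 items-per-second))))

(def three-per-second (rate->ms-sketch 3))  ; 1000/3 = 333.33..., rounded up
(def four-per-second  (rate->ms-sketch 4))  ; divides evenly
```

With a 334 ms interval the effective rate is slightly under 3 items per second, which is the sense in which the rate is rounded down.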
(rate-limit rate from to)
Like [[clojure.core.async/pipe]] but limits the rate of incoming messages. `rate` is in milliseconds. To convert a frequency to ms use [[rate->ms]].
(reductions! rf init in out)
(reductions! rf init in out close?)
Like `clojure.core/reductions`, but takes elements from the `in` channel and produces them to the `out` channel.
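For reference, this is what `clojure.core/reductions` does on an ordinary sequence. The example only shows the sequence version; whether the channel variants also emit the initial value to `out` is not stated in the docstring, so treat that part as an open assumption.

```clojure
;; clojure.core/reductions returns every intermediate result of the fold,
;; starting with the initial value when one is supplied.
(def running-sums
  (reductions + 0 [1 2 3 4]))
;; running-sums is (0 1 3 6 10)
```

By analogy, the channel versions would take the inputs from `in` and put each intermediate result onto `out` as it is computed.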
(reductions!! rf init in out)
(reductions!! rf init in out close?)
Like `clojure.core/reductions`, but takes elements from the `in` channel and produces them to the `out` channel.
(selecting-reductions! rf init in out)
(selecting-reductions! rf init in out close?)
Like `clojure.core/reductions`, but takes elements from the `in` channel and produces them to the `out` channel.
(selecting-reductions!! rf init in out)
(selecting-reductions!! rf init in out close?)
Like `clojure.core/reductions`, but takes elements from the `in` channel and produces them to the `out` channel.
(split! f ch m)
Takes a channel, a function `f :: v -> k` and a map of keys to channels `k -> ch`, routing the values `v` from the input channel to the channel such that `(f v) -> ch`. `(get m (f v))` must be non-nil for every `v`!
(split?! f ch m)
Takes a channel, a function `f :: v -> k` and a map of keys to channels `k -> ch`, routing the values `v` from the input channel to the channel such that `(f v) -> ch`. If `(f v)` is not in `m`, the value is dropped.
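The routing rule shared by `split!` and `split?!` is a lookup of `(f v)` in `m`. The sketch below models only that lookup, with keywords standing in for destination channels and a map of vectors standing in for what each destination receives; `route-model` is a hypothetical helper and follows the `split?!` behaviour of dropping values whose key is missing.

```clojure
;; Hypothetical, channel-free model of the routing rule.
;; `m` maps keys to destinations; values whose key is absent are dropped,
;; as described for split?!.
(defn route-model
  [f m vs]
  (reduce (fn [acc v]
            (if-let [dest (get m (f v))]
              (update acc dest (fnil conj []) v)
              acc))
          {}
          vs))

;; Only even numbers have a destination; odd numbers are dropped.
(def routed
  (route-model even? {true :evens} [1 2 3 4]))
```

Under `split!` the same input would violate the requirement that `(get m (f v))` is non-nil for every `v`, so `m` would also need an entry for `false`.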
(take-recur! ch v & body)
Repeatedly [[a/take!]] from `ch` as `v` and evaluate `body` with it. All limitations which apply to [[a/take!]] apply here as well. Stops recurring when the channel is closed. Uses the core.async fixed size dispatch thread pool.
(take-recur!* ch f)
Repeatedly [[a/take!]] from `ch` and apply `f` to the consumed value. All limitations which apply to [[a/take!]] apply here as well. Stops recurring when the channel is closed. Uses the core.async fixed size dispatch thread pool.
(wait! tasks)
Wait for `tasks`, a collection of channels, to finish in a non-blocking context. Returns nothing meaningful.
(wait!! tasks)
Wait for `tasks`, a collection of channels, to finish in a blocking context. Returns nothing meaningful.
(wait* tasks mode)
Wait for `tasks`, a collection of channels, to finish. Returns nothing meaningful.
(wait-group n & body)
Run `body` `n` times in [[a/thread]] and wait for all runs to finish. Returns a promise chan which closes when all tasks finish. May run `cleanup` at the end. Cleanup is delimited from the rest of the body by the keyword `:finally`. Cleanup is guaranteed to run once. Example:

```clojure
(wait-group
 8
 (let [n (+ 1000 (rand-int 1000))]
   (Thread/sleep n)
   (println n))
 :finally
 (println "goodbye!"))
```
(wait-group-call n f)
(wait-group-call n f cleanup)
Run `f` `n` times in [[a/thread]] and wait for all runs to finish. Returns a promise chan which closes when all tasks finish. May run `cleanup` at the end. Cleanup is guaranteed to run once.