(batch! in out size timeout)
(batch! in out size timeout rf)
(batch! in out size timeout rf init)
(batch! in out size timeout rf init close?)
Takes messages from in
and batch them until reaching size
or
timeout
ms, and puts them to out
.
Batches with reducing function rf
into initial value derived from calling init
.
If init is not supplied rf is called with zero args.
By default will use conj and a vector.
Takes messages from `in` and batch them until reaching `size` or `timeout` ms, and puts them to `out`. Batches with reducing function `rf` into initial value derived from calling `init`. If init is not supplied rf is called with zero args. By default will use conj and a vector.
(batch!! in out size timeout)
(batch!! in out size timeout rf)
(batch!! in out size timeout rf init)
(batch!! in out size timeout rf init close?)
Takes messages from in
and batch them until reaching size
or
timeout
ms, and puts them to out
.
Batches with reducing function rf
into initial value derived from calling init
.
If init is not supplied rf is called with zero args.
By default will use conj and a vector.
Like batch!
but blocking.
Takes messages from `in` and batch them until reaching `size` or `timeout` ms, and puts them to `out`. Batches with reducing function `rf` into initial value derived from calling `init`. If init is not supplied rf is called with zero args. By default will use conj and a vector. Like [[batch!]] but blocking.
(consume! ch v & body)
Takes values repeatedly from ch
as v
and evaluate body
with it.
The opposite of produce.
Stops consuming values when the channel is closed.
Takes values repeatedly from `ch` as `v` and evaluate `body` with it. The opposite of produce. Stops consuming values when the channel is closed.
(consume!! ch v & body)
Take values repeatedly from ch
as v
and evaluate body
with it.
The opposite of produce.
Stops consuming values when the channel is closed.
Like consume!
but blocking.
Take values repeatedly from `ch` as `v` and evaluate `body` with it. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume!]] but blocking.
(consume-call! ch f)
Take values repeatedly from ch
and apply f
to them.
The opposite of produce.
Stops consuming values when the channel is closed.
Take values repeatedly from `ch` and apply `f` to them. The opposite of produce. Stops consuming values when the channel is closed.
(consume-call!! ch f)
Take values repeatedly from ch
and applies f
to them.
The opposite of produce.
Stops consuming values when the channel is closed.
Like consume-call!
but blocking.
Take values repeatedly from `ch` and applies `f` to them. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume-call!]] but blocking.
(consume-checked! ch v & body)
Take values repeatedly from ch
as v
and run body
.
The opposite of produce.
Stops consuming values when the channel is closed or body evaluates to a false-y value.
Take values repeatedly from `ch` as `v` and run `body`. The opposite of produce. Stops consuming values when the channel is closed or body evaluates to a false-y value.
(consume-checked!! ch v & body)
Takes values repeatedly from ch
as v
and evaluate body
.
Recurs only when body
evaluates to a non false-y value.
The opposite of produce.
Stops consuming values when the channel is closed.
Like consume-checked!
but blocking.
Takes values repeatedly from `ch` as `v` and evaluate `body`. Recurs only when `body` evaluates to a non false-y value. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume-checked!]] but blocking.
(consume-checked-call! ch f)
Take values repeatedly from ch
and apply f
to them.
Recur only when f
returns a non false-y value.
The opposite of produce.
Stop consuming values when the channel is closed.
Take values repeatedly from `ch` and apply `f` to them. Recur only when `f` returns a non false-y value. The opposite of produce. Stop consuming values when the channel is closed.
(consume-checked-call!! ch f)
Takes values repeatedly from ch
and applies f
to them.
Recurs only when f
returns a non false-y value.
The opposite of produce.
Stops consuming values when the channel is closed.
Like consume-checked-call!
but blocking.
Takes values repeatedly from `ch` and applies `f` to them. Recurs only when `f` returns a non false-y value. The opposite of produce. Stops consuming values when the channel is closed. Like [[consume-checked-call!]] but blocking.
(control* f cf ctl)
Wraps f with control function cf such that f will be invoked when: cf returns a truthy value for a value taken from ctl channel or there is no signal on ctl channel to take immidiately
Wraps f with control function cf such that f will be invoked when: cf returns a truthy value for a value taken from ctl channel or there is no signal on ctl channel to take immidiately
(fan! f xf from to)
Partition values from from
by f
and apply xf
to each partition.
Useful for stateful transducer operating on streaming partitioned data
when the partitions aren't know a priori.
Warnings and caveats: creates a new channel and go block per new partition. Very bad for unbounded inputs.
Partition values from `from` by `f` and apply `xf` to each partition. Useful for stateful transducer operating on streaming partitioned data when the partitions aren't know a priori. Warnings and caveats: creates a new channel and go block per new partition. Very bad for unbounded inputs.
(interrupt-control f ctl)
(interrupt-control f cf ctl)
(interrupt-control f cf ctl & es-ehs)
Like control
but only checks ctl channel if f throws.
takes an optional seq of exceptions and exception handlers to
handle different exceptions which can be thrown by f.
Like `control` but only checks ctl channel if f throws. takes an optional seq of exceptions and exception handlers to handle different exceptions which can be thrown by f.
(mux to & from)
Put into to
a map of takes by all pairs in from
from key to
channel.
Put into `to` a map of takes by all pairs in `from` from key to channel.
(ooo-pipeline n to xf from)
(ooo-pipeline n to xf from close?)
(ooo-pipeline n to xf from close? ex-handler)
Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n. Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input. Outputs will be returned OUT OF ORDER. By default, the to channel will be closed when the from channel closes, but can be determined by the close? parameter. Will stop consuming the from channel if the to channel closes. Note this should be used for computational parallelism. If you have multiple blocking operations to put in flight, use ooo-pipeline-blocking instead, If you have multiple asynchronous operations to put in flight, use ooo-pipeline-async instead.
Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n. Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input. Outputs will be returned OUT OF ORDER. By default, the to channel will be closed when the from channel closes, but can be determined by the close? parameter. Will stop consuming the from channel if the to channel closes. Note this should be used for computational parallelism. If you have multiple blocking operations to put in flight, use ooo-pipeline-blocking instead, If you have multiple asynchronous operations to put in flight, use ooo-pipeline-async instead.
(ooo-pipeline-async n to af from)
(ooo-pipeline-async n to af from close?)
Takes elements from the from channel and supplies them to the to channel, subject to the async function af, with parallelism n. af must be a function of two arguments, the first an input value and the second a channel on which to place the result(s). af must close! the channel before returning. The presumption is that af will return immediately, having launched some asynchronous operation (i.e. in another thread) whose completion/callback will manipulate the result channel. Outputs will be returned OUT OF ORDER. By default, the to channel will be closed when the from channel closes, but can be determined by the close? parameter. Will stop consuming the from channel if the to channel closes. See also ooo-pipeline, ooo-pipeline-blocking.
Takes elements from the from channel and supplies them to the to channel, subject to the async function af, with parallelism n. af must be a function of two arguments, the first an input value and the second a channel on which to place the result(s). af must close! the channel before returning. The presumption is that af will return immediately, having launched some asynchronous operation (i.e. in another thread) whose completion/callback will manipulate the result channel. Outputs will be returned OUT OF ORDER. By default, the to channel will be closed when the from channel closes, but can be determined by the close? parameter. Will stop consuming the from channel if the to channel closes. See also ooo-pipeline, ooo-pipeline-blocking.
(ooo-pipeline-blocking n to xf from)
(ooo-pipeline-blocking n to xf from close?)
(ooo-pipeline-blocking n to xf from close? ex-handler)
Like ooo-pipeline, for blocking operations.
Like ooo-pipeline, for blocking operations.
(periodically! f t)
(periodically! f t buf-or-n)
(periodically! f t buf-or-n exh)
Invoke f
periodically with period t
and put the results into
returned channel.
Optionally takes buffer or buffer size and exception handler for f
.
Invoke `f` periodically with period `t` and put the results into returned channel. Optionally takes buffer or buffer size and exception handler for `f`.
(produce! ch & body)
Execute body
repeatedly in a go-loop and put its results into
output ch
.
Execute `body` repeatedly in a go-loop and put its results into output `ch`.
(produce!! ch & body)
Execute body repeatedly in a loop and put its results into output ch
.
Like produce!
but blocking.
Should be called inside a thread or a future.
Execute body repeatedly in a loop and put its results into output `ch`. Like [[produce!]] but blocking. Should be called inside a thread or a future.
(produce-call! ch f)
(produce-call! ch f close?)
Put the contents repeatedly calling f
into the supplied channel.
By default the channel will be closed if f returns nil.
Put the contents repeatedly calling `f` into the supplied channel. By default the channel will be closed if f returns nil.
(produce-call!! ch f)
(produce-call!! ch f close?)
Put the contents of repeatedly calling f
into the supplied channel
ch
.
By default the channel will be closed if f returns nil.
Like produce-call!
but blocking.
Should be called inside a thread or a future.
Put the contents of repeatedly calling `f` into the supplied channel `ch`. By default the channel will be closed if f returns nil. Like [[produce-call!]] but blocking. Should be called inside a thread or a future.
(put-recur! ch & body)
Repeatedly [[a/put!]] into ch
the results of running body
.
All limitations which apply to [[a/put!]] apply here as well.
Uses the core.async fixed size dispatch thread pool.
Repeatedly [[a/put!]] into `ch` the results of running `body`. All limitations which apply to [[a/put!]] apply here as well. Uses the core.async fixed size dispatch thread pool.
(put-recur!* ch f)
Repeatedly [[a/put!]] into ch
the results of invoking f
.
All limitations which apply to [[a/put!]] apply here as well.
Uses the core.async fixed size dispatch thread pool.
Repeatedly [[a/put!]] into `ch` the results of invoking `f`. All limitations which apply to [[a/put!]] apply here as well. Uses the core.async fixed size dispatch thread pool.
(reductions! rf init in out)
(reductions! rf init in out close?)
Like core/reductions, but takes elements from in channel and produces them to out channel.
Like core/reductions, but takes elements from in channel and produces them to out channel.
(reductions!! rf init in out)
(reductions!! rf init in out close?)
Like core/reductions, but takes elements from in channel and produces them to out channel.
Like core/reductions, but takes elements from in channel and produces them to out channel.
(split! f ch m)
Takes a channel, function f :: v -> k and a map of keys to channels k -> ch, routing the values v from the input channel to the channel such that (f v) -> ch.
(get m (f v)) must be non-nil for every v!
Takes a channel, function f :: v -> k and a map of keys to channels k -> ch, routing the values v from the input channel to the channel such that (f v) -> ch. (get m (f v)) must be non-nil for every v!
(split?! f ch m)
Takes a channel, function f :: v -> k and a map of keys to channels k -> ch, routing the values v from the input channel to the channel such that (f v) -> ch.
If (f v) is not in m, the value is dropped
Takes a channel, function f :: v -> k and a map of keys to channels k -> ch, routing the values v from the input channel to the channel such that (f v) -> ch. If (f v) is not in m, the value is dropped
(take-recur! ch v & body)
Repeatedly [[a/take!]] from ch
as v
and evaluate body
with it.
All limitations which apply to [[a/take!]] apply here as well.
Stops recurring when the channel is closed.
Uses the core.async fixed size dispatch thread pool.
Repeatedly [[a/take!]] from `ch` as `v` and evaluate `body` with it. All limitations which apply to [[a/take!]] apply here as well. Stops recurring when the channel is closed. Uses the core.async fixed size dispatch thread pool.
(take-recur!* ch f)
Repeatedly [[a/take!]] from ch
and apply f
to the consumed value.
All limitations which apply to [[a/take!]] apply here as well.
Stops recurring when the channel is closed.
Uses the core.async fixed size dispatch thread pool.
Repeatedly [[a/take!]] from `ch` and apply `f` to the consumed value. All limitations which apply to [[a/take!]] apply here as well. Stops recurring when the channel is closed. Uses the core.async fixed size dispatch thread pool.
(wait! tasks)
Wait for tasks
, a collection of channels, to finish in a
non-blocking context.
Returns nothing meaningful.
Wait for `tasks`, a collection of channels, to finish in a non-blocking context. Returns nothing meaningful.
(wait!! tasks)
Wait for tasks
, a collection of channels, to finish in a
blocking context.
Returns nothing meaningful.
Wait for `tasks`, a collection of channels, to finish in a blocking context. Returns nothing meaningful.
(wait* tasks mode)
Wait for tasks
, a collection of channels, to finish.
Returns nothing meaningful.
Wait for `tasks`, a collection of channels, to finish. Returns nothing meaningful.
(wait-group n & body)
Run body
n
times in [[a/thread]] and wait for all runs to finish.
Returns a promise chan which closes when all tasks finish. May run
cleanup
in the end. Cleanup is delimited from the rest of the body
by the keyword :finally
.
Cleanup is guaranteed to run once.
Example:
(wait-group
8
(let [n (+ 1000 (rand-int 1000))]
(Thread/sleep n)
(println n))
:finally
(println "goodbye!"))
Run `body` `n` times in [[a/thread]] and wait for all runs to finish. Returns a promise chan which closes when all tasks finish. May run `cleanup` in the end. Cleanup is delimited from the rest of the body by the keyword `:finally`. Cleanup is guaranteed to run once. Example: ```clojure (wait-group 8 (let [n (+ 1000 (rand-int 1000))] (Thread/sleep n) (println n)) :finally (println "goodbye!")) ```
(wait-group-call n f)
(wait-group-call n f cleanup)
Run f
n
times in [[a/thread]] and wait for all runs to finish.
Returns a promise chan which closes when all tasks finish.
May run cleanup
in the end. Cleanup is guaranteed to run once.
Run `f` `n` times in [[a/thread]] and wait for all runs to finish. Returns a promise chan which closes when all tasks finish. May run `cleanup` in the end. Cleanup is guaranteed to run once.
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close