
bortexz.utils.async


<? (clj, macro)

(<? ch)

Like <!, but will throw the exception if the value read from ch is an exception


<?? (clj)

(<?? ch)

Like <!!, but will throw the exception if the value read from ch is an exception
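
A minimal sketch of the behaviour described above, assuming the namespace is aliased as `ua` (a hypothetical alias) and that an `ex-info` value counts as an exception for this purpose:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

;; A channel that carries an exception as a plain value.
(def ch (a/chan 1))
(a/>!! ch (ex-info "boom" {:code 42}))

;; <?? takes the value and, because it is an exception, throws it,
;; so it can be handled with an ordinary try/catch.
(def result
  (try
    (ua/<?? ch)
    (catch clojure.lang.ExceptionInfo e
      (ex-data e))))
```

This lets a producer report failures through the same channel it uses for results, while the consumer handles them as ordinary exceptions.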


<t! (clj, macro)

(<t! ms ch)

Like <! with a ms timeout as first argument. Returns ::timeout if ms milliseconds elapsed without a successful read from ch.


<t!! (clj)

(<t!! ms ch)

Like <!! with a ms timeout as first argument. Returns ::timeout if ms milliseconds elapsed without a successful read from ch.
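
A short sketch of a timed read. The alias `ua` is hypothetical, and the example assumes that `::timeout` in the docstring resolves to the keyword `:bortexz.utils.async/timeout`:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

;; Nothing is ever put on this channel, so the read cannot succeed.
(def empty-ch (a/chan))
(def result (ua/<t!! 50 empty-ch))

;; Assumption: ::timeout resolves to this namespaced keyword.
(def timed-out? (= result :bortexz.utils.async/timeout))

;; A channel with a value already buffered returns that value instead.
(def ready-ch (a/chan 1))
(a/>!! ready-ch :hello)
(def value (ua/<t!! 50 ready-ch))
```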


>t! (clj, macro)

(>t! ms ch val)

Like >! with a ms timeout as first argument. Returns ::timeout if ms milliseconds elapsed without a successful put into ch.


>t!! (clj)

(>t!! ms ch val)

Like >!! with a ms timeout as first argument. Returns ::timeout if ms milliseconds elapsed without a successful put into ch.
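
A short sketch of a timed put against a channel whose buffer is already full. The alias `ua` is hypothetical, and `::timeout` is assumed to resolve to `:bortexz.utils.async/timeout`:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def full-ch (a/chan 1))
(a/>!! full-ch :first)   ; fills the single buffer slot

;; Nobody is reading from full-ch, so this put cannot complete in time.
(def result (ua/>t!! 50 full-ch :second))

;; Assumption: ::timeout resolves to this namespaced keyword.
(def timed-out? (= result :bortexz.utils.async/timeout))
```

A bounded put like this can be useful when a slow consumer should not stall the producer indefinitely.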


arrange-by (clj)

(arrange-by keyfn chs out)
(arrange-by keyfn
            chs
            out
            {:keys [close? comparator] :or {close? true comparator compare}})

Starts a process that will read values from all chs, and puts them in out in the order determined by comparing (keyfn item) using the default comparator, or a custom one supplied as opts.comparator.

The process waits until it has a value from every ch before emitting a value (i.e., when the value from a chan is emitted, the process waits until the same ch emits a new value to compare again with the cached vals).

If multiple items have same result of keyfn, then the first one that was taken is emitted.

When all chs are closed, the process finishes. opts.close? can be used to close out when this happens.

Example use case: Reorder different historical data sources by :timestamp
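
The use case above might look roughly like the following sketch. The alias `ua`, the sample data, and passing `chs` as a vector are assumptions made for illustration; `a/to-chan!` requires a reasonably recent core.async release.

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

;; Two sources, each already ordered by :timestamp.
(def source-a (a/to-chan! [{:timestamp 1 :src :a} {:timestamp 4 :src :a}]))
(def source-b (a/to-chan! [{:timestamp 2 :src :b} {:timestamp 3 :src :b}]))

(def out (a/chan 10))
(ua/arrange-by :timestamp [source-a source-b] out)

;; close? defaults to true, so out closes once both sources are exhausted
;; and a/into can complete.
(def merged-timestamps
  (mapv :timestamp (a/<!! (a/into [] out))))
```

If the process behaves as the docstring describes, `merged-timestamps` should come out in ascending order. Note that each individual source must already be ordered for the merged output to be fully ordered.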


broadcast (clj)

(broadcast val chs)

Puts val into all chs in parallel. Returns a ch that will contain a collection of the chs that were found closed while trying to put val into them, when all chans have accepted the item.

Easier than creating a mult for just distributing one value.
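
A small sketch of broadcasting one value to several channels, one of which is already closed. The alias `ua` and the channel names are hypothetical, and the exact collection type returned is not specified in the docstring:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def open-1 (a/chan 1))
(def open-2 (a/chan 1))
(def closed (doto (a/chan 1) a/close!))

;; The returned ch yields the collection of chs that were found closed.
(def closed-chs (a/<!! (ua/broadcast :hello [open-1 open-2 closed])))

(def received [(a/<!! open-1) (a/<!! open-2)])
```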


bundle (clj)

(bundle in
        out
        {:keys [init add-item ready? full? close?]
         :or {full? (constantly false) close? true}})

Creates a process that adds items taken from input chan in into a bundle using opts.add-item fn. The initial value of bundle can be specified with opts.init.

When the bundle is ready (opts.ready? fn) and the out chan can accept new values, the bundle is put into out, and resets its value to opts.init. If the bundle is ready, but out can't accept new values, the process keeps taking items from in and adding them to the bundle, unless bundle is full (opts.full? fn), in which case it only tries to put the bundle into out.

When the in ch closes, the process will no longer try to take values from it, and instead will only try to put the current bundle into out iff it's ready. If not ready, then the process finishes.

If out ch is found closed when trying to output the bundle, the process finishes.

ready?, full? and add-item fns are called inside go-loop and should behave accordingly (no i/o, no expensive computations, etc)

Returns out.

opts:

  • init the initial value of the bundle. It is used as the starting point, and the bundle is reset to this value after it is sent over the out chan.

  • add-item 2-arity fn ([bundle new-item]) that returns bundle with the new item added. When the in ch closes, add-item will also be called with nil as new-item, so the user can mark the bundle as ready, to allow the remaining contents of the bundle to be sent before finishing the process.

  • ready? 1-arity fn ([bundle]) that returns logical true if the bundle is ready to be sent to out

  • full? 1-arity fn ([bundle]) that returns logical true if no more values can be added to the bundle. Note that full? is only checked after ready? returns true. Defaults to (constantly false), creating an unbounded bundle.

  • close? if true, the out ch will be closed after the process finishes. Defaults to true.
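
The options above can be combined into a simple fixed-size batcher. This is only a sketch under stated assumptions: the alias `ua`, the `batch-size` value, and the bundle shape (`:items` plus a `:flush?` marker) are all hypothetical choices, not part of the library.

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def batch-size 3)

(def opts
  {:init     {:items [] :flush? false}
   ;; A nil item signals that `in` has closed; mark the bundle for flushing.
   :add-item (fn [bundle item]
               (if (nil? item)
                 (assoc bundle :flush? true)
                 (update bundle :items conj item)))
   ;; Ready when the batch is complete, or when flushing a non-empty remainder.
   :ready?   (fn [{:keys [items flush?]}]
               (or (>= (count items) batch-size)
                   (and flush? (seq items))))
   ;; Stop taking from `in` once the batch is complete, so that batches
   ;; never grow past batch-size.
   :full?    (fn [{:keys [items]}]
               (>= (count items) batch-size))})

(def in (a/to-chan! (range 7)))
(def out (ua/bundle in (a/chan 10) opts))

;; close? defaults to true, so out closes when the process finishes
;; and a/into completes with every bundle that was emitted.
(def batches (mapv :items (a/<!! (a/into [] out))))
```

Going by the docstring, this should yield two full batches followed by the flushed remainder. All three fns are pure and cheap, in line with the note that they run inside a go-loop.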


chan? (clj)

(chan? x)

consume (clj)

(consume in f)
(consume in
         f
         {:keys [thread? ex-handler]
          :or {thread? false ex-handler default-ex-handler}})

Creates a process that takes items from in and executes f with each item. If f returns a ch, the internal process will park/block waiting for the ch to emit/close, ignoring its value. Returns a chan that will close when in is exhausted.

Accepts an optional third argument opts:

  • thread? If the internal process should use a thread. Defaults to false (uses go-loop)
  • ex-handler Exception handler if f throws an exception. Defaults to default-ex-handler, per the signature above.
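
A minimal sketch of consuming a channel with a side-effecting fn, using the default opts. The alias `ua` and the `seen` atom are hypothetical names for illustration:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def seen (atom []))

;; f returns a plain value here (not a ch), so the process moves on immediately.
(def done
  (ua/consume (a/to-chan! [1 2 3])
              (fn [item] (swap! seen conj item))))

;; Blocks until `in` is exhausted and the returned chan closes.
(a/<!! done)

(def result @seen)
```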

consumers (clj)

(consumers n in f opts)

Creates n consumers that each run consume. opts are the same as for consume. Returns a chan that will be closed when all consumers have finished.


debounce (clj)

(debounce ms in out)
(debounce ms
          in
          out
          {:keys [close? add-item] :or {close? true add-item (fn [_ v] v)}})

Creates a process that will accumulate items read from in (with opts.add-item), and will put them into out iff ms milliseconds have elapsed without any new value from in.

Whenever in is closed, the process stops. If opts.close? is true, when the process finishes it will close out ch. If out is closed from the outside, stops reading from in.

Returns out.

opts:

  • add-item 2-arity fn with current value and new item, returns next value. Defaults to (fn [_ v] v) (only keeps latest value). After the current accumulated value has been put into out, next add-item will have nil as value. Will not get called when reading nil due to in being closed.
  • close? if true, closes out when the process finishes.
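
A sketch of the default behaviour (keep only the latest value), assuming the hypothetical alias `ua` and a 100 ms quiet period chosen for illustration:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def in (a/chan))
(def out (ua/debounce 100 in (a/chan 1)))

;; Three values arrive in quick succession, well within the 100 ms window.
(doseq [v [1 2 3]]
  (a/>!! in v))

;; With the default add-item only the latest value is kept, so a single
;; value should be emitted once 100 ms pass without further input.
(def emitted (a/<!! out))

(a/close! in)
```

To collect every value seen during the burst instead, `add-item` could be something like `(fn [acc v] (conj (or acc []) v))`, since the accumulated value starts as nil after each emission.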

fworker (clj)

(fworker in)
(fworker in
         {:keys [thread? ex-handler]
          :or {thread? false ex-handler default-ex-handler}})

Creates a worker that reads nullary fns from in and executes them. If the fn returns a chan, the worker waits for the ch to emit/close before trying to take more fns from in.

Creates a go-loop by default, but can be changed to thread loop with opts.thread?.

When closing in, the process will finish. Returns ch that closes when the worker has finished.

opts:

  • thread? Creates the worker using async/thread
  • ex-handler exception handler to be used if incoming fn throws
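
A sketch of a single worker running three jobs, one of which returns a chan. The alias `ua` and the `results` atom are hypothetical names used for illustration:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def results (atom []))
(def in (a/chan 10))
(def done (ua/fworker in))

(a/>!! in (fn [] (swap! results conj :plain)))

;; This job returns a chan (the go block), so the worker should wait for it
;; before taking the next fn from `in`.
(a/>!! in (fn []
            (a/go
              (a/<! (a/timeout 50))
              (swap! results conj :async))))

(a/>!! in (fn [] (swap! results conj :last)))

(a/close! in)
(a/<!! done)   ; blocks until the worker has finished

(def order @results)
```

If the worker waits on returned chans as described, `:async` should be recorded before `:last` even though it completes 50 ms later.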

fworkers (clj)

(fworkers n in)
(fworkers n in opts)

Creates n workers that run in parallel. See fworker. Returns a ch that closes when all workers have finished.


interval (clj)

(interval f ms out)

Creates a process that will put the result of calling f every ms milliseconds into out. The counter starts again once the value has been put into the output chan. Closing the output chan will asynchronously stop the process. Returns out.
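
A sketch of a periodic producer. The alias `ua` is hypothetical, and the example assumes that `f` is called with no arguments, which the docstring does not state explicitly:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def counter (atom 0))

;; Assumption: f is a nullary fn. Each tick puts its result into out.
(def out (ua/interval #(swap! counter inc) 20 (a/chan)))

(def first-three [(a/<!! out) (a/<!! out) (a/<!! out)])

;; Closing out stops the process (asynchronously, per the docstring).
(a/close! out)
```

Because the counter restarts only after a value has been put, a slow reader on an unbuffered `out` stretches the effective period rather than causing ticks to pile up.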


mult (clj)

(mult ch)
(mult ch {:keys [events-ch finished-close?] :or {finished-close? true}})

Alternative implementation of core.async/mult with extra opts.

opts:

  • events-ch optional ch where to receive events related to the mult. Current events are:

    • :on-fill Sent when the mult goes from having 0 taps to at least one tap.
    • :on-empty Sent when the mult goes from having at least one tap to 0 taps.
    • the ch will be closed when the mult process finishes. An internal sliding-buffer is created that pipes to this channel, meaning that older events might get dropped in favor of later events.
  • finished-close? If tapping into a finished mult with close? true, then it automatically closes the tap ch. Defaults to true. If set to false, same behaviour as core.async/mult (doesn't close taps when added after main go-loop has finished).


pipe-process (clj)

(pipe-process from to)
(pipe-process from to close?)

Same as core.async/pipe, but returns the internal go-loop ch instead of to
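
A sketch showing why the returned process ch can be handy: it gives a way to wait for the pipe to finish. The alias `ua` is hypothetical, and `close?` is passed explicitly rather than relying on a default:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def from (a/to-chan! [1 2 3]))
(def to (a/chan 3))

;; close? is true here, so `to` is closed once `from` is exhausted.
(def process (ua/pipe-process from to true))

;; Blocks until the internal go-loop has finished.
(a/<!! process)

(def piped (a/<!! (a/into [] to)))
```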


profile-buf (clj)

(profile-buf buf pfn)

Wraps buf into a new buffer that works exactly like buf (calls protocol fns of wrapped buf), with the addition of calling pfn with the remaining number of items in the buffer when taking values out of the buffer.

Depends on internal implementation details of core.async, use at your own risk!


pub (clj)

(pub ch topic-fn)
(pub ch
     topic-fn
     {:keys [buf-fn events-ch finished-close?]
      :or {buf-fn (constantly nil) finished-close? true}})

Alternative implementation of core.async/pub with extra opts.

opts:

  • buf-fn Like buf-fn in core.async/pub. Fn accepting topic that returns the buffer to be used per topic.

  • events-ch chan that contains events related to each topic (coming from the internal mult of each topic). Supported events are:

    • [:on-fill <topic>] Sent when topic goes from having no subscribers to at least one subscriber.
    • [:on-empty <topic>] Sent when topic goes from having subscribers to not having any subscribers.
    • Will be closed when the pub has finished and all internal topic mults are finished. Each topic uses a sliding-buffer of events, some events might get dropped in favor of a later status of the topic.
  • finished-close? See finished-close? in mult

Notes:

  • Once the ch for a topic is created, it will remain in the pub and not be removed/closed. For this reason, the number of topics should be bounded.

pub-layer (clj)

(pub-layer {:keys [attach-fn detach-fn src-fn mux-ch-fn]})

Similar but more powerful than spread-pub.

Creates a new pub that is not attached to any internal source or channel. attach-fn and detach-fn control how this pub is attached to its underlying sources (can be other pubs, taps, ...). src-fn creates the sources to be attached/detached for each topic (could be a ch or more than one), and mux-ch-fn returns the 'output' chan for such topic. In simple cases, mux-ch-fn can return src if it's a single channel and no intermediary processes are needed.

Opts:

  • src-fn 1-arity fn (topic) that creates the sources that will be attached/detached for such topic
  • attach-fn 2-arity fn (topic, src) called when a topic is filled with subscribers to attach to underlying resources.
  • detach-fn 2-arity fn (topic, src) called when a topic is emptied to detach from underlying resources.
  • mux-ch-fn 2-arity fn (topic, src) that returns the mux ch to use on the internal mult. Used to setup any intermediary async pipeline between src and the returned ch if needed. If no pipeline is needed, and src is a chan, src can be used as mux-ch.

put-close! (clj)

(put-close! ch val)

Shortcut for (do (a/put! ch val) (a/close! ch))
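
A small sketch of the shortcut in use, for the common case of a channel that delivers exactly one result. The alias `ua` is hypothetical:

```clojure
(require '[clojure.core.async :as a]
         '[bortexz.utils.async :as ua])

(def ch (a/chan 1))
(ua/put-close! ch :done)

;; The buffered value is still delivered after the channel is closed;
;; subsequent takes return nil.
(def first-take (a/<!! ch))
(def second-take (a/<!! ch))
```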


spread-pub (clj, deprecated)

(spread-pub p)
(spread-pub p
            {:keys [chan-fn finished-close?]
             :or {chan-fn (fn [_topic] (a/chan)) finished-close? true}})

DEPRECATED in favor of pub-layer

Creates a new pub wrapping pub p. You can specify opts.chan-fn to create a source ch for a given topic, allowing you to specify a custom xf that will be applied only once for all subscribed chs of a given topic.

Additionally, this internal mult will sub automatically to the underlying p when there are any taps on a given topic, and will automatically unsub from p when there aren't taps on it. This is done asynchronously, on an internal go-loop that reads from the events emitted by each mult.

Compatible with both core.async/pub and pub.

