A channel implementation analogous to core.async that uses promises instead of callbacks for all operations. It is intended to be used as-is (with blocking operations) in go blocks backed by virtual threads. There are no macro transformations; go blocks are just an alias for the `promesa.core/vthread` macro, which launches a virtual thread.
This code is based on the same ideas as core.async, but the implementation is written from scratch to make it simpler (and smaller, because it does not intend to solve all the corner cases that core.async currently handles).
The code is also implemented in CLJS to make the channel abstraction available there, but the main use case for this ns is the JVM, where you can take advantage of virtual threads and seamless blocking operations on channels.
**EXPERIMENTAL API**
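To make the "blocking operations inside go blocks" idea concrete, here is a minimal JVM sketch. It assumes the namespace is `promesa.exec.csp` (aliased as `sp`, the alias used in the `pipeline` example on this page) and that the promise returned by `go` can be dereferenced on the JVM; treat both as assumptions rather than guarantees.

```clojure
(require '[promesa.exec.csp :as sp])

;; An integer buffer argument is used as the capacity of an expanding buffer.
(def ch (sp/chan 1))

;; Producer: a go block that puts one value with a plain blocking call.
(sp/go
  (sp/>! ch :hello))

;; Consumer: the go block returns a promise that resolves with the body's value.
(def result
  (sp/go
    (sp/<! ch)))

;; Assumption: on the JVM the promise can be dereferenced to wait for the value.
(def received @result)
```

No macro rewriting is involved here: `>!` and `<!` are ordinary function calls that block the (virtual) thread running the go block.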
(<! port)
(<! port timeout-duration)
(<! port timeout-duration timeout-value)
A blocking version of `take!`
(>! port val)
(>! port val timeout-duration)
(>! port val timeout-duration timeout-value)
A convenience alias for `put!`.
(alts ports & {:as opts})
Completes at most one of several channel operations. Receives a vector of operations and optional keyword options.
A channel operation is defined as a vector of 2 elements for take, and 3 elements for put. If more than one channel operation is ready, a non-deterministic choice is made unless the `:priority` option is true.
Returns a promise that will be resolved, once a single operation is ready, to a vector `[val channel]`, where `val` is the return value of the operation and `channel` identifies the channel on which the operation succeeded.
(alts! ports & {:as opts})
A blocking variant of `alts`.
(chan)
(chan buf)
(chan buf xf)
(chan buf xf exh)
Creates a new channel instance. Optionally accepts a buffer, a transducer and an error handler. If the buffer is an integer, it is used as the initial capacity of an expanding buffer.
(chan? o)
Returns true if `o` is an instance of Channel or satisfies the IChannel protocol.
(closed? port)
Returns true if the channel is closed.
(dropping-buffer n)
Creates a dropping buffer instance.
(expanding-buffer n)
Creates a fixed-size (but expanding) buffer instance.
This buffer is used by default if you pass an integer as the buffer to the channel constructor.
(fixed-buffer n)
Creates a fixed-size buffer instance.
(go & body)
Schedules the body to be executed asynchronously, using a virtual thread if available (otherwise a normal thread is used). Returns a promise that resolves with the return value when the asynchronous block finishes.
Forwards dynamic bindings.
(go-chan & body)
A convenience version of the `go` macro that returns a channel instead of a promise.
(go-loop bindings & body)
A convenience helper macro that combines `go` and `loop`.
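As an illustration of `go-loop`, the sketch below sums the items of a channel until it is closed. It assumes the `promesa.exec.csp` namespace (aliased as `sp`), a JVM runtime where the returned promise can be dereferenced, and that values buffered before the channel is closed can still be taken, as in core.async.

```clojure
(require '[promesa.exec.csp :as sp])

(def ch (sp/chan 4))

;; Copy three numbers into the channel; it is closed afterwards by default.
(sp/onto-chan! ch [1 2 3])

;; Accumulate values until a take yields nil, which signals a closed channel.
(def total
  (sp/go-loop [sum 0]
    (if-some [v (sp/<! ch)]
      (recur (+ sum v))
      sum)))

;; Assumption: dereferencing the promise waits for the loop to finish.
(def sum-result @total)
```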
(mult)
(mult buf)
(mult buf xform)
(mult buf xform exh)
Creates a multiplexer instance.
A multiplexer acts like a write-only channel that enables broadcast-like (instead of queue-like) behavior. Channels that receive copies of the items put into this multiplexer can be attached using `tap!` and detached with `untap!`.
Each item is forwarded to all attached channels in parallel and synchronously; use buffers to prevent slow taps from holding up the multiplexer.
If there are no taps, all received items are dropped. Closed channels are automatically removed from the multiplexer.
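The broadcast behavior described above can be sketched as follows. This is a minimal example assuming the `promesa.exec.csp` namespace (aliased as `sp`) and a JVM runtime; the one-second timeouts are only a safety net so the snippet cannot hang, and the names `mx`, `a` and `b` are illustrative.

```clojure
(require '[promesa.exec.csp :as sp])

(def mx (sp/mult))

;; Buffered taps, so a slow consumer does not hold up the multiplexer.
(def a (sp/chan 4))
(def b (sp/chan 4))

(sp/tap! mx a)
(sp/tap! mx b)

;; Blocking put into the multiplexer itself (it acts as a write-only channel).
(sp/put! mx :event 1000 false)

;; Each tap is expected to receive its own copy of the item.
(def seen
  [(sp/take! a 1000 :timeout)
   (sp/take! b 1000 :timeout)])
```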
(mult* ch)
(mult* ch close?)
Creates a multiplexer with an externally provided channel. From then on, you can put values into either the external channel or the multiplexer instance, because the multiplexer implements the IWriteChannel protocol.
Optionally accepts a `close?` argument that determines whether the channel will be closed when `close!` is called on the multiplexer.
(offer! port val)
Puts a val into the channel if it's possible to do so immediately. Returns a resolved promise with `true` if the operation succeeded. Never blocks.
(once-buffer)
Creates a once buffer instance.
(onto-chan! ch coll)
(onto-chan! ch coll close?)
Puts the contents of `coll` into the supplied channel.
By default the channel is closed after the items are copied, but this can be controlled with the `close?` parameter. Returns a promise that will be resolved with `nil` once the items are copied.
Does not create vthreads (or threads).
(pipe from to)
(pipe from to close?)
Takes elements from the `from` channel and supplies them to the `to` channel. By default, the `to` channel is closed when the `from` channel closes, but this can be controlled with the `close?` parameter. Stops consuming the `from` channel if the `to` channel closes.
Does not create vthreads (or threads).
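A small sketch of `pipe` combined with `onto-chan!`, assuming the `promesa.exec.csp` namespace (aliased as `sp`) and a JVM runtime. The timeout on `take!` is just a safety net; the loop is expected to end when `to` is closed after `from` closes.

```clojure
(require '[promesa.exec.csp :as sp])

(def from (sp/chan 4))
(def to (sp/chan 4))

;; Start forwarding; `to` is closed once `from` closes (the default).
(sp/pipe from to)

;; Feed three items into `from`; onto-chan! closes it afterwards by default.
(sp/onto-chan! from [1 2 3])

;; Drain `to` with blocking takes until a take yields nil (closed channel,
;; or the 1 s safety timeout whose timeout value is also nil).
(def forwarded
  (loop [acc []]
    (if-some [v (sp/take! to 1000 nil)]
      (recur (conj acc v))
      acc)))
```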
(pipeline &
{:keys [typ in out f close? n exh]
:or {typ :thread close? true exh channel/ex-handler}})
Creates a processing pipeline with the ability to specify the process function `proc-fn` (passed as `:f`), the type of concurrency primitive to use (`:thread` or `:vthread`, passed as `:typ`) and the parallelism (`:n`).
The `proc-fn` should be a function that takes the value and the result channel; the result channel should be closed once the processing unit is finished.
By default the output channel is closed when the pipeline terminates, but this can be controlled with the `:close?` parameter.
Returns a promise that will be resolved once the pipeline terminates.
Example:

    (def inp (sp/chan))
    (def out (sp/chan (map inc)))

    (sp/pipeline :typ :vthread
                 :close? true
                 :n 10
                 :in inp
                 :out out
                 :f proc-fn)

    (sp/go
      (loop []
        (when-let [val (sp/<! out)]
          (prn "RES:" val)
          (recur)))
      (prn "RES: END"))

    (p/await! (sp/onto-chan! inp ["1" "2" "3" "4"] true))

**EXPERIMENTAL**: API subject to change or removal in future releases.
(poll! port)
Takes a val from the port if it's possible to do so immediately. Returns a resolved promise with the value if it succeeded, `nil` otherwise.
(put port val)
(put port val timeout-duration)
(put port val timeout-duration timeout-value)
Schedules a put operation on the channel. Returns a promise that will resolve to `false` if the channel is closed, or `true` if the put succeeds. If the channel has a buffer, it returns immediately with a resolved promise.
Optionally accepts a `timeout-duration` and a `timeout-value`. The `timeout-duration` can be a long (measured in milliseconds) or a Duration instance.
(put! port val)
(put! port val timeout-duration)
(put! port val timeout-duration timeout-value)
A blocking version of `put`.
(sliding-buffer n)
Creates a sliding buffer instance.
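The sketch below shows how a sliding buffer might behave when more items are put than it can hold. It assumes the `promesa.exec.csp` namespace (aliased as `sp`), a JVM runtime, and the usual core.async-style sliding semantics (the oldest item is discarded when the buffer is full); the timeouts are only there so the snippet cannot hang.

```clojure
(require '[promesa.exec.csp :as sp])

;; Pass a buffer instance instead of an integer to pick the buffering policy.
(def ch (sp/chan (sp/sliding-buffer 2)))

;; Three blocking puts into a buffer of size 2.
(sp/put! ch 1 1000 false)
(sp/put! ch 2 1000 false)
(sp/put! ch 3 1000 false)

;; Under the assumed semantics, only the two most recent items remain.
(def kept
  [(sp/take! ch 1000 :timeout)
   (sp/take! ch 1000 :timeout)])
```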
(take port)
(take port timeout-duration)
(take port timeout-duration timeout-value)
Schedules a take operation on the channel. Returns a promise that will resolve to `nil` if the channel is closed, or to the value if one is found. If the channel has a non-empty buffer, the take operation succeeds immediately with a resolved promise.
Optionally accepts a `timeout-duration` and a `timeout-value`. The `timeout-duration` can be a long (measured in milliseconds) or a Duration instance.
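To illustrate the optional timeout arguments, here is a minimal sketch assuming the `promesa.exec.csp` namespace (aliased as `sp`) and a JVM runtime where promises can be dereferenced. The keyword `:timed-out` is an arbitrary value chosen for this example.

```clojure
(require '[promesa.exec.csp :as sp])

;; Nothing is ever put into this channel, so the take can only time out.
(def empty-ch (sp/chan))
(def timed-out-result @(sp/take empty-ch 50 :timed-out))

;; With a buffered value available, the take resolves with that value instead.
(def full-ch (sp/chan 1))
(sp/put! full-ch :value)
(def value-result @(sp/take full-ch 50 :timed-out))
```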
(take! port)
(take! port timeout-duration)
(take! port timeout-duration timeout-value)
Blocking version of `take`.
(tap! mult ch)
(tap! mult ch close?)
Copies the multiplexer source onto the provided channel.
(timeout ms)
Returns a promise that will be resolved in the specified timeout. The default scheduler will be used.
(timeout-chan ms)
(timeout-chan scheduler ms)
Returns a channel that will be closed in the specified timeout. The default scheduler will be used. You can provide your own as optional first argument.
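A short sketch of `timeout-chan`, assuming the `promesa.exec.csp` namespace (aliased as `sp`) and a JVM runtime. A blocking take on the returned channel should yield `nil` once the channel is closed by the timeout.

```clojure
(require '[promesa.exec.csp :as sp])

;; The channel is closed by the default scheduler after roughly 50 ms.
(def tch (sp/timeout-chan 50))

;; Blocks until the channel closes; a take on a closed channel yields nil.
(def taken (sp/take! tch))

(def closed-after (sp/closed? tch))
```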
(untap! mult ch)
Disconnects a channel from the multiplexer.