Liking cljdoc? Tell your friends :D

promesa.exec.csp

A core.async analogous implementation of channels that uses promises instead of callbacks for all operations and are intended to be used as-is (using blocking operations) in go-blocks backed by virtual threads.

There are no macro transformations, go blocks are just alias for the promesa.core/vthread macro that launches an virtual thread.

This code is based on the same ideas as core.async but the implementation is written from scratch, for make it more simplier (and smaller, because it does not intend to solve all the corner cases that core.async does right now).

This code is implemented in CLJS for make available the channel abstraction to the CLJS, but the main use case for this ns is targeted to the JVM, where you will be able to take advantage of virtual threads and seamless blocking operations on channels.

EXPERIMENTAL API

A core.async analogous implementation of channels that uses promises
instead of callbacks for all operations and are intended to be used
as-is (using blocking operations) in go-blocks backed by virtual
threads.

There are no macro transformations, go blocks are just alias for the
`promesa.core/vthread` macro that launches an virtual thread.

This code is based on the same ideas as core.async but the
implementation is written from scratch, for make it more
simplier (and smaller, because it does not intend to solve all the
corner cases that core.async does right now).

This code is implemented in CLJS for make available the channel
abstraction to the CLJS, but the main use case for this ns is
targeted to the JVM, where you will be able to take advantage of
virtual threads and seamless blocking operations on channels.

**EXPERIMENTAL API**
raw docstring

*executor*clj/s

A defalt executor for channel callback dispatching

A defalt executor for channel callback dispatching
sourceraw docstring

<!clj

(<! port)
(<! port timeout-duration)
(<! port timeout-duration timeout-value)

A convenience alias for take!.

A convenience alias for `take!`.
sourceraw docstring

>!clj

(>! port val)
(>! port val timeout-duration)
(>! port val timeout-duration timeout-value)

A convenience alias for put!.

A convenience alias for `put!`.
sourceraw docstring

altsclj/s

(alts ports & {:as opts})

Completes at most one of several operations on channel. Receives a vector of operations and optional keyword options.

A channel operation is defined as a vector of 2 elements for take, and 3 elements for put. Unless the :priority option is true and if more than one channel operation is ready, a non-deterministic choice will be made.

Returns a promise instance that will be resolved when a single operation is ready to a vector [val channel] where val is return value of the operation and channel identifies the channel where the the operation is succeeded.

Completes at most one of several operations on channel. Receives a
vector of operations and optional keyword options.

A channel operation is defined as a vector of 2 elements for take,
and 3 elements for put. Unless the :priority option is true and if
more than one channel operation is ready, a non-deterministic choice
will be made.

Returns a promise instance that will be resolved when a single
operation is ready to a vector [val channel] where val is return
value of the operation and channel identifies the channel where the
the operation is succeeded.
sourceraw docstring

alts!clj

(alts! ports & {:as opts})

A blocking variant of alts.

A blocking variant of `alts`.
sourceraw docstring

chanclj/s

(chan & {:keys [buf xf exh exc] :or {exh channel/close-with-exception}})

Creates a new channel instance, it optionally accepts buffer, transducer and error handler. If buffer is an integer, it will be used as initial capacity for the expanding buffer.

Creates a new channel instance, it optionally accepts buffer,
transducer and error handler. If buffer is an integer, it will be
used as initial capacity for the expanding buffer.
sourceraw docstring

chan?clj/s

(chan? o)

Returns true if o is instance of Channel or satisfies IChannel protocol.

Returns true if `o` is instance of Channel or satisfies IChannel protocol.
sourceraw docstring

close!clj/s

(close! port)
(close! port cause)

Close the channel.

Close the channel.
sourceraw docstring

close-with-exceptionclj/s

(close-with-exception ch cause)

A channel exception handler that closes the channel with the cause if an exception is raised in the transducer.

A channel exception handler that closes the channel with the cause
if an exception is raised in the transducer.
sourceraw docstring

closed?clj/s

(closed? port)

Returns true if channel is closed.

Returns true if channel is closed.
sourceraw docstring

dropping-bufferclj/s

(dropping-buffer n)

Create a dropping buffer instance.

Create a dropping buffer instance.
sourceraw docstring

expanding-bufferclj/s

(expanding-buffer n)

Create a fixed size (but expanding) buffer instance.

This buffer is used by default if you pass an integer as buffer on channel constructor.

Create a fixed size (but expanding) buffer instance.

This buffer is used by default if you pass an integer as buffer on
channel constructor.
sourceraw docstring

fixed-bufferclj/s

(fixed-buffer n)

Create a fixed size buffer instance.

Create a fixed size buffer instance.
sourceraw docstring

goclj/smacro

(go & body)

Schedules the body to be executed asychronously, potentially using virtual thread if available (a normal thread will be used in other case, determined by executor dynamic var). Returns a promise instance that resolves with the return value when the asynchronous block finishes.

Forwards dynamic bindings.

Schedules the body to be executed asychronously, potentially using
virtual thread if available (a normal thread will be used in other
case, determined by *executor* dynamic var). Returns a promise
instance that resolves with the return value when the asynchronous
block finishes.

Forwards dynamic bindings.
sourceraw docstring

go-chanclj/smacro

(go-chan & body)

A convencience go macro version that returns a channel instead of a promise instance, has the same semantics as go macro.

A convencience go macro version that returns a channel instead of a
promise instance, has the same semantics as `go` macro.
sourceraw docstring

go-loopclj/smacro

(go-loop bindings & body)

A convencience helper macro that combines go + loop.

A convencience helper macro that combines go + loop.
sourceraw docstring

multclj/s

(mult & {:as opts})

Creates an instance of multiplexer.

A multiplexer instance acts like a write-only channel what enables a broadcast-like (instead of a queue-like) behavior. Channels containing copies of this multiplexer can be attached using tap! and dettached with untap!.

Each item is forwarded to all attached channels in parallel and synchronously; use buffers to prevent slow taps from holding up the multiplexer.

If there are no taps, all received items will be dropped. Closed channels will be automatically removed from multiplexer.

Do not creates vthreads (or threads).

Creates an instance of multiplexer.

A multiplexer instance acts like a write-only channel what enables a
broadcast-like (instead of a queue-like) behavior. Channels
containing copies of this multiplexer can be attached using `tap!`
and dettached with `untap!`.

Each item is forwarded to all attached channels in parallel and
synchronously; use buffers to prevent slow taps from holding up the
multiplexer.

If there are no taps, all received items will be dropped. Closed
channels will be automatically removed from multiplexer.

Do not creates vthreads (or threads).
sourceraw docstring

mult*clj/s

(mult* ch)
(mult* ch close?)

Create a multiplexer with an externally provided channel. From now, you can use the external channel or the multiplexer instace to put values in because multiplexer implements the IWriteChannel protocol.

Optionally accepts close? argument, that determines if the channel will be closed when close! is called on multiplexer o not.

Create a multiplexer with an externally provided channel. From now,
you can use the external channel or the multiplexer instace to put
values in because multiplexer implements the IWriteChannel protocol.

Optionally accepts `close?` argument, that determines if the channel will
be closed when `close!` is called on multiplexer o not.
sourceraw docstring

offer!clj/s

(offer! port val)

Puts a val into channel if it's possible to do so immediately. Returns a resolved promise with true if the operation succeeded. Never blocks.

Puts a val into channel if it's possible to do so immediately.
Returns a resolved promise with `true` if the operation
succeeded. Never blocks.
sourceraw docstring

once-bufferclj/s

(once-buffer)

Create a once buffer instance.

Create a once buffer instance.
sourceraw docstring

onto-chan!clj/s

(onto-chan! ch coll)
(onto-chan! ch coll close?)

Puts the contents of coll into the supplied channel.

By default the channel will be closed after the items are copied, but can be determined by the close? parameter. Returns a promise that will be resolved with nil once the items are copied.

Puts the contents of coll into the supplied channel.

By default the channel will be closed after the items are copied,
but can be determined by the close? parameter. Returns a promise
that will be resolved with `nil` once the items are copied.
sourceraw docstring

pipeclj/s

(pipe from to)
(pipe from to close?)

Takes elements from the from channel and supplies them to the to channel. By default, the to channel will be closed when the from channel closes, but can be determined by the close? parameter. Will stop consuming the from channel if the to channel closes.

Takes elements from the from channel and supplies them to the to
channel. By default, the to channel will be closed when the from
channel closes, but can be determined by the close?  parameter. Will
stop consuming the from channel if the to channel closes.
sourceraw docstring

pipelineclj

(pipeline & {:keys [typ in out f close? n exh] :or {typ :thread close? true}})

Create a processing pipeline with the ability to specify the process function proc-fn, the type of concurrency primitive to use (:thread or :vthread) and the parallelism.

The proc-fn should be a function that takes the value and the result channel; the result channel should be closed once the processing unit is finished.

By default the output channel is closed when pipeline is terminated, but it can be specified by user using the :close? parameter.

Returns a promise which will be resolved once the pipeline is terminated.

Example:

(def inp (sp/chan)) (def out (sp/chan (map inc)))

(sp/pipeline :typ :vthread :close? true :n 10 :in inp :out out :f proc-fn)

(sp/go (loop [] (when-let [val (sp/<! out)] (prn "RES:" val) (recur))) (prn "RES: END"))

(p/await! (sp/onto-chan! inp ["1" "2" "3" "4"] true))

Internally, uses 2 vthreads for pipeline internals processing.

EXPERIMENTAL: API subject to be changed or removed in future releases.

Create a processing pipeline with the ability to specify the process
function `proc-fn`, the type of concurrency primitive to
use (`:thread` or `:vthread`) and the parallelism.

The `proc-fn` should be a function that takes the value and the
result channel; the result channel should be closed once the
processing unit is finished.

By default the output channel is closed when pipeline is terminated,
but it can be specified by user using the `:close?` parameter.

Returns a promise which will be resolved once the pipeline is
terminated.

Example:

  (def inp (sp/chan))
  (def out (sp/chan (map inc)))

  (sp/pipeline :typ :vthread
               :close? true
               :n 10
               :in inp
               :out out
               :f proc-fn)

  (sp/go
    (loop []
      (when-let [val (sp/<! out)]
        (prn "RES:" val)
        (recur)))
    (prn "RES: END"))

  (p/await! (sp/onto-chan! inp ["1" "2" "3" "4"] true))

Internally, uses 2 vthreads for pipeline internals processing.

EXPERIMENTAL: API subject to be changed or removed in future
releases.
sourceraw docstring

poll!clj/s

(poll! port)

Takes a val from port if it's possible to do so immediatelly. Returns a resolved promise with the value if succeeded, nil otherwise.

Takes a val from port if it's possible to do so
immediatelly. Returns a resolved promise with the value if
succeeded,  `nil` otherwise.
sourceraw docstring

putclj/s

(put port val)
(put port val timeout-duration)
(put port val timeout-duration timeout-value)

Schedules a put operation on the channel. Returns a promise instance that will resolve to: false if channel is closed, true if put is succeed. If channel has buffer, it will return immediatelly with resolved promise.

Optionally accepts a timeout-duration and timeout-value. The timeout-duration can be a long or Duration instance measured in milliseconds.

Schedules a put operation on the channel. Returns a promise
instance that will resolve to: false if channel is closed, true if
put is succeed. If channel has buffer, it will return immediatelly
with resolved promise.

Optionally accepts a timeout-duration and timeout-value. The
`timeout-duration` can be a long or Duration instance measured in
milliseconds.
sourceraw docstring

put!clj

(put! port val)
(put! port val timeout-duration)
(put! port val timeout-duration timeout-value)

A blocking version of put.

A blocking version of `put`.
sourceraw docstring

sliding-bufferclj/s

(sliding-buffer n)

Create a sliding buffer instance.

Create a sliding buffer instance.
sourceraw docstring

takeclj/s

(take port)
(take port timeout-duration)
(take port timeout-duration timeout-value)

Schedules a take operation on the channel. Returns a promise instance that will resolve to: nil if channel is closed, obj if value is found. If channel has non-empty buffer the take operation will succeed immediatelly with resolved promise.

Optionally accepts a timeout-duration and timeout-value. The timeout-duration can be a long or Duration instance measured in milliseconds.

Schedules a take operation on the channel. Returns a promise instance
that will resolve to: nil if channel is closed, obj if value is
found. If channel has non-empty buffer the take operation will
succeed immediatelly with resolved promise.

Optionally accepts a timeout-duration and timeout-value. The
`timeout-duration` can be a long or Duration instance measured in
milliseconds.
sourceraw docstring

take!clj

(take! port)
(take! port timeout-duration)
(take! port timeout-duration timeout-value)

Blocking version of take.

Blocking version of `take`.
sourceraw docstring

tap!clj/s

(tap! mult ch)
(tap! mult ch close?)

Copies the multiplexer source onto the provided channel.

Copies the multiplexer source onto the provided channel.
sourceraw docstring

thread-chanclj/smacro

(thread-chan & body)

A convencience thread macro version that returns a channel instead of a promise instance.

A convencience thread macro version that returns a channel instead of
a promise instance.
sourceraw docstring

throw-uncaughtclj/s

(throw-uncaught ch cause)

A channel exception handler that throws the exception to the default uncaught exception handler.

A channel exception handler that throws the exception to the default
uncaught exception handler.
sourceraw docstring

timeoutclj/s

(timeout ms)

Returns a promise that will be resolved in the specified timeout. The default scheduler will be used.

Returns a promise that will be resolved in the specified timeout. The
default scheduler will be used.
sourceraw docstring

timeout-chanclj/s

(timeout-chan ms)
(timeout-chan scheduler ms)

Returns a channel that will be closed in the specified timeout. The default scheduler will be used. You can provide your own as optional first argument.

Returns a channel that will be closed in the specified timeout. The
default scheduler will be used. You can provide your own as optional
first argument.
sourceraw docstring

untap!clj/s

(untap! mult ch)

Disconnects a channel from the multiplexer.

Disconnects a channel from the multiplexer.
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close