(!)
Throws an exception if the current computation is required to terminate, otherwise returns nil.
Inside a process block, checks the process cancellation status.
Outside of process blocks, falls back to the thread interruption status if the host platform supports it.
(? t)
Runs the given task, waits for completion and returns its result (or throws, if the task fails).
Inside a process block, parks the process.
Outside of process blocks, falls back to thread blocking if the host platform supports it.
(?! f)
In an ambiguous process block, runs given `flow` preemptively (aka switch), forking process for each emitted value. Forked process is cancelled if `flow` emits another value before it terminates.
Example :
```clojure
(defn debounce [delay flow]
  (ap (let [x (?! flow)]
        (try (? (sleep delay x))
             (catch Exception _ (?? none))))))

(? (->> (ap (let [n (?? (enumerate [24 79 67 34 18 9 99 37]))]
              (? (sleep n n))))
        (debounce 50)
        (aggregate conj)))
#_=> [24 79 9 37]
```
(?= f)
In an ambiguous process block, runs given `flow` and concurrently forks current process for each value produced by the flow. Values emitted by forked processes are gathered and emitted as soon as available.
Example :
```clojure
(m/? (->> (m/ap
            (let [x (m/?= (m/enumerate [19 57 28 6 87]))]
              (m/? (m/sleep x x))))
          (m/aggregate conj)))
#_=> [6 19 28 57 87]
```
(?? f)
In an ambiguous process block, runs given `flow` non-preemptively (aka concat), forking process for each emitted value.
Example :
```clojure
(? (aggregate conj (ap (inc (?? (enumerate [1 2 3]))))))
#_=> [2 3 4]
```
(absolve task)
Returns a task that runs given `task`, which is expected to complete with a zero-argument function, and then completes with the result of calling this function.
(aggregate rf flow)
(aggregate rf i flow)
Returns a task reducing values produced by given discrete `flow` with `rf`, starting with `init` (or, if not provided, the result of calling `rf` with no argument).
Cancelling propagates to upstream flow. Early termination by `rf` (via `reduced` or throwing) cancels upstream flow.
Example :
```clojure
(? (aggregate + (enumerate (range 10))))
#_=> 45
```
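The three-argument arity can be sketched as follows. This is a minimal illustration, assuming the library is required under the alias `m` (as in the `zip` example); the name `distinct-items` is hypothetical.

```clojure
(require '[missionary.core :as m])

;; Pass an explicit init value (an empty set) instead of relying on (rf).
(def distinct-items
  (m/? (m/aggregate conj #{} (m/enumerate [1 1 2 3 3]))))

distinct-items
#_=> #{1 2 3}
```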
(ap & body)
Returns a flow evaluating `body` (in an implicit `do`) and producing values of each subsequent fork. Body evaluation can be parked with `?` and forked with `??`, `?!` and `?=`.
Cancelling an `ap` flow triggers cancellation of the task/flow it's currently running, along with all tasks/flows subsequently run.
(attempt task)
Returns a task that always succeeds with the outcome of given `task` wrapped in a zero-argument function. Calling this function returns the result if `task` succeeded, or throws the exception if it failed.
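A short sketch of how `attempt` and `absolve` relate, assuming the library is required under the alias `m`. The names `failing`, `thunk`, `message` and `round-trip` are hypothetical and only serve the illustration.

```clojure
(require '[missionary.core :as m])

;; A task that always fails.
(def failing (m/sp (throw (ex-info "boom" {}))))

;; attempt turns the failure into a successful result: a zero-argument function.
(def thunk (m/? (m/attempt failing)))

;; Calling the function rethrows the original exception.
(def message
  (try (thunk)
       (catch Exception e (.getMessage e))))

;; absolve is the inverse operation: it calls the function produced by attempt.
(def round-trip (m/? (m/absolve (m/attempt (m/sp 42)))))

[message round-trip]
#_=> ["boom" 42]
```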
blk
A `java.util.concurrent.Executor` optimized for blocking evaluation.
(buffer c f)
Returns a discrete flow producing values emitted by given discrete `flow`, accumulating upstream overflow up to `capacity` items.
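A minimal usage sketch, assuming the alias `m` and the argument order shown in the signature above (capacity first, flow second). The name `buffered` is hypothetical. Buffering only decouples producer and consumer, so the values themselves pass through unchanged.

```clojure
(require '[missionary.core :as m])

;; Let upstream run up to 2 items ahead of the consumer.
(def buffered
  (m/? (->> (m/enumerate (range 5))
            (m/buffer 2)
            (m/aggregate conj))))

buffered
#_=> [0 1 2 3 4]
```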
(compel task)
Inhibits cancellation signal of given `task`.
cpu
A `java.util.concurrent.Executor` optimized for non-blocking evaluation.
(dfv)
Creates an instance of dataflow variable (aka single-assignment).
A dataflow variable is a function implementing `assign` on 1-arity and `deref` on 2-arity (as task). `assign` immediately binds the variable to given value if not already bound and returns bound value. `deref` is a task completing with the value bound to the variable as soon as it's available.
Cancelling a `deref` task makes it fail immediately.
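The single-assignment behaviour described above can be sketched like this, assuming the alias `m`. The names `v`, `first-assign`, `second-assign` and `bound` are hypothetical.

```clojure
(require '[missionary.core :as m])

(def v (m/dfv))

;; 1-arity call is assign: the first call binds the variable.
(def first-assign (v :a))

;; A later assign does not rebind, it returns the value already bound.
(def second-assign (v :b))

;; The variable itself is the deref task.
(def bound (m/? v))

[first-assign second-assign bound]
#_=> [:a :a :a]
```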
(enumerate coll)
Returns a discrete flow producing values from given `collection`. Cancelling before having reached the end makes the flow fail immediately.
(gather)
(gather f)
(gather f & fs)
Returns a discrete flow running given discrete `flows` in parallel and emitting upstream values unchanged, as soon as they're available, until every upstream flow is terminated.
Cancelling propagates to every upstream flow. If any upstream flow fails, the flow is cancelled.
Example :
```clojure
(? (->> (gather (enumerate [1 2 3])
                (enumerate [:a :b :c]))
        (aggregate conj)))
#_=> [1 :a 2 :b 3 :c]
```
(holding lock & body)
`acquire`s given `semaphore` and evaluates `body` (in an implicit `do`), ensuring `semaphore` is `release`d after evaluation.
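A minimal sketch of `holding` used with a mutex created by `sem`, assuming the alias `m`. The names `lock`, `result` and `reacquired` are hypothetical.

```clojure
(require '[missionary.core :as m])

;; A semaphore with a single token, i.e. a mutex.
(def lock (m/sem))

;; holding parks on acquire, so it is used here inside an sp block.
(def result
  (m/? (m/sp (m/holding lock :done))))

;; The token was released after evaluation, so acquiring again completes at once.
(def reacquired (m/? lock))

;; Give the token back.
(lock)

[result reacquired]
#_=> [:done nil]
```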
(integrate rf f)
(integrate rf i f)
Returns a discrete flow running given discrete `flow` and emitting given `init` value (or, if not provided, the result of calling `rf` with no argument) followed by successive reductions (by `rf`) of upstream values with previously emitted value.
Cancelling propagates to upstream flow. Early termination by `rf` (via `reduced` or throwing) cancels upstream flow.
Example :
```clojure
(? (->> [1 2 3 4 5]
        (enumerate)
        (integrate +)
        (aggregate conj)))
#_=> [0 1 3 6 10 15]
```
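For comparison, a sketch of the arity taking an explicit init value, assuming the alias `m`. The name `running-totals` is hypothetical. The init value is emitted first, followed by each successive reduction.

```clojure
(require '[missionary.core :as m])

(def running-totals
  (m/? (->> (m/enumerate [1 2 3])
            (m/integrate + 10)
            (m/aggregate conj))))

running-totals
#_=> [10 11 13 16]
```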
(join c)
(join c & ts)
Returns a task running given `tasks` concurrently.
If every task succeeds, `join` completes with the result of applying `f` to these results.
If any task fails, others are cancelled then `join` fails with this error.
Cancelling propagates to children tasks.
Example :
```clojure
(? (join vector (sleep 1000 1) (sleep 1000 2)))
#_=> [1 2]            ;; 1 second later
```
(latest f)
(latest f & fs)
Returns a continuous flow running given continuous `flows` in parallel and combining latest value of each flow with given function `f`.
Example :
```clojure
(defn sleep-emit [delays]
  (ap (let [n (?? (enumerate delays))]
        (? (sleep n n)))))

(defn delay-each [delay input]
  (ap (? (sleep delay (?? input)))))

(? (->> (latest vector
                (sleep-emit [24 79 67 34])
                (sleep-emit [86 12 37 93]))
        (delay-each 50)
        (aggregate conj)))
#_=> [[24 86] [24 12] [79 37] [67 37] [34 93]]
```
(mbx)
Creates an instance of mailbox.
A mailbox is a function implementing `post` on 1-arity and `fetch` on 2-arity (as task). `post` immediately pushes given value to mailbox and returns nil. `fetch` is a task pulling a value from mailbox as soon as it's non-empty and completing with this value.
Cancelling a `fetch` task makes it fail immediately.
Example : an actor is a mailbox associated with a process consuming messages.
```clojure
(defn crash [^Throwable e]        ;; let it crash philosophy
  (.printStackTrace e)
  (System/exit -1))

(defn actor
  ([init] (actor init crash))
  ([init fail]
   (let [self (mbx)]
     ((sp
        (loop [b init]
          (recur (b self (? self)))))
      nil fail)
     self)))

(def counter
  (actor
    ((fn beh [n]
       (fn [self cust]
         (cust n)
         (beh (inc n)))) 0)))

(counter prn)                     ;; prints 0
(counter prn)                     ;; prints 1
(counter prn)                     ;; prints 2
```
none
The empty flow. Doesn't produce any value and terminates immediately. Cancelling has no effect.
Example :
```clojure
(? (aggregate conj none))
#_=> []
```
(observe s)
Returns a discrete flow observing values produced by a non-backpressured subject until cancelled. `subject` must be a function taking a 1-arity `event` function and returning a 0-arity `cleanup` function.
`subject` function is called on initialization. `cleanup` function is called on cancellation. `event` function may be called at any time; it throws an exception on overflow and becomes a no-op after cancellation.
(publisher flow)
Returns an `org.reactivestreams.Publisher` running given discrete `flow` on each subscription.
(race)
(race & ts)
Returns a task running given `tasks` concurrently.
If any task succeeds, others are cancelled then `race` completes with this result.
If every task fails, `race` fails.
Cancelling propagates to children tasks.
Example :
```clojure
(? (race (sleep 1000 1) (sleep 2000 2)))
#_=> 1                 ;; 1 second later
```
(rdv)
Creates an instance of synchronous rendez-vous.
A synchronous rendez-vous is a function implementing `give` on its 1-arity and `take` on its 2-arity (as task). `give` takes a value to be transferred and returns a task completing with nil as soon as a taker is available. `take` is a task completing with transferred value as soon as a giver is available.
Cancelling `give` and `take` tasks makes them fail immediately.
Example : producer / consumer stream communication
```clojure
(defn reducer [rf i take]
  (sp
    (loop [r i]
      (let [x (? take)]
        (if (identical? x take)
          r (recur (rf r x)))))))

(defn iterator [give xs]
  (sp
    (loop [xs (seq xs)]
      (if-some [[x & xs] xs]
        (do (? (give x))
            (recur xs))
        (? (give give))))))

(def stream (rdv))

(? (join {} (iterator stream (range 100)) (reducer + 0 stream)))      ;; returns 4950
```
(reactor & body)
Calls `reactor-call` with a function evaluating given `body` in an implicit `do`.
(reactor-call i)
Returns a task spawning a reactor context with given boot function, called without argument. A reactor context manages the lifecycle of running flows and serializes their emissions in propagation turns. Flows running in a reactor context are called publishers, they can be spawned in the boot function or in reaction to any subsequent emission with `stream!` or `signal!`, respectively for discrete and continuous flows. Publishers of a given reactor context are totally ordered by the union of two partial orders :
* if a publisher was created by another one, the parent is inferior to the child.
* if two publishers are siblings, the older is inferior to the younger.

A publisher can subscribe to the feed of an inferior publisher from the same reactor context. Using a publisher as a flow spawns a subscription to this publisher. Publisher emissions are grouped in propagation turns, where successive publishers of a given turn are strictly increasing. When a publisher becomes able to emit, it is compared to the publisher currently emitting to figure out whether the emission must happen on the current turn or on the next one. Cyclic reactions are possible and no attempt is made to prevent them, so care must be taken to ensure the propagation eventually stops. The dispatching mechanism depends on the nature of the publisher, discrete or continuous.
* a stream is able to emit when its flow is ready to transfer and the backpressure of the previous emission has been collected from all of its subscriptions. On emission, a value is transferred from the flow and subscriptions are notified of the availability of this value. Until the end of the propagation turn, each new subscription to this stream will be immediately notified.
* a signal always has a current value, thus each new subscription is immediately notified. It is able to emit when its flow is ready to transfer. On emission, its current value is marked as stale and its subscriptions are notified if not already. From this point, a sampling request from any of its subscriptions will trigger the transfer from the flow to refresh the current value. Subsequent sampling requests will reuse this memoized value until next emission.

A subscription terminates when it's cancelled or when the underlying publisher terminates. A publisher can be cancelled, as long as its flow is not terminated, by calling it as a zero-argument function. A cancelled publisher cancels its flow, transfers and discards all of its remaining values without backpressure, and all of its current and future subscriptions fail immediately. Cancelling a reactor cancels all of its publishers, and any subsequent publisher spawning will fail. If any publisher flow fails, or if the boot function throws an exception, the reactor is cancelled. A reactor terminates at the end of the first turn where every publisher flow is terminated, meaning no emission can ever happen anymore. It succeeds with the result of the boot function if no publisher failed, otherwise it fails with the first error encountered.
(relieve rf f)
Returns a continuous flow producing values emitted by given discrete `flow`, relieving backpressure. When upstream is faster than downstream, overflowed values are successively reduced with given function `rf`.
Cancelling propagates to upstream. If `rf` throws, upstream `flow` is cancelled.
Example :
```clojure
;; Delays each `input` value by `delay` milliseconds
(defn delay-each [delay input]
  (ap (? (sleep delay (?? input)))))

(? (->> (ap (let [n (?? (enumerate [24 79 67 34 18 9 99 37]))]
              (? (sleep n n))))
        (relieve +)
        (delay-each 80)
        (aggregate conj)))
#_=> [24 79 67 61 99 37]
```
(sample f sd sr)
Returns a discrete flow running given `sampler` discrete flow and `sampled` continuous flow in parallel. For each `sampler` value, emits the result of function `f` called with current values of `sampled` and `sampler`.
Cancellation propagates to both flows. When `sampler` terminates, `sampled` is cancelled. A failure in either flow, `f` throwing an exception, or an attempt to pull a value before the first value of `sampled` is available will cancel the flow and propagate the error.
Example :
```clojure
(defn sleep-emit [delays]
  (ap (let [n (?? (enumerate delays))]
        (? (sleep n n)))))

(defn delay-each [delay input]
  (ap (? (sleep delay (?? input)))))

(? (->> (sample vector
                (sleep-emit [24 79 67 34])
                (sleep-emit [86 12 37 93]))
        (delay-each 50)
        (aggregate conj)))
#_=> [[24 86] [24 12] [79 37] [67 93]]
```
(sem)
(sem n)
Creates a semaphore initialized with n tokens (1 if not provided, aka mutex).
A semaphore is a function implementing `release` on 0-arity and `acquire` on 2-arity (as task). `release` immediately makes a token available and returns nil. `acquire` is a task completing with nil as soon as a token is available.
Cancelling an `acquire` task makes it fail immediately.
Example : dining philosophers
```clojure
(defn phil [name f1 f2]
  (sp
    (while true
      (prn name :thinking)
      (? (sleep 500))
      (holding f1
        (holding f2
          (prn name :eating)
          (? (sleep 600)))))))

(def forks (vec (repeatedly 5 sem)))

(? (timeout 10000
     (join vector
       (phil "descartes" (forks 0) (forks 1))
       (phil "hume"      (forks 1) (forks 2))
       (phil "plato"     (forks 2) (forks 3))
       (phil "nietzsche" (forks 3) (forks 4))
       (phil "kant"      (forks 0) (forks 4)))))
```
(signal! f)
Spawns a continuous publisher from given flow, see `reactor-call`.
(sleep d)
(sleep d x)
Returns a task completing with given value (nil if not provided) after given duration (in milliseconds).
Cancelling a sleep task makes it fail immediately.
Example :
```clojure
(? (sleep 1000 42))
#_=> 42               ;; 1 second later
```
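The cancellation behaviour can be sketched by running the task directly, the same way the `mbx` actor example does: a task is called with a success callback and a failure callback, and the call returns a function that cancels it. This is an illustration under that assumption, with the alias `m`; the names `outcome`, `cancel!` and `status` are hypothetical.

```clojure
(require '[missionary.core :as m])

(def outcome (promise))

;; Start a long sleep and keep the returned canceller.
(def cancel!
  ((m/sleep 10000 :done)
   (fn [x] (deliver outcome [:succeeded x]))
   (fn [e] (deliver outcome [:failed e]))))

;; Cancel right away: the failure callback is expected to fire
;; long before the 10 seconds elapse.
(cancel!)

(def status (first (deref outcome 1000 [:timed-out])))

status
#_=> :failed
```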
(sp & body)
Returns a task evaluating `body` (in an implicit `do`). Body evaluation can be parked with `?`.
Cancelling an `sp` task triggers cancellation of the task it's currently running, along with all tasks subsequently run.
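A minimal sketch of sequential composition with `sp`, assuming the alias `m`. The name `total` is hypothetical. Each `?` parks the block until the inner task completes, so the two sleeps run one after the other.

```clojure
(require '[missionary.core :as m])

(def total
  (m/? (m/sp
         (let [a (m/? (m/sleep 10 1))
               b (m/? (m/sleep 10 2))]
           (+ a b)))))

total
#_=> 3
```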
(stream! f)
Spawns a discrete publisher from given flow, see `reactor-call`.
(subscribe pub)
Returns a discrete flow subscribing to given `org.reactivestreams.Publisher`.
(timeout delay task)
Returns a task running given `task` and cancelling it if not completed within given `delay` (in milliseconds).
Example :
```clojure
(? (timeout 100 (sleep (rand-int 200))))
#_=> nil               ;; or exception, after 100 milliseconds
```
(transform x f)
Returns a discrete flow running given discrete `flow` and transforming values with given transducer `xf`.
Cancelling propagates to upstream flow. Early termination by `xf` (via `reduced` or throwing) cancels upstream flow.
Example :
```clojure
(? (->> (enumerate (range 10))
        (transform (comp (filter odd?) (mapcat range) (partition-all 4)))
        (aggregate conj)))
#_=> [[0 0 1 2] [0 1 2 3] [4 0 1 2] [3 4 5 6] [0 1 2 3] [4 5 6 7] [8]]
```
(via exec & body)
Returns a task evaluating `body` (in an implicit `do`) on given `java.util.concurrent.Executor` and completing with its result.
Cancellation interrupts the evaluating thread.
Not supported on ClojureScript.
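Since no example survives here, the following is a hedged sketch rather than the library's own: it assumes the alias `m` and uses the `blk` executor that also appears in the `via-call` example. The name `result` is hypothetical.

```clojure
(require '[missionary.core :as m])

(def result
  (m/? (m/via m/blk
         (Thread/sleep 100)   ;; blocking call, evaluated on the given executor
         :done)))

result
#_=> :done
```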
(via-call e t)
Same as `via`, except the expression to evaluate is provided as a zero-arity function on second argument.
Not supported on ClojureScript.
Example :
```clojure
(? (via-call blk read-line))
;; reads a line from stdin and returns it
```
(watch r)
Returns a continuous flow producing successive values of given `reference` until cancelled. Given reference must support `add-watch`, `remove-watch` and `deref`. Oldest values are discarded on overflow.
(zip c f & fs)
Returns a discrete flow running given discrete `flows` in parallel and emitting the result of applying `f` to the set of first values emitted by each upstream flow, followed by the result of applying `f` to the set of second values and so on, until any upstream flow terminates, at which point the flow will be cancelled.
Cancelling propagates to every upstream flow. If any upstream flow fails or if `f` throws, the flow is cancelled.
Example :
```clojure
(m/? (->> (m/zip vector
                 (m/enumerate [1 2 3])
                 (m/enumerate [:a :b :c]))
          (m/aggregate conj)))
#_=> [[1 :a] [2 :b] [3 :c]]
```