missionary.core


! (clj/s)

(!)

Throws an exception if current computation is required to terminate, otherwise returns nil.

Inside a process block, checks process cancellation status.

Outside of process blocks, falls back to thread interruption status if the host platform supports it.

? (clj/s)

(? t)

Runs given task, waits for completion and returns result (or throws, if task fails).

Inside a process block, parks the process.

Outside of process blocks, falls back to thread blocking if the host platform supports it.
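
The two modes described above can be sketched as follows. This is a minimal illustration, assuming the namespace is aliased as `m` and a JVM host (thread blocking is not available everywhere); the name `answer` is made up for the example.

```clojure
(require '[missionary.core :as m])

;; Inside an `sp` block, `?` parks the process until the task completes.
;; `answer` is a hypothetical name used only for this sketch.
(def answer
  (m/sp (+ 1 (m/? (m/sleep 10 41)))))

;; Outside of any process block, `?` blocks the calling thread (JVM only).
(def result (m/? answer))

result
#_=> 42
```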

?! (clj/s, macro, deprecated)

(?! f)

Alias for `?<`

?< (clj/s)

(?< f)

In an ambiguous process block, runs given `flow` preemptively (aka switch), forking process for each emitted value. Forked process is cancelled if `flow` emits another value before it terminates.

Example :
```clojure
(defn debounce [delay flow]
  (ap (let [x (?< flow)]
        (try (? (sleep delay x))
             (catch Exception _ (?> none))))))

(? (->> (ap (let [n (?> (seed [24 79 67 34 18 9 99 37]))]
              (? (sleep n n))))
        (debounce 50)
        (reduce conj)))

#_=> [24 79 9 37]
```

?= (clj/s)

(?= f)

In an ambiguous process block, runs given `flow` and concurrently forks current process for each value produced by the flow. Values emitted by forked processes are gathered and emitted as soon as available.

Example :
```clojure
(? (->> (m/ap
          (let [x (m/?= (m/seed [19 57 28 6 87]))]
            (m/? (m/sleep x x))))
        (reduce conj)))

#_=> [6 19 28 57 87]
```

?> (clj/s)

(?> f)

In an ambiguous process block, runs given `flow` non-preemptively (aka concat), forking process for each emitted value.

Example :
```clojure
(? (reduce conj (ap (inc (?> (seed [1 2 3]))))))
#_=> [2 3 4]
```

?? (clj/s, macro, deprecated)

(?? f)

Alias for `?>`

absolve (clj/s)

(absolve task)

Returns a task that runs the given `task`, which must complete with a zero-argument function, then completes with the result of calling that function.

aggregate (clj/s, deprecated)

Alias for `reduce`

amb= (clj/s, macro)

(amb= & forms)

In an `ap` block, evaluates each form concurrently and returns results in order of availability.

amb> (clj/s, macro)

(amb> & forms)

In an `ap` block, evaluates each form sequentially and returns successive results.
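
A short sketch contrasting `amb>` with `amb=`, assuming the namespace is aliased as `m` and a JVM host so that `?` can block at the top level. The names `sequential` and `concurrent` and the delays are chosen only for illustration.

```clojure
(require '[missionary.core :as m])

;; Sequential: forms are evaluated one after another, in source order.
(def sequential
  (m/? (m/reduce conj (m/ap (m/amb> 1 2 3)))))

sequential
#_=> [1 2 3]

;; Concurrent: both forms start together and results are emitted
;; in order of availability, so the shorter sleep comes first.
(def concurrent
  (m/? (m/reduce conj
         (m/ap (m/amb= (m/? (m/sleep 100 :slow))
                       (m/? (m/sleep 10 :fast)))))))

concurrent
#_=> [:fast :slow]
```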

ap (clj/s, macro)

(ap & body)

Returns a flow evaluating `body` (in an implicit `do`) and producing values of each subsequent fork. Body evaluation can be parked with `?` and forked with `?>`, `?<` and `?=`.

Cancelling an `ap` flow triggers cancellation of the task/flow it's currently running, along with all tasks/flows subsequently run.

attempt (clj/s)

(attempt task)

Returns a task that always succeeds. Its result is a zero-argument function wrapping the outcome of the given `task`: calling it returns the task's result if it succeeded, or throws its exception if it failed.
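
The relationship between `attempt` and `absolve` can be sketched as below. This assumes the namespace is aliased as `m` and a JVM host (the `catch` clause names a JVM class); `failing`, `thunk`, `message` and `round-trip` are hypothetical names for the example.

```clojure
(require '[missionary.core :as m])

;; A task that always fails (illustrative only).
(def failing
  (m/sp (throw (ex-info "boom" {}))))

;; `attempt` turns the failure into a successful result:
;; a zero-argument function that rethrows when called.
(def thunk (m/? (m/attempt failing)))

(def message
  (try (thunk)
       (catch clojure.lang.ExceptionInfo e (ex-message e))))

message
#_=> "boom"

;; `absolve` goes the other way: it calls the function the task
;; completed with, so the pair round-trips a successful result.
(def round-trip
  (m/? (m/absolve (m/attempt (m/sp :ok)))))

round-trip
#_=> :ok
```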

blk (clj/s)

A `java.util.concurrent.Executor` optimized for blocking evaluation.
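
A minimal sketch of handing blocking work to this executor, assuming the namespace is aliased as `m` and a JVM host. It relies on the `via` macro from the same namespace, which is not described in this part of the page; `Thread/sleep` stands in for real blocking I/O.

```clojure
(require '[missionary.core :as m])

;; `via` evaluates its body on the given executor and returns a task.
;; The sleep below is a placeholder for a blocking call.
(def result
  (m/? (m/via m/blk
         (Thread/sleep 10)
         :done)))

result
#_=> :done
```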

buffer (clj/s)

(buffer c f)

Returns a discrete flow producing values emitted by given discrete `flow`, accumulating upstream overflow up to `capacity` items.
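
A small usage sketch, assuming the namespace is aliased as `m` and a JVM host. The capacity of 2 is arbitrary; the point is that buffering changes when upstream values are pulled, not which values reach the consumer.

```clojure
(require '[missionary.core :as m])

;; Up to 2 values may be held between `seed` and the reducer.
(def result
  (m/? (m/reduce conj (m/buffer 2 (m/seed [1 2 3 4 5])))))

result
#_=> [1 2 3 4 5]
```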

compel (clj/s)

(compel task)

Inhibits cancellation signal of given `task`.

cpu (clj/s)

A `java.util.concurrent.Executor` optimized for non-blocking evaluation.

dfv (clj/s)

(dfv)

Creates an instance of dataflow variable (aka single-assignment).

A dataflow variable is a function implementing `assign` on 1-arity and `deref` on 2-arity (as task). `assign` immediately binds the variable to given value if not already bound and returns bound value. `deref` is a task completing with the value bound to the variable as soon as it's available.

Cancelling a `deref` task makes it fail immediately.
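
The assign and deref arities can be sketched as follows, assuming the namespace is aliased as `m` and a JVM host. The names `v`, `first-assign`, `second-assign` and `current` are illustrative.

```clojure
(require '[missionary.core :as m])

(def v (m/dfv))

;; 1-arity call: assign. The first call binds the variable.
(def first-assign (v 42))

;; A later assign does not rebind; it returns the value already bound.
(def second-assign (v 43))

;; The variable itself is the deref task, so it can be run with `?`.
(def current (m/? v))

[first-assign second-assign current]
#_=> [42 42 42]
```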

eduction (clj/s)

(eduction f)
(eduction x f)
(eduction x f & fs)

Returns a discrete flow running given discrete `flow` and transforming values with given transducer `xf`.

Cancelling propagates to upstream flow. Early termination by `xf` (via `reduced` or throwing) cancels upstream flow.

Example :
```clojure
(? (->> (seed (range 10))
        (eduction (comp (filter odd?) (mapcat range) (partition-all 4)))
        (reduce conj)))
#_=> [[0 0 1 2] [0 1 2 3] [4 0 1 2] [3 4 5 6] [0 1 2 3] [4 5 6 7] [8]]
```

enumerate (clj/s, deprecated)

Alias for `seed`

gather (clj/s, deprecated)

(gather)
(gather f)
(gather f & fs)

Returns a discrete flow running given discrete `flows` in parallel and emitting upstream values unchanged, as soon as they're available, until every upstream flow is terminated.

Cancelling propagates to every upstream flow. If any upstream flow fails, the flow is cancelled.

Example :
```clojure
(? (->> (gather (seed [1 2 3])
                (seed [:a :b :c]))
        (reduce conj)))
#_=> [1 :a 2 :b 3 :c]
```

group-by (clj/s)

(group-by kf f)

Returns a discrete flow running given discrete flow, calling given key function on each produced value, grouping values
according to keys returned by the function, and producing a key-group pair for each grouping found. A group is a flow
consuming values matching a key. Upstream values are dispatched in constant time to their group consumer.

Cancelling a group consumer makes it fail immediately. If a value is subsequently found for the same grouping, the
key-group pair is produced again, including in the special case where the consumer is cancelled while a transfer was
pending.

If upstream fails, or if the key function throws, then upstream is cancelled and flushed and the error is propagated
downstream.

When the last upstream value is consumed, downstream terminates along with each active consumer and subsequent ones.

Concurrent consumers on a single group are not allowed; attempting to do so will fail the latest consumer.

Example :
```clojure
(def words ["Air" "Bud" "Cup" "Awake" "Break" "Chunk" "Ant" "Big" "Check"])
(def groups
  (m/ap (let [[k >x] (m/?= (m/group-by (juxt first count) (m/seed words)))]
          [k (m/? (m/reduce conj >x))])))
(m/? (m/reduce conj {} groups))
#_=> {[\C 3] ["Cup"],
      [\B 5] ["Break"],
      [\A 5] ["Awake"],
      [\B 3] ["Bud" "Big"],
      [\A 3] ["Air" "Ant"],
      [\C 5] ["Chunk" "Check"]}
```

holding (clj/s, macro)

(holding lock & body)

Acquires the given semaphore (`lock`), evaluates `body` (in an implicit `do`), and ensures the semaphore is released after evaluation.

integrate (clj/s, deprecated)

Alias for `reductions`

join (clj/s)

(join c)
(join c & ts)

Returns a task running given `tasks` concurrently.

If every task succeeds, `join` completes with the result of applying `f` (the first argument, shown as `c` in the signatures) to these results.

If any task fails, others are cancelled then `join` fails with this error.

Cancelling propagates to children tasks.

Example :
```clojure
(? (join vector (sleep 1000 1) (sleep 1000 2)))
#_=> [1 2]            ;; 1 second later
```

latest (clj/s)

(latest f)
(latest f & fs)

Returns a continuous flow running given continuous `flows` in parallel and combining latest value of each flow with given function `f`.

```clojure
(defn sleep-emit [delays]
  (ap (let [n (?> (seed delays))]
        (? (sleep n n)))))

(defn delay-each [delay input]
  (ap (? (sleep delay (?> input)))))

(? (->> (latest vector
                (sleep-emit [24 79 67 34])
                (sleep-emit [86 12 37 93]))
        (delay-each 50)
        (reduce conj)))

#_=> [[24 86] [24 12] [79 37] [67 37] [34 93]]
```

mbx (clj/s)

(mbx)

Creates an instance of mailbox.

A mailbox is a function implementing `post` on 1-arity and `fetch` on 2-arity (as task). `post` immediately pushes given value to mailbox and returns nil. `fetch` is a task pulling a value from mailbox as soon as it's non-empty and completing with this value.

Cancelling a `fetch` task makes it fail immediately.

Example : an actor is a mailbox associated with a process consuming messages.
```clojure
(defn crash [^Throwable e]                                ;; let it crash philosophy
  (.printStackTrace e)
  (System/exit -1))

(defn actor
  ([init] (actor init crash))
  ([init fail]
   (let [self (mbx)]
     ((sp
        (loop [b init]
          (recur (b self (? self)))))
       nil fail)
     self)))

(def counter
  (actor
    ((fn beh [n]
       (fn [self cust]
         (cust n)
         (beh (inc n)))) 0)))

(counter prn)                                             ;; prints 0
(counter prn)                                             ;; prints 1
(counter prn)                                             ;; prints 2
```

never (clj/s)

none (clj/s)

The empty flow. Doesn't produce any value and terminates immediately. Cancelling has no effect.

Example :
```clojure
(? (reduce conj none))
#_=> []
```

observe (clj/s)

(observe s)

Returns a discrete flow observing values produced by a non-backpressured subject until cancelled. `subject` must be a function taking a 1-arity `event` function and returning a 0-arity `cleanup` function.

The `subject` function is called on initialization. The `cleanup` function is called on cancellation. The `event` function may be called at any time; it throws an exception on overflow and becomes a no-op after cancellation.
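
One way to build a subject is to wrap a callback-based source. The sketch below adapts Clojure's `add-watch` / `remove-watch`; it assumes the namespace is aliased as `m`, and `atom-changes` and `changes` are hypothetical names for the example. Because `event` throws on overflow, the flow is paired here with `relieve` so that a slow consumer only sees the latest value.

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper: a discrete flow of the successive values of a reference.
(defn atom-changes [!ref]
  (m/observe
    (fn [event!]
      (let [k (gensym "observe")]            ; unique watch key per subscription
        (add-watch !ref k (fn [_ _ _ new-val] (event! new-val)))
        ;; 0-arity cleanup function, called on cancellation
        #(remove-watch !ref k)))))

;; Keep only the most recent value when the consumer lags behind.
(def changes
  (m/relieve (fn [_ latest] latest)
             (atom-changes (atom 0))))
```

Nothing runs until the flow is consumed, for example by `reduce` or from inside an `ap` block.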

publisher (clj/s)

(publisher flow)

Returns a `org.reactivestreams.Publisher` running given discrete `flow` on each subscription.

race (clj/s)

(race)
(race & ts)

Returns a task running given `tasks` concurrently.

If any task succeeds, others are cancelled then `race` completes with this result.

If every task fails, `race` fails.

Cancelling propagates to children tasks.

Example :
```clojure
(? (race (sleep 1000 1) (sleep 2000 2)))
#_=> 1                 ;; 1 second later
```

rdv (clj/s)

(rdv)

Creates an instance of synchronous rendez-vous.

A synchronous rendez-vous is a function implementing `give` on its 1-arity and `take` on its 2-arity (as task). `give` takes a value to be transferred and returns a task completing with nil as soon as a taker is available. `take` is a task completing with transferred value as soon as a giver is available.

Cancelling `give` and `take` tasks makes them fail immediately.

Example : producer / consumer stream communication
```clojure
(defn reducer [rf i take]
  (sp
    (loop [r i]
      (let [x (? take)]
        (if (identical? x take)
          r (recur (rf r x)))))))

(defn iterator [give xs]
  (sp
    (loop [xs (seq xs)]
      (if-some [[x & xs] xs]
        (do (? (give x))
            (recur xs))
        (? (give give))))))

(def stream (rdv))

(? (join {} (iterator stream (range 100)) (reducer + 0 stream)))      ;; returns 4950
```

reactor (clj/s, macro)

(reactor & body)

Calls `reactor-call` with a function evaluating given `body` in an implicit `do`.

reactor-call (clj/s)

(reactor-call i)

Returns a task spawning a reactor context with given boot function, called without argument. A reactor context manages
the lifecycle of running flows and serializes their emissions in propagation turns. Flows running in a reactor context
are called publishers, they can be spawned in the boot function or in reaction to any subsequent emission with `stream!`
or `signal!`, respectively for discrete and continuous flows. Publishers of a given reactor context are totally ordered
by the union of two partial orders :
* if a publisher was created by another one, the parent is inferior to the child.
* if two publishers are siblings, the older is inferior to the younger.

A publisher can subscribe to the feed of an inferior publisher from the same reactor context. Using a publisher as a
flow spawns a subscription to this publisher. Publisher emissions are grouped in propagation turns, where successive
publishers of a given turn are strictly increasing. When a publisher becomes able to emit, it is compared to the
publisher currently emitting to figure out whether the emission must happen on the current turn or on the next one.
Cyclic reactions are possible and no attempt is made to prevent them, so care must be taken to ensure the propagation
eventually stops. The dispatching mechanism depends on the nature of the publisher, discrete or continuous.
* a stream is able to emit when its flow is ready to transfer and the backpressure of the previous emission has been
collected from all of its subscriptions. On emission, a value is transferred from the flow and subscriptions are
notified of the availability of this value. Until the end of the propagation turn, each new subscription to this stream
will be immediately notified.
* a signal always has a current value, thus each new subscription is immediately notified. It is able to emit when its
flow is ready to transfer. On emission, its current value is marked as stale and its subscriptions are notified if not
already. From this point, a sampling request from any of its subscriptions will trigger the transfer from the flow to
refresh the current value. Subsequent sampling requests will reuse this memoized value until next emission.

A subscription terminates when it's cancelled or when the underlying publisher terminates. A publisher can be cancelled,
as long as its flow is not terminated, by calling it as a zero-argument function. A cancelled publisher cancels its
flow, transfers and discards all of its remaining values without backpressure, and all of its current and future
subscriptions fail immediately. Cancelling a reactor cancels all of its publishers, and any subsequent publisher
spawning will fail. If any publisher flow fails, or if the boot function throws an exception, the reactor is cancelled.
A reactor terminates at the end of the first turn where every publisher flow is terminated, meaning no emission can
ever happen anymore. It succeeds with the result of the boot function if no publisher failed, otherwise it fails with
the first error encountered.

reduce (clj/s)

(reduce rf flow)
(reduce rf i flow)

Returns a task reducing values produced by given discrete `flow` with `rf`, starting with `init` (or, if not provided, the result of calling `rf` with no argument).

Cancelling propagates to upstream flow. Early termination by `rf` (via `reduced` or throwing) cancels upstream flow.

Example :
```clojure
(? (reduce + (seed (range 10))))
#_=> 45
```

reductions (clj/s)

(reductions rf f)
(reductions rf i f)

Returns a discrete flow running given discrete `flow` and emitting given `init` value (or, if not provided, the result of calling `rf` with no argument) followed by successive reductions (by rf) of upstream values with previously emitted value.

Cancelling propagates to upstream flow. Early termination by `rf` (via `reduced` or throwing) cancels upstream flow.

Example :
```clojure
(? (->> [1 2 3 4 5]
        (seed)
        (reductions +)
        (reduce conj)))
#_=> [0 1 3 6 10 15]
```

relieve (clj/s)

(relieve rf f)

Returns a continuous flow producing values emitted by given discrete `flow`, relieving backpressure. When upstream is faster than downstream, overflowed values are successively reduced with given function `rf`.

Cancelling propagates to upstream. If `rf` throws, upstream `flow` is cancelled.

Example :
```clojure
;; Delays each `input` value by `delay` milliseconds
(defn delay-each [delay input]
  (ap (? (sleep delay (?> input)))))

(? (->> (ap (let [n (?> (seed [24 79 67 34 18 9 99 37]))]
              (? (sleep n n))))
        (relieve +)
        (delay-each 80)
        (reduce conj)))
#_=> [24 79 67 61 99 37]
```

sample (clj/s)

(sample f sd sr)

Returns a discrete flow running given `sampler` discrete flow and `sampled` continuous flow in parallel. For each `sampler` value, emits the result of function `f` called with current values of `sampled` and `sampler`.

Cancellation propagates to both flows. When `sampler` terminates, `sampled` is cancelled. A failure in either flow, `f` throwing an exception, or an attempt to pull a value before the first value of `sampled` will cancel the flow and propagate the error.

Example :
```clojure
(defn sleep-emit [delays]
  (ap (let [n (?> (seed delays))]
        (? (sleep n n)))))

(defn delay-each [delay input]
  (ap (? (sleep delay (?> input)))))

(? (->> (sample vector
                (sleep-emit [24 79 67 34])
                (sleep-emit [86 12 37 93]))
        (delay-each 50)
        (reduce conj)))

#_=> [[24 86] [24 12] [79 37] [67 93]]
```

seed (clj/s)

(seed coll)

Returns a discrete flow producing values from given `collection`. Cancelling before having reached the end makes the flow fail immediately.

sem (clj/s)

(sem)
(sem n)

Creates a semaphore initialized with n tokens (1 if not provided, aka mutex).

A semaphore is a function implementing `release` on 0-arity and `acquire` on 2-arity (as task). `release` immediately makes a token available and returns nil. `acquire` is a task completing with nil as soon as a token is available.

Cancelling an `acquire` task makes it fail immediately.

Example : dining philosophers
```clojure
(defn phil [name f1 f2]
  (sp
    (while true
      (prn name :thinking)
      (? (sleep 500))
      (holding f1
        (holding f2
          (prn name :eating)
          (? (sleep 600)))))))

(def forks (vec (repeatedly 5 sem)))

(? (timeout 10000
     (join vector
       (phil "descartes" (forks 0) (forks 1))
       (phil "hume"      (forks 1) (forks 2))
       (phil "plato"     (forks 2) (forks 3))
       (phil "nietzsche" (forks 3) (forks 4))
       (phil "kant"      (forks 0) (forks 4)))))
```
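
The mutex case can also be sketched with a shared counter: each worker reads the counter, parks, then writes it back, so the final value is only correct if the critical section is exclusive. The `m` alias and the names `lock`, `counter` and `worker` are hypothetical, chosen for this illustration; a JVM REPL is assumed.

```clojure
(require '[missionary.core :as m])

(def lock (m/sem))        ;; one token, i.e. a mutex
(def counter (atom 0))

(defn worker []
  (m/sp
    (m/holding lock
      (let [n @counter]
        (m/? (m/sleep 10))            ;; park while holding the token
        (reset! counter (inc n))))))

;; Run five workers concurrently and wait for all of them.
(m/? (apply m/join vector (repeatedly 5 worker)))

@counter
#_=> 5
```

Outside of `holding`, the same semaphore can be driven by hand: `(m/? lock)` runs the acquire task and `(lock)` releases a token.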

signal! clj/s

(signal! f)

Spawns a continuous publisher from given flow, see `reactor-call`.

sleep clj/s

(sleep d)
(sleep d x)

Returns a task completing with given value (nil if not provided) after given duration (in milliseconds).

Cancelling a sleep task makes it fail immediately.

Example :
```clojure
(? (sleep 1000 42))
#_=> 42               ;; 1 second later
```
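
To illustrate cancellation, a task can be started directly by calling it with a success callback and a failure callback; the call returns a function that cancels the task. This sketch assumes that calling convention and a JVM REPL. The `outcome` promise, the `cancel!` var and the `:ok` / `:failed` tags are illustrative names.

```clojure
(require '[missionary.core :as m])

(def outcome (promise))

;; Start a 10-second sleep; the returned function cancels it.
(def cancel!
  ((m/sleep 10000 :done)
   (fn [v] (deliver outcome [:ok v]))
   (fn [e] (deliver outcome [:failed e]))))

(cancel!)

;; The failure callback fires without waiting for the full delay.
(first (deref outcome 1000 [:pending]))
#_=> :failed
```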

sp clj/s macro

(sp & body)

Returns a task evaluating `body` (in an implicit `do`). Body evaluation can be parked with `?`.

Cancelling an `sp` task triggers cancellation of the task it's currently running, along with all tasks subsequently run.
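
A minimal sketch of sequential composition with `sp`, assuming the `m` alias for `missionary.core` and a JVM REPL; `sum-task` is a hypothetical name.

```clojure
(require '[missionary.core :as m])

(def sum-task
  (m/sp
    (let [a (m/? (m/sleep 100 1))    ;; parks until the first sleep completes
          b (m/? (m/sleep 100 2))]   ;; started only after the first one is done
      (+ a b))))

(m/? sum-task)
#_=> 3               ;; roughly 200 milliseconds later
```

Nothing runs when `sum-task` is defined; the body is evaluated each time the task is run.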

stream! clj/s

(stream! f)

Spawns a discrete publisher from given flow, see `reactor-call`.

subscribe clj/s

(subscribe pub)

Returns a discrete flow subscribing to given `org.reactivestreams.Publisher`.

timeout clj/s

(timeout delay task)

Returns a task running given `task` and cancelling it if not completed within given `delay` (in milliseconds).

```clojure
(? (timeout 100 (sleep (rand-int 200))))
#_=> nil       ;; or exception, after 100 milliseconds
```
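
A deterministic variant can be sketched with a small wrapper that turns the timeout failure into a value. `run-with-deadline` is a hypothetical helper, not part of the library; it uses the `(timeout delay task)` argument order documented here and assumes, as the comment in the example above suggests, that a timed-out task fails with an exception. `Throwable` is caught only because the exact exception type is not stated.

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper: run a task under a deadline, mapping failure to a keyword.
(defn run-with-deadline [ms task]
  (try (m/? (m/timeout ms task))
       (catch Throwable _ :timed-out)))

(run-with-deadline 500 (m/sleep 50 :fast))
#_=> :fast

(run-with-deadline 50 (m/sleep 500 :slow))
#_=> :timed-out
```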

transform clj/s deprecated

Alias for `eduction`

via clj/s macro

(via exec & body)

Returns a task evaluating body (in an implicit `do`) on given `java.util.concurrent.Executor` and completing with its result.

Cancellation interrupts the evaluating thread.

Not supported on clojurescript.
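
As a hedged sketch (JVM only), blocking work can be moved off the calling process by wrapping it in `via`. The example assumes the `m` alias and the `blk` executor used in the `via-call` example; `slow-answer` is a hypothetical name and `Thread/sleep` stands in for real blocking I/O.

```clojure
(require '[missionary.core :as m])

;; The body is evaluated on a thread of the given executor.
(def slow-answer
  (m/via m/blk
    (Thread/sleep 100)   ;; stands in for a blocking call
    42))

(m/? slow-answer)
#_=> 42
```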

via-call clj/s

(via-call e t)

Same as `via`, except the expression to evaluate is provided as a zero-arity function in the second argument.

Not supported on clojurescript.

```clojure
(? (via-call blk read-line))
;; reads a line from stdin and returns it
```

watch clj/s

(watch r)

Returns a continuous flow producing successive values of given `reference` until cancelled. Given reference must support `add-watch`, `remove-watch` and `deref`. Oldest values are discarded on overflow.
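
A minimal sketch of reading the current state of an atom through `watch`, assuming the `m` alias, a JVM REPL, and that `eduction` accepts a standard transducer such as `(take 1)`. The names `!state` and `first-seen` are hypothetical.

```clojure
(require '[missionary.core :as m])

(def !state (atom 0))

;; Keep only the first value, which is the reference's current state,
;; then let eduction cancel the watch.
(def first-seen
  (m/? (m/reduce conj (m/eduction (take 1) (m/watch !state)))))

first-seen
#_=> [0]
```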

zip clj/s

(zip c f & fs)

Returns a discrete flow running given discrete `flows` in parallel and emitting the result of applying `c` to the set of first values emitted by each upstream flow, followed by the result of applying `c` to the set of second values and so on, until any upstream flow terminates, at which point the flow will be cancelled.

Cancelling propagates to every upstream flow. If any upstream flow fails or if `c` throws, the flow is cancelled.

Example :
```clojure
(m/? (->> (m/zip vector
                 (m/seed [1 2 3])
                 (m/seed [:a :b :c]))
          (m/reduce conj)))
#_=> [[1 :a] [2 :b] [3 :c]]
```