missionary.core


! clj/s

(!)

Throws an exception if current computation is required to terminate, otherwise returns nil.

Inside a process block, checks process cancellation status.

Outside of process blocks, falls back to thread interruption status if the host platform supports it.
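A minimal sketch of a cooperative cancellation check, assuming the namespace is aliased as `m` and a JVM host. `count-up` is a hypothetical helper, not part of the library: it runs a loop on the `cpu` executor through `via` and polls `!` on every iteration, so an interruption of the evaluating thread would abort the loop.

```clojure
(require '[missionary.core :as m])

;; Hypothetical CPU-bound loop, run on the `cpu` executor.
(defn count-up [n]
  (m/via m/cpu
    (loop [i 0]
      (m/!)                       ; throws if the evaluating thread was interrupted
      (if (< i n)
        (recur (inc i))
        i))))

(m/? (count-up 1000))
#_=> 1000
```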

? clj/s

(? t)

Runs given task, waits for completion and returns result (or throws, if task fails).

Inside a process block, parks the process.

Outside of process blocks, falls back to thread blocking if the host platform supports it.

?! clj/s

(?! f)

In an ambiguous process block, runs given `flow` preemptively (aka switch), forking process for each emitted value. Forked process is cancelled if `flow` emits another value before it terminates.

Example :
```clojure
(defn debounce [delay flow]
  (ap (let [x (?! flow)]
        (try (? (sleep delay x))
             (catch Exception _ (?? none))))))

(? (->> (ap (let [n (?? (enumerate [24 79 67 34 18 9 99 37]))]
              (? (sleep n n))))
        (debounce 50)
        (aggregate conj)))
#_=> [24 79 9 37]
```

?? clj/s

(?? f)

In an ambiguous process block, runs given `flow` non-preemptively (aka concat), forking process for each emitted value.

Example :
```clojure
(? (aggregate conj (ap (inc (?? (enumerate [1 2 3]))))))
#_=> [2 3 4]
```

absolve clj/s

(absolve task)

Returns a task that runs the given `task`, which must complete with a zero-argument function, and completes with the result of calling that function.

aggregate clj/s

(aggregate rf flow)
(aggregate rf i flow)

Returns a task reducing values produced by given discrete `flow` with `rf`, starting with `init` (or, if not provided, the result of calling `rf` with no argument).

Cancelling propagates to upstream flow. Early termination by `rf` (via `reduced` or throwing) cancels upstream flow.

Example :
```clojure
(? (aggregate + (enumerate (range 10))))
#_=> 45
```
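The three-argument arity and early termination can be sketched as follows, assuming the namespace is aliased as `m`. The reducing function is purely illustrative: it stops at the first value greater than 3 by returning a `reduced` accumulator, which is the point where the upstream flow gets cancelled.

```clojure
(require '[missionary.core :as m])

(def first-small-values
  (m/aggregate
    (fn [acc x]
      (if (> x 3)
        (reduced acc)             ; early termination, upstream is cancelled
        (conj acc x)))
    []                            ; explicit init value
    (m/enumerate (range 10))))

(m/? first-small-values)
#_=> [0 1 2 3]
```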

ap clj/s macro

(ap & body)

Returns a flow evaluating `body` (in an implicit `do`) and producing values of each subsequent fork. Body evaluation can be parked with `?` and forked with `??` and `?!`.

Cancelling an `ap` flow triggers cancellation of the task/flow it's currently running, along with all tasks/flows subsequently run.
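As a small illustration of forking and parking in the same block, assuming the namespace is aliased as `m`: the body below forks once per input value with `??`, then parks on a short `sleep` with `?` before producing the square of that value. The name `squares` is hypothetical.

```clojure
(require '[missionary.core :as m])

(def squares
  (m/ap (let [x (m/?? (m/enumerate [1 2 3]))]   ; fork for each value
          (m/? (m/sleep 10 (* x x))))))         ; park, then produce x squared

(m/? (m/aggregate conj squares))
#_=> [1 4 9]
```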

attempt clj/s

(attempt task)

Returns a task always succeeding with the result of given `task` wrapped in a zero-argument function, which returns the result if `task` succeeded or throws the exception if it failed.
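A short sketch of how `attempt` and `absolve` relate, assuming the namespace is aliased as `m`. The names `failing` and `thunk` are hypothetical: `attempt` turns a failure into a successful result holding a function that rethrows, and `absolve` performs the reverse step by calling the function a task completed with.

```clojure
(require '[missionary.core :as m])

;; A task that always fails.
(def failing (m/sp (throw (ex-info "boom" {}))))

;; The attempt succeeds; the failure is deferred to the call of `thunk`.
(def thunk (m/? (m/attempt failing)))

(try (thunk)
     (catch clojure.lang.ExceptionInfo e (.getMessage e)))
#_=> "boom"

;; absolve undoes attempt.
(m/? (m/absolve (m/attempt (m/sp :ok))))
#_=> :ok
```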

blk clj/s

A `java.util.concurrent.Executor` optimized for blocking evaluation.

buffer clj/s

(buffer c f)

Returns a discrete flow producing values emitted by given discrete `flow`, accumulating upstream overflow up to `capacity` items.

compel clj/s

(compel task)

Inhibits cancellation signal of given `task`.

cpu clj/s

A `java.util.concurrent.Executor` optimized for non-blocking evaluation.

dfv clj/s

(dfv)

Creates an instance of dataflow variable (aka single-assignment).

A dataflow variable is a function implementing `assign` on 1-arity and `deref` on 2-arity (as task). `assign` immediately binds the variable to given value if not already bound and returns bound value. `deref` is a task completing with the value bound to the variable as soon as it's available.

Cancelling a `deref` task makes it fail immediately.
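The single-assignment behaviour described above can be sketched as follows, assuming the namespace is aliased as `m`. The name `v` is hypothetical; the second assignment is ignored because the variable is already bound.

```clojure
(require '[missionary.core :as m])

(def v (m/dfv))

(v 1)       ; assign: binds the variable and returns the bound value
#_=> 1
(v 2)       ; already bound: the first value is kept and returned
#_=> 1
(m/? v)     ; deref: the variable itself is the task
#_=> 1
```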

enumerate clj/s

(enumerate coll)

Returns a discrete flow producing values from given `collection`. Cancelling before having reached the end makes the flow fail immediately.

gather clj/s

(gather)
(gather f)
(gather f & fs)

Returns a discrete flow running given discrete `flows` in parallel and emitting upstream values unchanged, as soon as they're available, until every upstream flow is terminated.

Cancelling propagates to every upstream flow. If any upstream flow fails, the flow is cancelled.

Example :
```clojure
(? (->> (gather (enumerate [1 2 3])
                (enumerate [:a :b :c]))
        (aggregate conj)))
#_=> [1 :a 2 :b 3 :c]
```

holding clj/s macro

(holding lock & body)

Acquires the given semaphore `lock` and evaluates `body` (in an implicit `do`), ensuring the semaphore is released after evaluation.
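A minimal usage sketch, assuming the namespace is aliased as `m`. `lock` and `guarded` are hypothetical names: two tasks compete for the same single-token semaphore, and both complete because the token is released when each body finishes.

```clojure
(require '[missionary.core :as m])

(def lock (m/sem))                ; one token, i.e. a mutex

(defn guarded [x]                 ; hypothetical helper
  (m/sp (m/holding lock
          (m/? (m/sleep 10))      ; critical section
          x)))

(m/? (m/join vector (guarded :a) (guarded :b)))
#_=> [:a :b]
```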

integrate clj/s

(integrate rf f)
(integrate rf i f)

Returns a discrete flow running given discrete `flow` and emitting given `init` value (or, if not provided, the result of calling `rf` with no argument) followed by successive reductions (by `rf`) of upstream values with previously emitted value.

Cancelling propagates to upstream flow. Early termination by `rf` (via `reduced` or throwing) cancels upstream flow.

Example :
```clojure
(? (->> [1 2 3 4 5]
        (enumerate)
        (integrate +)
        (aggregate conj)))
#_=> [0 1 3 6 10 15]
```

join clj/s

(join c)
(join c & ts)

Returns a task running given `tasks` concurrently.

If every task succeeds, `join` completes with the result of applying `f` to these results.

If any task fails, others are cancelled then `join` fails with this error.

Cancelling propagates to children tasks.

Example :
```clojure
(? (join vector (sleep 1000 1) (sleep 1000 2)))
#_=> [1 2]            ;; 1 second later
```

latest clj/s

(latest f)
(latest f & fs)

Returns a continuous flow running given continuous `flows` in parallel and combining latest value of each flow with given function `f`.

```clojure
(defn sleep-emit [delays]
  (ap (let [n (?? (enumerate delays))]
        (? (sleep n n)))))

(defn delay-each [delay input]
  (ap (? (sleep delay (?? input)))))

(? (->> (latest vector
                (sleep-emit [24 79 67 34])
                (sleep-emit [86 12 37 93]))
        (delay-each 50)
        (aggregate conj)))

#_=> [[24 86] [24 12] [79 37] [67 37] [34 93]]
```

mbx clj/s

(mbx)

Creates an instance of mailbox.

A mailbox is a function implementing `post` on 1-arity and `fetch` on 2-arity (as task). `post` immediately pushes given value to mailbox and returns nil. `fetch` is a task pulling a value from mailbox as soon as it's non-empty and completing with this value.

Cancelling a `fetch` task makes it fail immediately.

Example : an actor is a mailbox associated with a process consuming messages.
```clojure
(defn crash [^Throwable e]                                ;; let it crash philosophy
  (.printStackTrace e)
  (System/exit -1))

(defn actor
  ([init] (actor init crash))
  ([init fail]
   (let [self (mbx)]
     ((sp
        (loop [b init]
          (recur (b self (? self)))))
       nil fail)
     self)))

(def counter
  (actor
    ((fn beh [n]
       (fn [self cust]
         (cust n)
         (beh (inc n)))) 0)))

(counter prn)                                             ;; prints 0
(counter prn)                                             ;; prints 1
(counter prn)                                             ;; prints 2
```

never clj/s

none clj/s

The empty flow. Doesn't produce any value and terminates immediately. Cancelling has no effect.

Example :
```clojure
(? (aggregate conj none))
#_=> []
```

observe clj/s

(observe s)

Returns a discrete flow observing values produced by a non-backpressured subject until cancelled. `subject` must be a function taking a 1-arity `event` function and returning a 0-arity `cleanup` function.

The `subject` function is called on initialization. The `cleanup` function is called on cancellation. The `event` function may be called at any time; it throws an exception on overflow and becomes a no-op after cancellation.

publisher clj/s

(publisher flow)

Returns an `org.reactivestreams.Publisher` running given discrete `flow` on each subscription.

race clj/s

(race)
(race & ts)

Returns a task running given `tasks` concurrently.

If any task succeeds, others are cancelled then `race` completes with this result.

If every task fails, `race` fails.

Cancelling propagates to children tasks.

Example :
```clojure
(? (race (sleep 1000 1) (sleep 2000 2)))
#_=> 1                 ;; 1 second later
```

rdv clj/s

(rdv)

Creates an instance of synchronous rendez-vous.

A synchronous rendez-vous is a function implementing `give` on its 1-arity and `take` on its 2-arity (as task). `give` takes a value to be transferred and returns a task completing with nil as soon as a taker is available. `take` is a task completing with transferred value as soon as a giver is available.

Cancelling `give` and `take` tasks makes them fail immediately.

Example : producer / consumer stream communication
```clojure
(defn reducer [rf i take]
  (sp
    (loop [r i]
      (let [x (? take)]
        (if (identical? x take)
          r (recur (rf r x)))))))

(defn iterator [give xs]
  (sp
    (loop [xs (seq xs)]
      (if-some [[x & xs] xs]
        (do (? (give x))
            (recur xs))
        (? (give give))))))

(def stream (rdv))

(? (join {} (iterator stream (range 100)) (reducer + 0 stream)))      ;; returns 4950
```

relieve clj/s

(relieve rf f)

Returns a continuous flow producing values emitted by given discrete `flow`, relieving backpressure. When upstream is faster than downstream, overflowed values are successively reduced with given function `rf`.

Cancelling propagates to upstream. If `rf` throws, upstream `flow` is cancelled.

Example :
```clojure
;; Delays each `input` value by `delay` milliseconds
(defn delay-each [delay input]
  (ap (? (sleep delay (?? input)))))

(? (->> (ap (let [n (?? (enumerate [24 79 67 34 18 9 99 37]))]
              (? (sleep n n))))
        (relieve +)
        (delay-each 80)
        (aggregate conj)))
#_=> [24 79 67 61 99 37]
```

sample clj/s

(sample f sd sr)

Returns a discrete flow running given `sampler` discrete flow and `sampled` continuous flow in parallel. For each `sampler` value, emits the result of function `f` called with current values of `sampled` and `sampler`.

Cancellation propagates to both flows. When `sampler` terminates, `sampled` is cancelled. A failure in either flow, `f` throwing an exception, or an attempt to pull a value before the first value of `sampled` is available will cancel the flow and propagate the error.

Example :
```clojure
(defn sleep-emit [delays]
  (ap (let [n (?? (enumerate delays))]
        (? (sleep n n)))))

(defn delay-each [delay input]
  (ap (? (sleep delay (?? input)))))

(? (->> (sample vector
                (sleep-emit [24 79 67 34])
                (sleep-emit [86 12 37 93]))
        (delay-each 50)
        (aggregate conj)))

#_=> [[24 86] [24 12] [79 37] [67 93]]
```

sem clj/s

(sem)
(sem n)

Creates a semaphore initialized with `n` tokens (1 if not provided, aka mutex).

A semaphore is a function implementing `release` on 0-arity and `acquire` on 2-arity (as task). `release` immediately makes a token available and returns nil. `acquire` is a task completing with nil as soon as a token is available.

Cancelling an `acquire` task makes it fail immediately.

Example : dining philosophers
```clojure
(defn phil [name f1 f2]
  (sp
    (while true
      (prn name :thinking)
      (? (sleep 500))
      (holding f1
        (holding f2
          (prn name :eating)
          (? (sleep 600)))))))

(def forks (vec (repeatedly 5 sem)))

(? (timeout 10000
     (join vector
       (phil "descartes" (forks 0) (forks 1))
       (phil "hume"      (forks 1) (forks 2))
       (phil "plato"     (forks 2) (forks 3))
       (phil "nietzsche" (forks 3) (forks 4))
       (phil "kant"      (forks 0) (forks 4)))))
```

sleep clj/s

(sleep d)
(sleep d x)

Returns a task completing with given value (nil if not provided) after given duration (in milliseconds).

Cancelling a sleep task makes it fail immediately.

Example :
```clojure
(? (sleep 1000 42))
#_=> 42               ;; 1 second later
```

sp clj/s macro

(sp & body)

Returns a task evaluating `body` (in an implicit `do`). Body evaluation can be parked with `?`.

Cancelling an `sp` task triggers cancellation of the task it's currently running, along with all tasks subsequently run.
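A short sketch of sequential composition, assuming the namespace is aliased as `m`. The name `sum-task` is hypothetical: the body parks twice with `?`, once per `sleep`, and completes with the sum of both results.

```clojure
(require '[missionary.core :as m])

(def sum-task
  (m/sp (let [a (m/? (m/sleep 100 1))    ; parks for about 100 ms
              b (m/? (m/sleep 100 2))]   ; then parks again
          (+ a b))))

(m/? sum-task)
#_=> 3                ;; about 200 milliseconds later
```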

subscribe clj/s

(subscribe pub)

Returns a discrete flow subscribing to given `org.reactivestreams.Publisher`.

timeout clj/s

(timeout delay task)

Returns a task running given `task` and cancelling it if not completed within given `delay` (in milliseconds).

```clojure
(? (timeout 100 (sleep (rand-int 200))))
#_=> nil       ;; or exception, after 100 milliseconds
```

transform clj/s

(transform x f)

Returns a discrete flow running given discrete `flow` and transforming values with given transducer `xf`.

Cancelling propagates to upstream flow. Early termination by `xf` (via `reduced` or throwing) cancels upstream flow.

Example :
```clojure
(? (->> (enumerate (range 10))
        (transform (comp (filter odd?) (mapcat range) (partition-all 4)))
        (aggregate conj)))
#_=> [[0 0 1 2] [0 1 2 3] [4 0 1 2] [3 4 5 6] [0 1 2 3] [4 5 6 7] [8]]
```

via clj/s macro

(via exec & body)

Returns a task evaluating `body` (in an implicit `do`) on given `java.util.concurrent.Executor` and completing with its result.

Cancellation interrupts the evaluating thread.

Not supported on ClojureScript.
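A minimal usage sketch on the JVM, assuming the namespace is aliased as `m`. The name `blocking-sum` is hypothetical, and `Thread/sleep` stands in for a real blocking call, which is why the `blk` executor is chosen here.

```clojure
(require '[missionary.core :as m])

(def blocking-sum
  (m/via m/blk
    (Thread/sleep 100)            ; blocking call, stands in for real I/O
    (+ 1 2)))

(m/? blocking-sum)
#_=> 3
```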

via-call clj/s

(via-call e t)

Same as `via`, except the expression to evaluate is provided as a zero-arity function on second argument.

Not supported on ClojureScript.

```clojure
(? (via-call blk read-line))
;; reads a line from stdin and returns it
```

watch clj/s

(watch r)

Returns a continuous flow producing successive values of given `reference` until cancelled. Given reference must support `add-watch`, `remove-watch` and `deref`. Oldest values are discarded on overflow.

zip clj/s

(zip c f & fs)

Returns a discrete flow running given discrete `flows` in parallel and emitting the result of applying `f` to the set of first values emitted by each upstream flow, followed by the result of applying `f` to the set of second values and so on, until any upstream flow terminates, at which point the flow will be cancelled.

Cancelling propagates to every upstream flow. If any upstream flow fails or if `f` throws, the flow is cancelled.

Example :
```clojure
(m/? (->> (m/zip vector
                 (m/enumerate [1 2 3])
                 (m/enumerate [:a :b :c]))
          (m/aggregate conj)))
#_=> [[1 :a] [2 :b] [3 :c]]
```