This tutorial will help you familiarize with the
flow abstraction. A
flow is a value representing a process able to produce an arbitrary number of values before terminating. Like
tasks, they're asynchronous under the hood and also support failure and graceful shutdown.
Setup a dependency on the latest
missionary release in your favorite environment and fire up a clojure REPL. Require
missionary's single namespace
missionary.core, which will be aliased to
m in the following.
(require '[missionary.core :as m])
You can build a flow from an arbitrary collection with
seed, and you can reduce
flows like collections with
reduce, turning it into a
;; A flow producing the 10 first integers
(def input (m/seed (range 10)))
;; A task producing the sum of the 10 first integers
(def sum (m/reduce + input))
eduction passes a flow through a transducer.
(m/? (m/reduce conj (m/eduction (partition-all 4) input)))
#_=> [[0 1 2 3] [4 5 6 7] [8 9]]
Not very interesting so far, because we didn't perform any action yet. Let's introduce the
ap is to
sp is to
sp, it can be parked with
?, but it has an additional superpower : it can be forked.
(println (m/?> (m/seed ["Hello" "World" "!"])))
(m/? (m/sleep 1000))))
(m/? (m/reduce conj hello-world))
#_=> [nil nil nil]
?> operator pulls the first seeded value, forks evaluation and moves on until end of body, producing result
nil, then backtracks evaluation to the fork point, pulls another value, forks evaluation again, and so on until enumeration is exhausted. Meanwhile,
reduce consolidates each result into a vector. In an
ap block, expressions have more than one possible value, that's why they're called ambiguous process.
In the previous example, pulling a value from the flow passed to
?> transfers evaluation control to the forked process, and waited evaluation to be completed before pulling another value from the flow. In some cases though, we want the flow to keep priority over the forked process, so it can be shutdowned when more values become available. That kind of forking is implemented by
We can use it to implement debounce operators. A debounced flow is a flow emitting only values that are not followed by another one within a given delay.
(defn debounce [delay flow]
(m/ap (let [x (m/?< flow)] ;; pull a value preemptively
(try (m/? (m/sleep delay x)) ;; emit this value after given delay
(catch Cancelled _ (m/amb>)))))) ;; emit nothing if cancelled
To test it, we need a flow of values emitting at various intervals.
(defn clock [intervals]
(m/ap (let [i (m/?> (m/seed intervals))]
(m/? (m/sleep i i)))))
(m/? (->> (clock [24 79 67 34 18 9 99 37])
#_=> [24 79 9 37]
What if we want to fork the processes concurrently? Enter the
?= operator. It forks evaluation for all values concurrently. Values are returned from the flow in the order they finish, which is not necessarily the initial order.
(m/? (m/reduce conj (m/ap (let [ms (m/?= (m/seed [300 100 400 200]))]
(m/? (m/sleep ms ms))))))
#_=> [100 200 300 400]