Hello flow

This tutorial will help you familiarize yourself with the flow abstraction. A flow is a value representing a process able to produce an arbitrary number of values before terminating. Like tasks, flows are asynchronous under the hood and support failure and graceful shutdown.

Basic operations

Set up a dependency on the latest missionary release in your favorite environment and fire up a Clojure REPL. Require missionary's single namespace, missionary.core, which is aliased to m in the rest of this tutorial.

(require '[missionary.core :as m])

You can build a flow from an arbitrary collection with enumerate, and you can reduce a flow like a collection with aggregate, which turns it into a task.

;; A flow producing the first 10 integers (0 through 9)
(def input (m/enumerate (range 10)))

;; A task producing the sum of the first 10 integers (0 through 9)
(def sum (m/aggregate + input))

(m/? sum)
#_=> 45
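
aggregate accepts any reducing function, not only +. As a minimal sketch, passing conj collects every value produced by the flow into a vector, which is a convenient way to inspect a flow at the REPL. The name collected is only illustrative.

```clojure
(require '[missionary.core :as m])

;; A task collecting every value of the flow into a vector
(def collected (m/aggregate conj (m/enumerate (range 10))))

(m/? collected)
#_=> [0 1 2 3 4 5 6 7 8 9]
```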

transform passes a flow through a transducer.

(m/? (m/aggregate conj (m/transform (partition-all 4) input)))
#_=> [[0 1 2 3] [4 5 6 7] [8 9]]
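
Since transform takes an ordinary transducer, transducers composed with comp should work the same way. The sketch below keeps the odd integers and increments them; xf is an illustrative name, not part of missionary.

```clojure
(require '[missionary.core :as m])

;; Keep the odd integers, then increment each of them
(def xf (comp (filter odd?) (map inc)))

(m/? (m/aggregate conj (m/transform xf (m/enumerate (range 10)))))
#_=> [2 4 6 8 10]
```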

Ambiguous evaluation

This is not very interesting so far, because we haven't performed any action yet. Let's introduce the ap macro. ap is to flows what sp is to tasks. Like sp, it can be parked with ?, but it has an additional superpower: it can be forked.

(def hello-world
  (m/ap
    (println (m/?? (m/enumerate ["Hello" "World" "!"])))
    (m/? (m/sleep 1000))))

(m/? (m/aggregate conj hello-world))
Hello
World
!
#_=> [nil nil nil]

The ?? operator pulled the first enumerated value, forked evaluation and moved on to the end of the body, producing the result nil. Evaluation then backtracked to the fork point, pulled another value, forked again, and so on until the enumeration was exhausted. Meanwhile, aggregate collected each result into a vector. In an ap block, an expression can have more than one possible value, which is why the block is called an ambiguous process.
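
To make the backtracking more concrete, here is a small sketch with two fork points and no side effects. Assuming forks are explored in sequence as described above, the inner enumeration is exhausted for each value of the outer one, so the block produces every pair in order. The name pairs is only illustrative.

```clojure
(require '[missionary.core :as m])

(def pairs
  (m/ap
    (let [x (m/?? (m/enumerate [1 2]))       ;; first fork point
          y (m/?? (m/enumerate [:a :b]))]    ;; second fork point, re-run for each x
      [x y])))

(m/? (m/aggregate conj pairs))
#_=> [[1 :a] [1 :b] [2 :a] [2 :b]]
```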

Preemptive forking

In the previous example, pulling a value from the flow passed to ?? transferred control to the forked process and waited for its evaluation to complete before pulling another value from the flow. In some cases, though, we want the flow to keep priority over the forked process, so that the forked process can be shut down when more values become available. That kind of forking is implemented by ?!.

We can use it to implement a debounce operator. A debounced flow emits only the values that are not followed by another value within a given delay.

(defn debounce [delay flow]
  (m/ap (let [x (m/?! flow)]                          ;; pull a value preemptively
          (try (m/? (m/sleep delay x))                ;; emit this value after given delay
               (catch Exception _ (m/?? m/none))))))  ;; emit nothing if cancelled

To test it, we need a flow that emits values at various intervals.

(defn clock [intervals]
  (m/ap (let [i (m/?? (m/enumerate intervals))]
          (m/? (m/sleep i i)))))

(m/? (->> (clock [24 79 67 34 18 9 99 37])
          (debounce 50)
          (m/aggregate conj)))
#_=> [24 79 9 37]
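
The expected result can be checked by hand. Because each value emitted by clock equals the interval waited before it, a value survives debouncing only if the next value is greater than the delay, or if it is the last one. The following sketch is a pure model of that reasoning. It does not use missionary, and expected-debounce is a hypothetical helper introduced for illustration. It ignores the edge case where a gap equals the delay exactly.

```clojure
(defn expected-debounce
  "Pure model of debounce for a clock-style input, where each value is
  also the time gap separating it from the previous value."
  [delay intervals]
  (->> (partition-all 2 1 intervals)           ;; pair each value with its successor
       (keep (fn [[x nxt]]
               ;; keep x if it is the last value, or if the next value
               ;; arrives after the delay has elapsed
               (when (or (nil? nxt) (> nxt delay)) x)))
       vec))

(expected-debounce 50 [24 79 67 34 18 9 99 37])
#_=> [24 79 9 37]
```

Here 67, 34, 18 and 99 are dropped because the value following each of them arrives less than 50 ms later.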
