
Transducers revisited

In this guide, we'll reimplement clj-conduit using cloroutine. If you haven't already, I strongly recommend looking at clj-conduit and its associated post before going further. You will also need a basic understanding of the anatomy of a transducer.
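As a refresher on that anatomy, and as a point of comparison for the conduit versions below, here is a sketch of a stateful transducer written by hand in plain Clojure. The name `interposing-by-hand` is hypothetical. It behaves like `clojure.core/interpose` used as a transducer. Note the explicit mutable state and the manual early-termination check between the two downstream calls.

```clojure
;; Hypothetical hand-written transducer, equivalent to (interpose sep):
;; state lives in an explicit volatile, and early termination is checked by hand.
(defn interposing-by-hand [sep]
  (fn [down]                           ; down: the downstream reducing function
    (let [started? (volatile! false)]  ; true once the first item has gone through
      (fn
        ([] (down))                    ; init arity: delegate downstream
        ([acc] (down acc))             ; completion arity: nothing buffered, delegate
        ([acc x]                       ; step arity: called once per input
         (if @started?
           (let [acc (down acc sep)]
             (if (reduced? acc)        ; downstream stopped right after the separator
               acc
               (down acc x)))
           (do (vreset! started? true)
               (down acc x))))))))

(into [] (interposing-by-hand :sep) [1 2 3]) ;; => [1 :sep 2 :sep 3]
```

The `interposing` conduit further down expresses the same logic without a flag, because the process's position in its own code already records whether the first item has been seen.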

The idea behind conduit is that custom transducers would be much easier to write with a dedicated syntax that emulates waiting on input. Basically, we want to define a transducer as a block of code in which special operators produce (yield) and consume (await) values in an imperative style.

(require '[cloroutine.core :refer [cr]])
(ns-unmap *ns* 'await) ; await is redefined below, drop the clojure.core/await referral

First, let's define some dynamic vars to hold the thread-local context available while a conduit block is being evaluated.

  • *acc* is the accumulator of the reducing process.
  • *down* is the downstream reducing function.
  • *input* is the upstream value currently being processed.
(def ^:dynamic *acc*)
(def ^:dynamic *down*)
(def ^:dynamic *input*)
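Because these vars have no root binding, `set!` only works on them inside a `binding` form on the current thread. This is why `conduit-xf` below sets up a binding around each step before resuming the process. Here is a minimal sketch of that rule. It uses a hypothetical `*demo-acc*` var so that `*acc*` stays untouched.

```clojure
(def ^:dynamic *demo-acc*)   ; hypothetical var, unbound at the root like *acc*

;; Inside binding, set! replaces the thread-local value.
(defn bump-twice [init]
  (binding [*demo-acc* init]
    (set! *demo-acc* (inc *demo-acc*))
    (set! *demo-acc* (inc *demo-acc*))
    *demo-acc*))

;; Outside any binding, set! throws IllegalStateException.
(defn set-unbound! []
  (try (set! *demo-acc* 1)
       :no-error
       (catch IllegalStateException _ :illegal-state)))

(bump-twice 0)  ;; => 2
(set-unbound!)  ;; => :illegal-state
```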

We can now define the functions in charge of I/O in conduits.

  • yielding an output value is a synchronous operation: all we have to do is reduce the accumulator with the given value and check for early termination. The function returns true while the reduction can go on, and false once downstream has terminated.
  • awaiting an input is trickier, because it requires suspending the process to hand control back to the transducing context. The function takes as its argument a sentinel value to return on end-of-stream, for processes that need a finalization step. Otherwise, it can be called with zero arguments, and the process will only be resumed when an input is available.
  • input is the resume function associated with await. Its sole job is to retrieve the input value from the context.
(defn yield [x]
  (not (reduced? (set! *acc* (*down* *acc* x)))))

(defn await
  ([] (await ::done))
  ([eos] eos))

(defn input []
  *input*)
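`yield` relies on the standard convention that a reducing function signals early termination by returning a value wrapped with `reduced`. The sketch below performs the same check with explicit arguments instead of dynamic vars. `step-down` and `yield-twice` are hypothetical helpers, and `(take 2)` stands in for a downstream transducer that stops after two items.

```clojure
;; Hypothetical helper doing what yield does, minus the dynamic vars:
;; feed x downstream and report whether the reduction may go on.
(defn step-down [down acc x]
  (let [acc (down acc x)]
    [acc (not (reduced? acc))]))

;; Hypothetical driver: yield two values into a downstream that accepts only two.
(defn yield-twice []
  (let [down         ((take 2) conj)          ; stateful downstream reducing fn
        [acc1 more1] (step-down down [] :a)
        [acc2 more2] (step-down down acc1 :b)]
    [acc1 more1 (unreduced acc2) more2]))

(yield-twice) ;; => [[:a] true [:a :b] false]
```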

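Now, let's define the function conduit-xf, which builds the transducer itself. The conduit process is wrapped in a coroutine, which is itself wrapped in a constructor function. The coroutine is instantiated when the transducing context is initialized, along with a mutable box that keeps track of the end-of-stream sentinel between two successive steps. Each time the coroutine suspends on await, it returns the sentinel passed to await. On completion, if that sentinel isn't ::done, it is fed to the process as a final input so the process can run its finalization step.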

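(defn conduit-xf [ctor]
  (fn [down]
    (let [cor (ctor)
          ;; run the process up to its first await and remember its sentinel
          eos (volatile! (cor))]
      (fn rf
        ([]
         (down))
        ([acc]
         (down (case @eos
                 ;; the process needs no finalization
                 ::done acc
                 ;; feed the sentinel as a last input. The process may end with
                 ;; ensure-reduced, so unwrap before the downstream completion.
                 (unreduced (rf acc @eos)))))
        ([acc x]
         ;; expose the context to yield and input, then resume until the next await
         (binding [*acc*   acc
                   *down*  down
                   *input* x]
           (vreset! eos (cor))
           *acc*))))))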
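`conduit-xf` assumes the usual transducing protocol. The step arity is called once per input, and the completion arity exactly once at the end, which is the only remaining chance to feed a pending end-of-stream sentinel back to the process. The hypothetical `tracing` transducer below makes that call sequence visible with `transduce`. When an initial value is supplied, the init arity is not called.

```clojure
;; Hypothetical transducer that logs each arity call into an atom
;; before delegating to the downstream reducing function.
(defn tracing [log]
  (fn [down]
    (fn
      ([] (swap! log conj :init) (down))
      ([acc] (swap! log conj :complete) (down acc))
      ([acc x] (swap! log conj [:step x]) (down acc x)))))

;; Hypothetical helper: sum xs through the tracing transducer, return [sum calls].
(defn trace-sum [xs]
  (let [log    (atom [])
        result (transduce (tracing log) + 0 xs)]
    [result @log]))

(trace-sum [1 2]) ;; => [3 [[:step 1] [:step 2] :complete]]
```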

The conduit macro simply wraps a body in a coroutine constructor. When the body completes, it forces early termination of the reduction with ensure-reduced and returns ::done, so that no finalization step is attempted on completion.

(defmacro conduit [& body]
  `(conduit-xf
     #(cr {await input}
        ~@body (set! *acc* (ensure-reduced *acc*)) ::done)))
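Once the body returns, the macro wraps the accumulator with `ensure-reduced`. This is how any reducing step tells `reduce` to stop. `ensure-reduced` is used rather than `reduced` because a previous `yield` may already have produced a reduced accumulator, and it does not wrap that value twice. The sketch below shows both behaviours in isolation. `conj-until-3` is a hypothetical step function.

```clojure
;; Hypothetical step function that stops the reduction as soon as it sees 3.
(defn conj-until-3 [acc x]
  (if (= x 3)
    (ensure-reduced acc)   ; returning a Reduced value makes reduce stop here
    (conj acc x)))

(reduce conj-until-3 [] (range 10)) ;; => [0 1 2]

;; An already reduced value is returned as is, so one deref recovers the accumulator.
(let [r (reduced [:a])]
  (identical? r (ensure-reduced r))) ;; => true
```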

Additionally, we can define syntactic sugar on top of await, allowing us to branch on end-of-stream without having to provide and test against a sentinel.

(defmacro if-let-await [sym then else]
  `(let [x# (await ::over)]
     (case x#
       ::over ~else
       (let [~sym x#] ~then))))
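A dedicated namespaced sentinel such as `::over` is needed because `nil`, or any other ordinary value, may be a legitimate input and therefore cannot signal end-of-stream. The sketch below applies the same expansion pattern to a map lookup. It is a hypothetical `if-let-present` macro, with `get` standing in for `await`.

```clojure
;; Hypothetical macro following the same pattern as if-let-await: fetch a value
;; with a private sentinel as the default, then branch on the sentinel with case.
(defmacro if-let-present [[sym m k] then else]
  `(let [x# (get ~m ~k ::absent)]
     (case x#
       ::absent ~else
       (let [~sym x#] ~then))))

(if-let-present [v {:a nil} :a] [:present v] :absent) ;; => [:present nil]
(if-let-present [v {} :a] [:present v] :absent)       ;; => :absent
```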

As usage examples, here are reimplementations of clojure.core's transducers, stolen from clj-conduit.

(defn mapping [f]
  (conduit
    (while true
      (yield (f (await))))))

(defn mapping-indexed [f]
  (conduit
    (loop [i 0]
      (yield (f i (await)))
      (recur (inc i)))))

(defn filtering [pred]
  (conduit
    (while true
      (let [val (await)]
        (when (pred val)
          (yield val))))))

(defn taking-while [pred]
  (conduit
    (loop []
      (let [val (await)]
        (when (pred val)
          (yield val)
          (recur))))))

(defn taking [n]
  (conduit
    (dotimes [_ n]
      (yield (await)))))

(defn taking-nth [n]
  (conduit
    (while true
      (yield (await))
      (dotimes [_ (- n 1)]
        (await)))))

(defn dropping [n]
  (conduit
    (dotimes [_ n]
      (await))
    (while true
      (yield (await)))))

(defn dropping-while [pred]
  (conduit
    (loop [v (await)]
      (if (pred v)
        (recur (await))
        (yield v)))
    (while true
      (yield (await)))))

(def catting
  (conduit
    (while true
      (doseq [val (await)]
        (yield val)))))

(defn mapcatting [f]
  (conduit
    (while true
      (doseq [val (f (await))]
        (yield val)))))

(def deduping
  (conduit
    (loop [old ::none]
      (let [new (await)]
        (when-not (= old new)
          (yield new))
        (recur new)))))

(defn replacing [smap]
  (conduit
    (while true
      (let [val (await)]
        (yield (get smap val val))))))

(defn keeping [f]
  (conduit
    (while true
      (let [v (f (await))]
        (when-not (nil? v)
          (yield v))))))

(defn keeping-indexed [f]
  (conduit
    (loop [i 0]
      (let [v (f i (await))]
        (when-not (nil? v)
          (yield v)))
      (recur (inc i)))))

(def distincting
  (conduit
    (loop [seen #{}]
      (let [val (await)]
        (if-not (contains? seen val)
          (do (yield val)
              (recur (conj seen val)))
          (recur seen))))))

(defn random-sampling [prob]
  (conduit
    (while true
      (let [val (await)]
        (when (< (rand) prob)
          (yield val))))))

(defn interposing [sep]
  (conduit
    (yield (await))
    (while true
      (let [val (await)]
        (yield sep)
        (yield val)))))

(defn partitioning-all [n]
  (conduit
    (loop [vs [(await)]]
      (if (= n (count vs))
        (do (yield vs)
            (recur [(await)]))
        (if-let-await v
          (recur (conj vs v))
          (yield vs))))))

(defn partitioning-by [f]
  (conduit
    (let [first-val (await)]
      (loop [vs     [first-val]
             to-cmp (f first-val)]
        (if-let-await v
          (let [new-to-cmp (f v)]
            (if (= to-cmp new-to-cmp)
              (recur (conj vs v) to-cmp)
              (do (yield vs)
                  (recur [v] new-to-cmp))))
          (yield vs))))))
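Because these are meant as reimplementations, you can check their results against the clojure.core transducers they mirror. For reference, here are a few outputs from the originals. With the conduit versions loaded, `(into [] (partitioning-all 2) (range 5))` and the other calls should produce the same vectors. This sketch only exercises clojure.core and does not run the conduits themselves.

```clojure
;; Reference results from clojure.core, to compare the conduit versions against.
(into [] (partition-all 2) (range 5))      ;; => [[0 1] [2 3] [4]]
(into [] (partition-by odd?) [1 3 2 4 5])  ;; => [[1 3] [2 4] [5]]
(into [] (interpose :x) [1 2 3])           ;; => [1 :x 2 :x 3]
(into [] (dedupe) [1 1 2 2 1])             ;; => [1 2 1]
(into [] (take-nth 2) (range 7))           ;; => [0 2 4 6]
(into [] (drop-while neg?) [-1 -2 3 -4])   ;; => [3 -4]
```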
