river tries to provide an implementation of Oleg's Iteratee library. The terminology was changed to offer a more intuitive API for the end user; however, if you are familiar with Haskell's Iteratees, this might help you get started with this library:
The library is initially divided into 3 namespaces:

- `river.core`
- `river.seq`, which provides `map*`, `filter*`, etc.
- `river.io`, which deals with `InputStream`s.

The code provided in this library is still beta; there is a lot of room for improvement and new features. It serves as a starting point to exploit the Iteratee concepts in Clojure.
[org.van-clj/river "0.1.0"]
To execute a consumer, use the `run` function:

    (river.core/run (river.seq/produce-seq (range 1 100))
                    (river.core/*c (river.seq/filter* #(= 0 (mod % 2)))
                                   (river.seq/take 5)))
    ; => #river.core/ConsumerDone {:result (2 4 6 8 10), :remainder (12 14)}

The code above streams a seq from 1 to 99, then calls `*c` to bind a filter to a consumer that takes 5 items from the feed. The result of this execution is a `ConsumerDone` record that holds the yielded result and the remainder of the given chunks.
The stream is what the consumer receives as input. It is either a seq of items (called chunks) or an EOF signal, represented by `river.core/eof`.
The consumer is the one that processes the stream. It either yields a result (using `river.core/yield`) or returns a continuation (using `river.core/continue`).
`river.core/yield` is used when the consumer is done consuming from the stream. Two values are returned with the yield: the result value, and the remainder of the stream that wasn't consumed. The remainder is kept so that several consumers can be composed together using the monadic interface.
`river.core/continue` is used when the consumer hasn't received enough chunks to yield a result. Some consumers consume only part of the stream; others are greedy and consume all of the available stream.
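The yield/continue contract described above can be modelled in a few lines of plain Clojure. This is only a sketch, not river's implementation: `eof`, `yield`, `continue`, and `take-n` below are hypothetical stand-ins defined locally, named to mirror the terms used in the text.

```clojure
;; Hypothetical model of the consumer contract; NOT river's actual API.
(def eof ::eof)

(defn yield [result remainder]
  {:status :yield :result result :remainder remainder})

(defn continue [consumer]
  {:status :continue :consumer consumer})

(defn take-n
  "Returns a consumer (a function of one stream) that collects n items."
  ([n] (take-n n []))
  ([n acc]
   (fn [stream]
     (cond
       ;; No more input: yield whatever was collected so far.
       (= stream eof) (yield acc eof)
       ;; Enough items: yield the result plus the unconsumed remainder.
       (>= (+ (count acc) (count stream)) n)
       (let [needed (- n (count acc))]
         (yield (into acc (take needed stream)) (drop needed stream)))
       ;; Not enough yet: ask for more chunks.
       :else (continue (take-n n (into acc stream)))))))

;; Feeding two chunks by hand:
(def step-1 ((take-n 3) [1 2]))
(:status step-1)                  ;; => :continue
(def step-2 ((:consumer step-1) [3 4 5]))
(:result step-2)                  ;; => [1 2 3]
(:remainder step-2)               ;; => (4 5)
```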
The producer generates the stream that the consumer will use. Producers normally consume a resource such as a lazy-seq, file, or socket, and then transmit it to the consumer. The advantage of using producers is that stream generation is kept independent from consumption, so the consumer doesn't need to know where the data is coming from as long as it is valid. A producer stops consuming from the given resource as soon as the consumer returns a yield value.
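To make the producer's role concrete, here is a minimal sketch in plain Clojure. It assumes a simplified model in which a consumer is a function returning a yield or continue map; `feed-seq`, `run-all`, `head`, and `sum-until-eof` are hypothetical names for illustration and are not part of river.

```clojure
;; Hypothetical model; NOT river's actual API.
(def eof ::eof)
(defn yield [result remainder] {:status :yield :result result :remainder remainder})
(defn continue [consumer] {:status :continue :consumer consumer})

(defn sum-until-eof
  "Greedy consumer: keeps a running total until it receives eof."
  [total]
  (fn [stream]
    (if (= stream eof)
      (yield total eof)
      (continue (sum-until-eof (reduce + total stream))))))

(def head
  "Consumer that yields the first item it sees."
  (fn step [stream]
    (cond
      (= stream eof)  (yield nil eof)
      (empty? stream) (continue step)
      :else           (yield (first stream) (rest stream)))))

(defn feed-seq
  "Producer sketch: feeds coll to consumer in chunks of chunk-size and
  stops as soon as the consumer yields. Returns the last consumer state."
  [chunk-size coll consumer]
  (loop [chunks (partition-all chunk-size coll)
         state  (continue consumer)]
    (if (or (= :yield (:status state)) (empty? chunks))
      state
      (recur (rest chunks) ((:consumer state) (first chunks))))))

(defn run-all
  "Sends eof to a consumer that is still waiting, forcing a result."
  [state]
  (if (= :continue (:status state))
    ((:consumer state) eof)
    state))

(:result (run-all (feed-seq 4 (range 1 10) (sum-until-eof 0))))  ;; => 45
;; The source is infinite, but feeding stops at the first yield:
(:result (feed-seq 4 (range) head))                              ;; => 0
```

The consumers never see where the chunks come from; swapping the seq for lines read from a file would only change the producer.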
The filter transforms the stream into something different: it either changes the type of the stream or modifies the way the input is given.
One of the powerful characteristics of this stream architecture is that both Producers and Filters are also consumers: they receive a consumer as a parameter and enhance its behavior. They can be seen as a kind of decorator, in OO terms.
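The decorator idea can be sketched in plain Clojure as a function that takes a consumer and returns a new consumer. As before, this is a simplified model under stated assumptions: `map-filter` and `sum-until-eof` are hypothetical helpers, not river functions, and a consumer is assumed to be a function returning a yield or continue map.

```clojure
;; Hypothetical model; NOT river's actual API.
(def eof ::eof)
(defn yield [result remainder] {:status :yield :result result :remainder remainder})
(defn continue [consumer] {:status :continue :consumer consumer})

(defn sum-until-eof
  "Greedy consumer: keeps a running total until it receives eof."
  [total]
  (fn [stream]
    (if (= stream eof)
      (yield total eof)
      (continue (sum-until-eof (reduce + total stream))))))

(defn map-filter
  "Filter sketch: wraps consumer so that every item is passed through f
  before the inner consumer sees it. The result is itself a consumer."
  [f consumer]
  (fn [stream]
    (if (= stream eof)
      (consumer eof)
      (let [state (consumer (map f stream))]
        (if (= :continue (:status state))
          ;; Re-wrap the continuation so later chunks are transformed too.
          (continue (map-filter f (:consumer state)))
          state)))))

(def doubled-sum (map-filter #(* 2 %) (sum-until-eof 0)))
(def waiting (doubled-sum [1 2 3]))
(:status waiting)                       ;; => :continue
(:result ((:consumer waiting) eof))     ;; => 12
```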
Say, for example, you want to sum a list of numbers, but these numbers may come from different resources: some from stdin, others from a list, etc. To implement this using the river library, you would do something like the following:

    (ns river.examples.sum
      (:require [clojure.string :as string])
      (:use [river.core])
      (:require [river.seq :as rs]
                [river.io :as rio]))

    (def words* (rs/mapcat* #(string/split % #"\s+")))
    ; ^ a filter that applies a split to whatever it receives
    ;   in the stream; this assumes the stream is of strings

    (def numbers* (rs/map* #(Integer/parseInt %)))
    ; ^ a filter that transforms each received item into an
    ;   Integer using the parseInt function; this assumes
    ;   that the stream is of strings

    (defn produce-numbers-from-file
      ([] (produce-numbers-from-file "input.in"))
      ([file-path]
        (p*
        ; ^ we bind a producer with some filters
        ;   using the p* function.
          (rio/produce-file-lines file-path)
          ; ^ produces a stream of lines from a file path
          words*
          ; ^ applies the words filter
          numbers*)))
          ; ^ applies the number filter and transforms
          ;   the stream from strings to numbers

    (defn -main []
      (println (run
               ; ^ executes a group of producers/consumers
                 (produce-numbers-from-file)
                 ; ^ produces a stream of numbers from a file
                 (rs/produce-seq (range 1 10))
                 ; ^ produces a stream of numbers from a seq
                 (rs/reduce + 0))))
                 ; ^ sums up the numbers, ignoring where they come from

Sometimes we want to create a new consumer by composing simple consumers together. We can do that using the monadic API:

    (ns river.examples.monadic
      (:use [river.core])
      (:require [river.seq :as rs]
                [river.io :as rio]))

    (def drop-and-head
      (do-consumer
      ; ^ allows you to create a new consumer using
      ;   a monadic notation as in clojure.algo.monads
        [_ (rs/drop-while #(< % 5))
         ; ^ drops items from the stream while they are < 5
         b rs/first]
         ; ^ gets the first element after the dropping is done
        b))

    (defn -main []
      (println (run
               ; ^ function to execute producers/consumers
                 (rs/produce-seq (range -20 20))
                 ; ^ produces a stream of numbers from a seq
                 drop-and-head)))
                 ; drops while < 5 and then gives the first element found
                 ; (in this case 5)

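The role of the remainder in composition can be illustrated with a hand-written sequencing function. The sketch below is a simplified model in plain Clojure and makes no claim about how `do-consumer` is implemented: `bind`, `drop-while-c`, and `head` are hypothetical names, and a consumer is assumed to be a function returning a yield or continue map.

```clojure
;; Hypothetical model; NOT river's actual API.
(def eof ::eof)
(defn yield [result remainder] {:status :yield :result result :remainder remainder})
(defn continue [consumer] {:status :continue :consumer consumer})

(def head
  "Consumer that yields the first item it sees."
  (fn step [stream]
    (cond
      (= stream eof)  (yield nil eof)
      (empty? stream) (continue step)
      :else           (yield (first stream) (rest stream)))))

(defn drop-while-c
  "Consumer that discards items while pred holds, then yields nil
  together with everything it did not discard."
  [pred]
  (fn step [stream]
    (if (= stream eof)
      (yield nil eof)
      (let [remaining (drop-while pred stream)]
        (if (empty? remaining)
          (continue step)
          (yield nil remaining))))))

(defn bind
  "Sequencing sketch: runs consumer; once it yields, calls f with the
  result to obtain the next consumer and feeds it the remainder."
  [consumer f]
  (fn [stream]
    (let [state (consumer stream)]
      (if (= :continue (:status state))
        (continue (bind (:consumer state) f))
        ((f (:result state)) (:remainder state))))))

(def drop-and-head
  (bind (drop-while-c #(< % 5)) (fn [_] head)))

(:result (drop-and-head (range -20 20)))   ;; => 5
```

Without the remainder, the second consumer would never see the item that ended the dropping phase.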
Copyright (C) 2012 Roman Gonzalez
Distributed under the Eclipse Public License, the same as Clojure.