Flow

Have you ever needed a way for an OutputStream to be sent to some other kind of stream? Flow is a small library that allows you to do just that, without having to fiddle with PipedInputStreams and the like. Yes, it's possible to pipe an output stream into an input stream and then use byte-streams for further conversions, but it's all very cumbersome and error-prone. You also need threads, which seems overkill for something as seemingly simple as this.

Also, most libs only support InputStream.

Flow provides a custom implementation of java.io.OutputStream that forwards any bytes written to it to a Manifold stream. From there on you can apply further transformations as you like (e.g. send the data to a core.async channel).

Usage

Include the lib in your project. For deps.edn:

com.monkeyprojects/flow {:mvn/version "<VERSION>"}

Or Leiningen:

[com.monkeyprojects/flow "<VERSION>"]

The functions are in monkey.flow. First of all, there is the core raw-stream function. It creates an implementation of java.io.OutputStream that puts any data it receives onto a Manifold stream. Java's OutputStreams accept data in two ways, either as single bytes or as byte arrays, and I have decided to keep this distinction, mostly for performance reasons. For every call to a write function, a message is put onto the configured stream. For example:

(require '[monkey.flow :as f])
(require '[manifold.stream :as ms])

(def s (ms/stream 1))
;; Create a stream that delegates to the above Manifold stream
(def os (f/raw-stream s))

;; Write to the stream
(.write os (.getBytes "this is a test"))
;; This is what we receive on the stream
@(ms/take! s)
;; => {:buf [116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 116, 101, 115, 116],
;;     :off 0,
;;     :len 14}

In this case I have configured the stream with a buffer of 1 item; otherwise the write operation would block. If you're taking from the stream asynchronously with take!, this won't be necessary.

The stream outputs a map that contains either a single :byte or, as in the above example, a byte array buffer (:buf) with an offset (:off) and a length (:len), which are passed on unchanged from the write calls.
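Since both message shapes carry bytes, a consumer will often want to normalize them first. The following is a minimal sketch of such a step, based only on the message keys described above; msg->bytes is a hypothetical helper name and is not part of monkey.flow.

```clojure
(defn msg->bytes
  "Hypothetical helper (not part of monkey.flow): returns the payload of a
   raw message as a new byte array."
  ^bytes [{b :byte :keys [buf off len]}]
  (if (some? b)
    ;; Single-byte write: wrap the value in a one-element array
    (byte-array [(unchecked-byte b)])
    ;; Array write: copy only the region described by :off and :len
    (java.util.Arrays/copyOfRange ^bytes buf (int off) (int (+ off len)))))

(String. (msg->bytes {:buf (.getBytes "this is a test") :off 5 :len 2}))
;; => "is"
(String. (msg->bytes {:byte 116}))
;; => "t"
```

Copying the region is a defensive choice here, on the assumption that the writer may reuse its buffer after the write call returns.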

Transducers

Now the above is all fine and dandy, but not very useful. I have deliberately left the stream output as basic as possible, to allow for maximal customization. But I have provided several transducers that you can also pass to the stream constructors. They will process the incoming raw data and do something useful with it.

Text Lines

A first possible use is to group the information in text lines. Yes, I know, this can also be done using a BufferedReader, but more on that below. You can use it like this:

(def s (ms/stream 1 (f/line-group-xf)))
(def os (f/raw-stream s))

;; Print out line by line
(ms/consume (fn [l] (println "Received:" l)) s)

;; Autoflushing print writer
(def pw (java.io.PrintWriter. os true))
;; Write to the stream
(.print pw "this is ")
(.print pw "a test\nand this is ")
(.println pw "line 2")
(.close pw)

The above will output this:

Received: this is a test

Received: and this is line 2

Note that it retains the newlines, so you can do what you want with them. When the output stream gets closed, any remaining data is also flushed. The line-group-xf function takes an optional charset argument (either a string or a Java Charset).
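If you would rather not keep the trailing newlines, one option is to compose the line grouping with a small transducer of your own. This is only a sketch: trim-newline-xf is a hypothetical name, and the commented part assumes the f and ms aliases from the earlier examples.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: strips the trailing newline that each line retains
(def trim-newline-xf (map str/trim-newline))

;; Standalone check on plain strings
(into [] trim-newline-xf ["this is a test\n" "and this is line 2\n"])
;; => ["this is a test" "and this is line 2"]

;; Composed with the line grouping (needs monkey.flow and Manifold on the classpath)
(comment
  (def s (ms/stream 1 (comp (f/line-group-xf) trim-newline-xf)))
  (def os (f/raw-stream s)))
```

Because comp applies transducers left to right, the lines are grouped first and trimmed afterwards.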

Buffers

Instead of emitting strings on every newline, you may want to emit them in fixed-size chunks. For this, the buffer-xf transducer is provided. It takes a buffer size (by default 64K) and then only sends out strings whenever the byte buffer is full. Since there is also a string conversion involved, this transducer also accepts a charset.

(def s (ms/stream 1 (f/buffer-xf 10)))
(def os (f/raw-stream s))

;; Print out each chunk as it arrives
(ms/consume (fn [l] (println "Received:" l)) s)

;; Autoflushing print writer
(def pw (java.io.PrintWriter. os true))
;; Write to the stream
(.print pw "aaaaabbbbbcccccddddd")
(.close pw)

The above will output this:

Received: aaaaabbbbb
Received: cccccddddd

Flushing

In addition to flushing the buffer whenever it's full, the buffered transducer also supports explicit flushing. This may be useful if you don't want to wait for a large buffer to fill up, or if you want a periodic flush. To do this, send a ::monkey.flow/flush keyword to the stream. For example:

(def s (ms/stream 1 (f/buffer-xf 10)))
(def os (f/raw-stream s))

;; Set up periodic flushing
(ms/connect (ms/periodically 100 (constantly ::f/flush)) s)

;; Print out each chunk as it arrives
(ms/consume (fn [l] (println "Received:" l)) s)

;; Autoflushing print writer
(def pw (java.io.PrintWriter. os true))
;; Write to the stream
(.print pw "test")
(.close pw)

Even though the internal buffer won't be full after writing test to the output stream, the text will still be printed to the console, because a flush token is sent to the stream every 100ms.

You can of course also provide your own transducers or compose streams as documented.

Why not use piped streams?

Java does of course already provide a way to link an OutputStream to an InputStream, and from there on do any kind of conversion you'd want using a variety of Readers. But these all involve some kind of threading solution, since you need to set up a loop to periodically read the data from the InputStream. This may or may not block when there is no data available. From there you could still push the data on to a Manifold stream, and that will also work. But I consider it a lot more complicated and brittle. It also requires more resources (threads are expensive, after all), although since Java 21 you may be able to resort to virtual threads.

I have provided this lib for cases where the above is not a feasible solution, for example when many concurrent streams are involved. I leave the choice up to you.
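For comparison, the piped approach described above could look roughly like this. It is a minimal sketch using only java.io; read-lines-via-pipe is a hypothetical name chosen for illustration, and a real setup would push the lines on to a Manifold stream instead of collecting them.

```clojure
(import '(java.io BufferedReader InputStreamReader
                  PipedInputStream PipedOutputStream PrintWriter))

(defn read-lines-via-pipe
  "Hypothetical example: writes `text` to a PipedOutputStream and reads it
   back line by line on a separate thread."
  [^String text]
  (let [in     (PipedInputStream.)
        out    (PipedOutputStream. in)
        ;; The reading side needs its own thread: reading and writing a pipe
        ;; on the same thread can deadlock once the pipe's buffer fills up
        reader (future
                 (with-open [r (BufferedReader. (InputStreamReader. in "UTF-8"))]
                   (vec (line-seq r))))]
    ;; Closing the writer closes the pipe, which signals end of input
    (with-open [pw (PrintWriter. out true)]
      (.print pw text))
    @reader))

(read-lines-via-pipe "this is a test\nand this is line 2\n")
;; => ["this is a test" "and this is line 2"]
```

Every pipe needs a dedicated reading thread like the future above, which is where the cost comes from when many streams are active at once.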

License

MIT license

Copyright (c) 2026 by Monkey Projects
