Have you ever needed to send the output of an OutputStream to some other kind of stream?
Flow is a small library that allows you to do just that, without having to fiddle
with PipedInputStreams and related classes. Yes, it's possible to pipe an
output stream into an input stream, and then use byte-streams
to do further conversions. But it's all very cumbersome and error-prone, and you
need threads, which seems overkill for something as seemingly simple as this.
Also, most libraries only support InputStream.
Flow provides a custom implementation of java.io.OutputStream
that forwards any bytes written to it to a
Manifold stream. From there on you can
apply further transformations as you like (e.g. send the data to a core.async channel).
Include the lib in your project.
For deps.edn:
com.monkeyprojects/flow {:mvn/version "<VERSION>"}
Or Leiningen:
[com.monkeyprojects/flow "<VERSION>"]
The functions are in monkey.flow. First of all, there is the core raw-stream function.
It creates an implementation of java.io.OutputStream that puts any data it receives onto
a Manifold stream.
Java's OutputStreams accept data in two ways, either as single bytes or as byte arrays, and
I have decided to keep this distinction, mostly for performance reasons. For every call to
a write function, one message is put onto the configured stream. For example:
(require '[monkey.flow :as f])
(require '[manifold.stream :as ms])
(def s (ms/stream 1))
;; Create a stream that delegates to the above Manifold stream
(def os (f/raw-stream s))
;; Write to the stream
(.write os (.getBytes "this is a test"))
;; This is what we receive on the stream
@(ms/take! s)
;; => {:buf [116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 116, 101, 115, 116],
;; :off 0,
;; :len 14}
In this case I have configured a buffer of 1 item in the stream, otherwise the write operation
would block. But if you're using async take! this won't be necessary.
The stream outputs a map that contains either a single :byte or, as in the above example,
a byte array buffer with an offset and a length, which are just passed on from the write
calls.
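As a rough illustration of how a consumer might deal with both message shapes, here is a minimal sketch that decodes one raw message into a string. The helper name msg->string is hypothetical and not part of monkey.flow. It only assumes the map keys described above (:byte, or :buf with :off and :len) and UTF-8 text.

```clojure
(import 'java.nio.charset.StandardCharsets)

(defn msg->string
  "Hypothetical helper, not part of monkey.flow: decodes one raw message
   as UTF-8. Assumes the message holds either a single :byte or a :buf
   byte array with :off and :len. A lone byte of a multi-byte character
   will not decode correctly, so this is only suitable for illustration."
  [{b :byte :keys [buf off len]}]
  (if buf
    ;; Byte array variant: decode only the range given by :off and :len
    (String. ^bytes buf (int off) (int len) StandardCharsets/UTF_8)
    ;; Single byte variant: wrap it in a one-element array first
    (String. (byte-array [(unchecked-byte b)]) StandardCharsets/UTF_8)))

(msg->string {:buf (.getBytes "this is a test" StandardCharsets/UTF_8)
              :off 5
              :len 2})
;; => "is"

(msg->string {:byte 116})
;; => "t"
```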
Now the above is all fine and dandy, but not very useful. I have deliberately left the stream
output as basic as possible, to allow for maximal customization. But I have provided several
transducers that you can also pass to the stream
constructors. They will process the incoming raw data and do something useful with it.
A first possible use is to group the incoming data into text lines. Yes, I know, this can also
be done using a BufferedReader, but more on that below. You can use it like this:
(def s (ms/stream 1 (f/line-group-xf)))
(def os (f/raw-stream s))
;; Print out line by line
(ms/consume (fn [l] (println "Received:" l)) s)
;; Autoflushing print writer
(def pw (java.io.PrintWriter. os true))
;; Write to the stream
(.print pw "this is ")
(.print pw "a test\nand this is ")
(.println pw "line 2")
(.close pw)
The above will output this:
Received: this is a test
Received: and this is line 2
Note that it retains the newlines, so you can do what you want with them. When the output stream
gets closed, any remaining data is also flushed. The line-group-xf function takes an optional
charset argument (either a string or a Java Charset).
Instead of emitting strings on every newline, you may want to emit them in fixed-size chunks. For this,
the buffer-xf transducer is provided. It takes a buffer size in bytes (by default 64K) and only
sends out a string whenever the byte buffer is full. Since there is also a string conversion
involved, this transducer also accepts a charset.
(def s (ms/stream 1 (f/buffer-xf 10)))
(def os (f/raw-stream s))
;; Print out each chunk as it is emitted
(ms/consume (fn [l] (println "Received:" l)) s)
;; Autoflushing print writer
(def pw (java.io.PrintWriter. os true))
;; Write to the stream
(.print pw "aaaaabbbbbcccccddddd")
(.close pw)
The above will output this:
Received: aaaaabbbbb
Received: cccccddddd
In addition to flushing the buffer whenever it's full, the buffered transducer also supports
explicit flushing. This may be useful if you don't want to wait for a large buffer to fill up,
or if you want a periodic flush. To do this, send a ::monkey.flow/flush keyword to the
stream. For example:
(def s (ms/stream 1 (f/buffer-xf 10)))
(def os (f/raw-stream s))
;; Set up periodic flushing
(ms/connect (ms/periodically 100 (constantly ::f/flush)) s)
;; Print out each chunk as it is emitted
(ms/consume (fn [l] (println "Received:" l)) s)
;; Autoflushing print writer
(def pw (java.io.PrintWriter. os true))
;; Write to the stream
(.print pw "test")
(.close pw)
Even though the internal buffer is not full after writing test to the output stream,
the text is still printed to the console, because a flush token is sent to
the stream every 100ms.
You can of course also provide your own transducers or compose streams as documented.
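As a small sketch of what a custom transducer could look like, the example below replaces each raw message with the number of bytes it carries. The names msg-size and byte-count-xf are made up for illustration and are not part of monkey.flow. The message shape is assumed to be the one described earlier (:byte, or :buf with :off and :len). It is tried out here on hand-written messages with plain into, so no stream is involved.

```clojure
(defn msg-size
  "Hypothetical helper: number of bytes carried by one raw message.
   Assumes the message has either a single :byte or a :buf with :len."
  [{:keys [buf len]}]
  (if buf len 1))

(def byte-count-xf
  "Hypothetical transducer: replaces each raw message with its byte count."
  (map msg-size))

;; Apply the transducer to two hand-written messages
(into [] byte-count-xf [{:byte 97}
                        {:buf (byte-array 14) :off 0 :len 14}])
;; => [1 14]
```

Since it is an ordinary transducer, it should be possible to pass it to the stream constructor in the same position as the provided ones, for example (ms/stream 1 byte-count-xf).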
Java does of course already provide a way to link an OutputStream to an InputStream,
and from there on do any kind of conversion you'd want using a variety of Readers. But
these all involve some kind of threading solution, since you need to set up a loop to
periodically read the data from the InputStream. This may or may not block when there
is no data available. And from there you could still push it on to a Manifold stream. That
will also work. But I consider it a lot more complicated and brittle. It also requires
more resources (threads are expensive, after all), although since Java 21 you may be able
to resort to virtual threads.
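For comparison, this is roughly what the plain Java approach looks like from Clojure. It is a minimal sketch using only java.io classes. The helper name piped-lines is made up for illustration, and a real setup would also need error handling and a way to push the lines onto a Manifold stream. Note the dedicated reader thread, which is exactly the part this library tries to avoid.

```clojure
(import '(java.io BufferedReader InputStreamReader
                  OutputStream PipedInputStream PipedOutputStream))

(defn piped-lines
  "Illustrative helper (not part of any library): calls `write-fn` with a
   PipedOutputStream while a second thread reads the connected
   PipedInputStream line by line. Returns the lines as a vector."
  [write-fn]
  (let [in     (PipedInputStream.)
        out    (PipedOutputStream. in)
        result (promise)]
    ;; Reading blocks until data arrives, so it needs its own thread
    (doto (Thread.
           (fn []
             (with-open [r (BufferedReader. (InputStreamReader. in "UTF-8"))]
               (deliver result (vec (line-seq r))))))
      (.setDaemon true)
      (.start))
    ;; Closing the output side signals end-of-stream to the reader
    (with-open [^OutputStream o out]
      (write-fn o))
    ;; Wait for the reader thread to finish
    @result))

(piped-lines
 (fn [^OutputStream os]
   (.write os (.getBytes "this is a test\nline 2\n" "UTF-8"))))
;; => ["this is a test" "line 2"]
```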
I have provided this lib for cases where the above is not a feasible solution, for example when many concurrent streams are involved. I leave the choice up to you.
Copyright (c) 2026 by Monkey Projects