The Readable stream interface is the abstraction for a source of data that you are reading from. In other words, data comes out of a Readable stream.
A Readable stream will not start emitting data until you indicate that you are ready to receive it.
Readable streams have two "modes": a flowing mode and a paused
mode. When in flowing mode, data is read from the underlying system
and provided to your program as fast as possible. In paused mode, you
must explicitly call stream.read()
to get chunks of data out.
Streams start out in paused mode.
Note: If no 'data' event handlers are attached, there are no stream.pipe() destinations, and the stream is switched into flowing mode, then data will be lost.
You can switch to flowing mode by doing any of the following:

- Adding a 'data' event handler to listen for data.
- Calling the stream.resume() method to explicitly open the flow.
- Calling the stream.pipe() method to send the data to a Writable.

You can switch back to paused mode by doing either of the following:

- If there are no pipe destinations, by calling the stream.pause() method.
- If there are pipe destinations, by removing any 'data' event handlers and removing all pipe destinations by calling the stream.unpipe() method.

Note that, for backwards compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling stream.pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.
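The mode switches described above can be sketched end-to-end; this is a minimal, illustrative example (the file name is hypothetical):

```clojure
(def fs (js/require "fs"))

(def r (. fs createReadStream "foo.txt")) ;; streams start out paused

;; attaching a 'data' handler switches r into flowing mode
(.on r "data" (fn [chunk] (println "chunk of" (.-length chunk) "bytes")))

;; pausing switches back; incoming data accumulates in the internal buffer
(.pause r)

;; resuming re-enters flowing mode, and the 'data' handler fires again
(js/setTimeout #(.resume r) 1000)
```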
Examples of readable streams include:

- HTTP responses on the client
- HTTP requests on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
- process.stdin
Note: streams are instances of Node's EventEmitter. See https://nodejs.org/api/events.html#events_class_eventemitter
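Because streams are EventEmitters, errors are delivered through the same mechanism as any other event. A minimal sketch of attaching an 'error' handler (the file name is hypothetical):

```clojure
(def fs (js/require "fs"))
(def r (. fs createReadStream "no-such-file.txt"))
;; 'error' events are emitted like any other EventEmitter event
(.on r "error"
     (fn [err] (println "stream error:" (.-message err))))
```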
When a chunk of data can be read from the stream, it will emit a 'readable' event.
In some cases, listening for a 'readable' event will cause some data to be read into the internal buffer from the underlying system, if it hadn't already.
Once the internal buffer is drained, a 'readable' event will fire again when more data is available.
(.on r "readable"
(fn []
(println "there is some data to read now")))
The 'readable' event is not emitted in the "flowing" mode with the sole exception of the last one, on end-of-stream.
The 'readable' event indicates that the stream has new information: either new data is available or the end of the stream has been reached. In the former case, stream.read() will return that data. In the latter case, stream.read() will return null. For instance, in the following example, foo.txt is an empty file:
(def fs (js/require "fs"))
(def r (. fs createReadStream "foo.txt"))
(-> r
(.on "readable"
(fn [] (println "readable: " (.read r))))
(.on "end"
(fn [] (println "end"))))
The output of running this script is:
$ node test.js
readable: null
end
Attaching a 'data' event handler to a stream that has not been explicitly paused will switch the stream into flowing mode, and data will then be passed to your handler as soon as it is available:

(-> r
    (.on "data"
         (fn [chunk] (println "got " (.-length chunk) " bytes of data")))
    (.on "end"
         (fn [] (println "there will be no more data"))))
The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. Not all streams will emit the 'close' event.

readable.isPaused() returns whether or not the readable has been explicitly paused by client code (using stream.pause() without a corresponding stream.resume()):
(def Readable (.-Readable (js/require "stream")))
(def readable (Readable.))
(.isPaused readable) ;=> false
(.pause readable)
(.isPaused readable) ;=> true
(.resume readable)
(.isPaused readable) ;=> false
This method will cause a stream in flowing mode to stop emitting 'data' events, switching out of flowing mode. Any data that becomes available will remain in the internal buffer.
(.on readable "data"
     (fn [chunk]
       (println "got" (.-length chunk) "bytes of data")
       (.pause readable)
       (println "there will be no more data for 1 second")
       (js/setTimeout
        (fn []
          (println "now data will start flowing again")
          (.resume readable))
        1000)))
readable.pipe(destination, ?options) pulls all the data out of this readable stream and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed. options is a map of pipe options:

- :end : {boolean} End the writer when the reader ends. Default = true

For example, piping a file through gzip and into a new file:
(def fs (js/require "fs"))
(def zlib (js/require "zlib"))
(let [r (. fs createReadStream "foo.edn")
z (. zlib createGzip)
w (. fs createWriteStream "foo.edn.gz")]
(-> r
(.pipe z)
(.pipe w)))
Setting :end false keeps the destination stream open after the reader ends, so that you can write more data to it:
(.pipe reader writer #js {:end false})
(.on reader "end" (fn [] (.end writer "Goodbye!\n")))
readable.read(?size) pulls some data out of the internal buffer and returns it. If there is no data available, it returns null.

If you pass in a size argument, it will return that many bytes; if size bytes are not available, it will return null, unless the stream has ended, in which case it will return the data remaining in the buffer. If you do not specify a size argument, it will return all the data in the internal buffer.

This method should only be called in paused mode; in flowing mode it is called automatically until the internal buffer is drained, and the data is delivered via the 'data' event. Calling stream.read(size) after the 'end' event has been triggered will return null; no runtime error will be raised.

For example, reading one byte at a time:

(.on r "readable"
(fn []
(let [chunks (take-while identity (repeatedly #(.read r 1)))]
(doseq [chunk chunks]
(println "got " (.-length chunk) " bytes of data" )))))
readable.resume() will cause the readable stream to resume emitting 'data' events, switching the stream into flowing mode. If you do not want to consume the data from a stream, but you do want to get to its 'end' event, you can call stream.resume() to open the flow of data:
(-> r
    (.resume)
    (.on "end"
         (fn [] (println "got to the end, but did not read anything"))))
readable.setEncoding(encoding) causes the stream to return strings of the specified encoding instead of Buffer objects. For example, (.setEncoding r "utf8") causes the output data to be interpreted as UTF-8 and returned as strings, while (.setEncoding r "hex") encodes the data in hexadecimal string format. This properly handles multi-byte characters that would otherwise be potentially mangled if you simply pulled the Buffers out directly and called buf.toString(encoding) on them. If you want to read the data as strings, always use this method.
(.setEncoding r "utf8")
(.on r "data"
(fn [chunk]
    (assert (string? chunk))
(println "got " (.-length chunk) " characters of string data")))
readable.unpipe(?destination) removes the hooks set up for a previous stream.pipe() call. If the destination is not specified, then all pipes are removed. If the destination is specified but no pipe is set up for it, this is a no-op.

(def readable (getReadableStreamSomehow))
(def writable (. fs createWriteStream "file.txt"))
;; All the data from readable goes into 'file.txt',
;; but only for the first second
(.pipe readable writable)
(js/setTimeout
 (fn []
   (println "stop writing to file.txt")
   (.unpipe readable writable)
   (println "manually close the file stream")
   (.end writable))
 1000)
readable.unshift(chunk) pushes a chunk of data back into the internal buffer. This is useful when a stream is being consumed by a parser that needs to "un-consume" some data that it has optimistically pulled out of the source. Note that stream.unshift(chunk) cannot be called after the 'end' event has been triggered; a runtime error will be raised.

(def StringDecoder (.-StringDecoder (js/require "string_decoder")))
(defn parseHeader
  [rstream cb]
  (let [decoder (StringDecoder. "utf8")
        header (atom "")
        onReadable (fn onReadable []
                     (loop []
                       (when-let [chunk (.read rstream)]
                         (let [s (.write decoder chunk)]
                           (if (.match s #"\n\n")
                             ;; found the header boundary
                             (let [splt (.split s #"\n\n")
                                   _ (swap! header str (.shift splt))
                                   remaining (.join splt "\n\n")
                                   buf (js/Buffer.from remaining "utf8")]
                               (when (pos? (.-length buf))
                                 (.unshift rstream buf)) ;; un-consume the body
                               (-> rstream
                                   (.removeListener "error" cb)
                                   (.removeListener "readable" onReadable))
                               (cb nil @header rstream))
                             (do
                               (swap! header str s)
                               (recur)))))))]
    (-> rstream
        (.on "error" cb)
        (.on "readable" onReadable))))
Note that, unlike stream.push(chunk), stream.unshift(chunk) will not end the reading process by resetting the internal reading state of the stream. This can cause unexpected results if unshift() is called during a read (i.e. from within a stream._read() implementation on a custom stream). Following the call to unshift() with an immediate stream.push("") will reset the reading state appropriately; however, it is best to simply avoid calling unshift() while in the process of performing a read.

(ReadableStream options)
options : {IMap}

- :highWaterMark : {Int} The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default = 16384 (16kb), or 16 for :objectMode streams
- :encoding : {String} If specified, buffers will be decoded to strings using this encoding. Default = nil
- :objectMode : {boolean} Whether this stream should behave as a stream of objects, meaning that stream.read(n) returns a single value instead of a Buffer of size n. Default = false
- :read : {function(number)} Implementation for the stream._read() method (see below)

readable._read(size)

- size : {number} Number of bytes to read asynchronously. It is optional to honor this argument; there is no need to wait until size bytes are available before calling stream.push(chunk)
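As a sketch of how these options fit together (assuming, as in the examples below, that ReadableStream accepts a Clojure map of options), a stream that emits decoded strings might be constructed like this:

```clojure
(def r (ReadableStream
        {:encoding "utf8"     ;; decode buffers to strings
         :highWaterMark 1024  ;; buffer at most 1kb before _read stops being called
         :read (fn [_size]
                 (this-as this
                   (.push this "hello")
                   (.push this nil)))})) ;; nil signals end-of-stream

(.on r "data" (fn [chunk] (println chunk)))
```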
The purpose of this function is to obtain data from some underlying resource and place it into the stream's internal buffer via .push(chunk). The details of that resource are up to you, but you should make decisions based on the return value of .push(chunk) in addition to the status of your resource. Data pushed this way can later be pulled out with ReadableInstance.read() as described above.

- _read() should continue reading and pushing data while (.push this chunk) returns true.
- Once _read() is called, it is not called again until the .push() method is called.
- When (.push this chunk) returns false, stop reading from the resource. Only when _read() is called again after it has stopped should it start reading more data from the resource and pushing that data onto the queue.
- Call (.push this nil) to end the stream.

readable.push(chunk, ?encoding) -> Boolean

- chunk : {Buffer|String|nil} The chunk of data to push into the read queue
- encoding : {string} Encoding of string chunks; must be a valid Buffer encoding, such as "utf8" or "ascii"
- returns : {boolean} Whether or not more pushes should be performed

The push() method adds a chunk of data into the queue for subsequent stream processors to consume. If nil is passed, it signals the end of the stream (EOF), after which no more data can be written. Data added with push() can be pulled out by calling the stream.read() method when the "readable" event fires.

This is a basic example of a Readable stream. It emits the numerals from 1 to 10 in ascending order, and then ends.
(defn counter []
(let [_max 10
index (atom 0)
_read (fn []
(this-as this
(let [i (swap! index inc)]
(if (> i _max)
(. this push nil)
(let [s (str "\n" i)
                   buf (js/Buffer.from s "ascii")]
(. this push buf))))))]
(ReadableStream {:read _read})))
(.pipe (counter) (.-stdout js/process))
This stream accesses a Clojure vector via an atom and drops its head with every read call:
(let [counter (atom [1 2 3 4 5 6 7 8 9 10])
opts {:objectMode true
:read (fn [_]
(this-as this
(if (and @counter (.push this (vec @counter)))
(swap! counter next)
(if-not @counter
(.push this nil)))))}
r (ReadableStream opts)]
(-> r
(.once "readable" (fn [] (println (take-while identity (repeatedly #(.read r))))))
(.on "end" #(js/console.log "\nWe've reached the end!!"))))
;=> ([1 2 3 4 5 6 7 8 9 10] ... [8 9 10] [9 10] [10])