
prpr.stream.operations


->source clj/s

(->source stream-or-coll)

turns a collection into a stream (placing the collection on the stream as a single chunk); given a stream, returns it unchanged

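A hedged usage sketch (the ops alias and the sample collection are assumptions, not part of the library docs):

(require '[prpr.stream.operations :as ops])

;; a collection becomes a stream, with the whole collection as a single chunk
(def s (ops/->source [1 2 3]))

;; an existing stream passes through untouched
(ops/->source s)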

chunkify clj/s

(chunkify target-chunk-size s)
(chunkify target-chunk-size partition-by-fn s)
(chunkify buffer-size target-chunk-size partition-by-fn s)

chunkify a stream - chunk it to a target chunk size, and optionally also partition-by, ensuring partitions never span chunk boundaries

  • buffer-size - output-stream buffer size
  • target-chunk-size - in the absence of partition-by-fn, output chunks will be this size or smaller
  • partition-by-fn - also partition-by the stream - ensuring that partitions never span chunk boundaries
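
A hedged sketch (the ops alias, the sample data, and the :user-id key are assumptions):

;; assumes alias: [prpr.stream.operations :as ops]
;; chunk to at most 1000 values per chunk, also partitioning by :user-id so
;; that a run of values with the same :user-id never spans a chunk boundary
(ops/chunkify
 1000
 :user-id
 (ops/->source [{:user-id 1 :v :a} {:user-id 1 :v :b} {:user-id 2 :v :c}]))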

count clj/s

(count id s)

count the items on a stream

returns: Promise<count>

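A hedged sketch, assuming the returned Promise is promesa-compatible and that the id keyword follows the convention described under reduce below:

(require '[promesa.core :as pr])
;; assumes alias: [prpr.stream.operations :as ops]

(pr/let [n (ops/count ::count-items (ops/->source [:a :b :c]))]
  n) ;; resolves to 3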

filter clj/s

(filter pred s)

map clj/s

(map f s)
(map f s & rest)

(map f Stream<val>) -> Stream<(f val)>

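A hedged sketch (the ops alias and sample values are assumptions):

;; assumes alias: [prpr.stream.operations :as ops]
(ops/map inc (ops/->source [1 2 3]))
;; => a stream of the values 2 3 4 (possibly chunked)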

mapcat clj/s

(mapcat f s)
(mapcat f s & rest)

mapcon clj/s

(mapcon f n s)
(mapcon f n s & rest)

like map, but limits the number of concurrent unresolved promises from application of f

  • f is a promise-returning async fn. the result promise of f will be resolved and the resolved result placed on the output.
  • n is the maximum number of simultaneous unresolved promises

this works to control concurrency even when chunks are used - because using buffering to control concurrency no longer works when each buffered value can be a chunk of arbitrary size

note that using this fn may have performance implications - it dechunks and rechunks

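A hedged sketch - fetch-user! is a hypothetical promise-returning fn, and the promesa alias is an assumption:

;; assumes aliases: [prpr.stream.operations :as ops] [promesa.core :as pr]
(defn fetch-user!
  "hypothetical async lookup - resolves to a user map after 100ms"
  [id]
  (pr/delay 100 {:id id}))

;; map fetch-user! over the stream with at most 10 unresolved promises in flight
(ops/mapcon fetch-user! 10 (ops/->source (range 100)))
;; => a stream of the resolved user maps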

put-all! clj/s

(put-all! sink vals)

puts all values onto a stream - first flattens any chunks in the vals into a single new chunk, then puts that chunk on the stream

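A hedged sketch - sink stands for any writable stream created elsewhere; wrapping the call in a fn keeps the example self-contained:

;; assumes alias: [prpr.stream.operations :as ops]
(defn put-batch!
  "flattens vals into a single chunk and puts that chunk on sink"
  [sink vals]
  (ops/put-all! sink vals))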

put-all-and-close! clj/s

(put-all-and-close! sink vals)

realize-each clj/s

(realize-each s)

convert a Stream<Promise<val>|val> into Stream<val>

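A hedged sketch (the promesa alias is an assumption):

;; assumes aliases: [prpr.stream.operations :as ops] [promesa.core :as pr]
(ops/realize-each
 (ops/->source [1 (pr/resolved 2) 3]))
;; => a stream of 1 2 3 - the promise is resolved in place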

reduce clj/s

(reduce id f s)
(reduce id f initial-val s)

reduce, but for streams. returns a Promise of the reduced value

an id is required for the reduction - this will be used to decorate any exception data, and helps to identify where an error happened, because exception stack-traces are generally not useful. a namespaced keyword identifying the function or block where a reduce is happening makes a good id e.g. ::reduce

NOTE the reducing function is not expected to be async - if it returns a promise then the promise will not be unwrapped, and unexpected things will probably happen

TODO add StreamError value handling

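A hedged sketch (the ::sum-values id and the promesa alias are assumptions):

;; assumes aliases: [prpr.stream.operations :as ops] [promesa.core :as pr]
(pr/let [total (ops/reduce ::sum-values + 0 (ops/->source [1 2 3 4]))]
  total) ;; resolves to 10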

reduce-ex-info clj/s

(reduce-ex-info id cause)

extend cause ex-data with a reduce-id, or wrap cause in an ex-info


reduce-loop* clj/s

(reduce-loop* _id f initial-val s)

the inner reduce loop

there were problems using a catch at the top level of the reduce - there were crashes on cljs, perhaps an uncaught promise left lying around somewhere, so instead errors are caught and returned from reduce-loop* as StreamError values


reductions clj/s

(reductions id f s)
(reductions id f initial-val s)

like clojure.core/reductions, but for streams

NOTE like manifold's own reductions, and unlike clojure.core/reductions, this returns an empty stream if the input stream is empty. this is because a connect-via implementation does not offer any ability to output anything if the input stream is empty

NOTE if the input contains chunks, the output will contain matching chunks

TODO add StreamError value handling

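A hedged sketch (the ops alias, the ::running-total id, and the sample values are assumptions):

;; assumes alias: [prpr.stream.operations :as ops]
(ops/reductions ::running-total + (ops/->source [1 2 3 4]))
;; => a stream of the running totals 1 3 6 10, chunked to match the input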

safe-chunk-xform clj/s

(safe-chunk-xform xform out)

  • xform : a transducer
  • out : the eventual output stream

returns a transducer which, in normal operation, unrolls chunks and composes with xform. if any exception is thrown it immediately errors the eventual output stream with the error, and then rethrows the error (since there is no sensible value to return)

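A hedged sketch - safe-inc-xform is a hypothetical helper, and out stands for the eventual output stream, created elsewhere:

;; assumes alias: [prpr.stream.operations :as ops]
(defn safe-inc-xform
  "hypothetical helper - a chunk-safe transducer that increments values;
   if the inner (map inc) throws, out is errored before the error is rethrown"
  [out]
  (ops/safe-chunk-xform (map inc) out))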

transform clj/s

(transform xform s)
(transform xform buffer-size s)

apply transform to a stream, returning a transformed stream

uses the underlying manifold or core.async feature to transform a stream, but wraps the xform in a safe-chunk-xform which

  • unrolls chunks for the xform
  • if the xform throws an exception then immediately errors the returned stream with the exception

NOTE connect-via error-handling doesn't work with xform errors, because the error doesn't happen in the connect-via fn, but rather in the manifold/core.async impl, and manifold's (at least) put! impl swallows the error, so connect-via sees no error. sidestepping this problem with the safe-chunk-xform and erroring the returned stream directly propagates the error, but also leads to some transformed values before the error going missing from the downstream, or arriving after the error, because of implicit buffering. an alternative might be to not use the core.async/manifold native transforms, but i've also never used exceptions for control flow, so i can't see this being a problem atm, so i'm sticking with the native transforms for now

NOTE2 - maybe errors can be flowed downstream by immediately wrapping them and sending them downstream, with the 2-arity, followed by calling the transducer finalizer 1-arity to close the downstream

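A hedged sketch - note that map and filter here are the clojure.core transducer arities, not the stream ops above:

;; assumes alias: [prpr.stream.operations :as ops]
(ops/transform
 (comp (map inc) (filter even?))
 (ops/->source [1 2 3 4]))
;; => a stream of the values 2 4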

zip clj/s

(zip a)
(zip a & rest)

zip streams S<a> S<b> ... -> S<[a b ...]>

the output stream will terminate with the first input stream which terminates

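A hedged sketch (the ops alias and sample values are assumptions):

;; assumes alias: [prpr.stream.operations :as ops]
(ops/zip (ops/->source [1 2 3]) (ops/->source [:a :b]))
;; => a stream of [1 :a] [2 :b] - terminates with the shorter input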
