(->source stream-or-coll)
turns a collection into a stream (with the collection as a chunk on the stream), otherwise does nothing to a stream
turns a collection into a stream (with the collection as a chunk on the stream), otherwise does nothing to a stream
(chunkify target-chunk-size s)
(chunkify target-chunk-size partition-by-fn s)
(chunkify buffer-size target-chunk-size partition-by-fn s)
chunkify a stream - chunk a stream with a target chunk size, and optionally also partition-by, ensuring partitions never span chunk boundaries
chunkify a stream - chunk a stream with a target chunk size, and optionally also partition-by, ensuring partitions never span chunk boundaries - buffer-size - output-stream buffer size - target-chunk-size - in the absence of partition-by-fn, output chunks will be this size or smaller - partition-by-fn - also partition-by the stream - ensuring that partitions never span chunk boundaries
(count id s)
count the items on a stream
returns: Promise<count>
count the items on a stream returns: Promise<count>
(map f s)
(map f s & rest)
(map f Stream<val>) -> Stream<(f val)>
(map f Stream<val>) -> Stream<(f val)>
(mapcon f n s)
(mapcon f n s & rest)
like map, but limits the number of concurrent unresolved promises from application of f
this works to control concurrency even when chunks are used - because using buffering to control concurrency no longer works when each buffered value can be a chunk or arbitrary size
note that using this fn may have performance implications - it dechunks and rechunks
like map, but limits the number of concurrent unresolved promises from application of f - f is a promise-returning async fn. the result promise of f will be resolved and the resolved result placed on the output. - n is the maximum number of simultaneous unresolved promises this works to control concurrency even when chunks are used - because using buffering to control concurrency no longer works when each buffered value can be a chunk or arbitrary size note that using this fn may have performance implications - it dechunks and rechunks
(put-all! sink vals)
puts all values onto a stream - first flattens any chunks from the vals, and creates a new chunk, then puts the chunk on the stream
puts all values onto a stream - first flattens any chunks from the vals, and creates a new chunk, then puts the chunk on the stream
(realize-each s)
convert a Stream<Promise<val>|val> into Stream<val>
convert a Stream<Promise<val>|val> into Stream<val>
(reduce id f s)
(reduce id f initial-val s)
reduce, but for streams. returns a Promise of the reduced value
an id is required for the reduction - this will be used to decorate any exception data, and helps to identify where an error happened, because exception stack-traces are generally not useful. a namespaced keyword identifying the function or block where a reduce is happening makes a good id e.g. ::reduce
NOTE the reducing function is not expected to be async - if it returns a promise then the promise will not be unwrapped, and unexpected things will probably happen
TODO add StreamError value handling
reduce, but for streams. returns a Promise of the reduced value an id is required for the reduction - this will be used to decorate any exception data, and helps to identify where an error happened, because exception stack-traces are generally not useful. a namespaced keyword identifying the function or block where a reduce is happening makes a good id e.g. ::reduce NOTE the reducing function is not expected to be async - if it returns a promise then the promise will *not* be unwrapped, and unexpected things will probably happen TODO add StreamError value handling
(reduce-ex-info id cause)
extend cause ex-data with a reduce-id, or wrap cause in an ex-info
extend cause ex-data with a reduce-id, or wrap cause in an ex-info
(reduce-loop* _id f initial-val s)
the inner reduce loop
there were problems using a catch at the top level of the reduce - there were crashes on cljs, perhaps an uncaught promise left lying around somewhere, so instead errors are caught and returned from reduce-loop* as StreamError values
the inner reduce loop there were problems using a catch at the top level of the reduce - there were crashes on cljs, perhaps an uncaught promise left lying around somewhere, so instead errors are caught and returned from reduce-loop* as StreamError values
(reductions id f s)
(reductions id f initial-val s)
like clojure.core/reductions, but for streams
NOTE like manifold's own reductions, and unlike clojure.core/reductions, this returns an empty stream if the input stream is empty. this is because a connect-via implementation does not offer any ability to output anything if the input stream is empty
NOTE if the input contains chunks, the output will contain matching chunks
TODO add StreamError value handling
like clojure.core/reductions, but for streams NOTE like manifold's own reductions, and unlike clojure.core/reductions, this returns an empty stream if the input stream is empty. this is because a connect-via implementation does not offer any ability to output anything if the input stream is empty NOTE if the input contains chunks, the output will contain matching chunks TODO add StreamError value handling
(safe-chunk-xform xform out)
returns a transducer which, in normal operation, unrolls chunks and composes with xform. if any exception is thrown it immediately errors the eventual output stream with the error, and then rethrows the error (since there is no sensible value to return)
- xform : a transducer - out : the eventual output stream returns a transducer which, in normal operation, unrolls chunks and composes with xform. if any exception is thrown it immediately errors the eventual output stream with the error, and then rethrows the error (since there is no sensible value to return)
(transform xform s)
(transform xform buffer-size s)
apply transform to a stream, returning a transformed stream
uses the underlying manifold or core.async feature to transform a stream, but wraps the xform in a safe-chunk-xform which
NOTE connect-via error-handling doesn't work with xform errors, because the error doesn't happen in the connect-via fn, but rather in the manifold/core.async impl, and manifold's (at least) put! impl swallows the error, so connect-via sees no error. sidestepping this problem with the safe-chunk-xform and erroring the returned stream directly propagates the error, but also leads to some transformed values before the error going missing from the downstream, or arriving after the error, because of implicit buffering. an alternative might be to not use the core.async/manifold native transforms, but i've also never used exceptions for control flow, so i can't see this being a problem atm, so i'm sticking with the native transforms for now
NOTE2 - maybe errors can be flowed downstream by immediately wrapping them and sending them downstream, with the 2-arity, followed by calling the transducer finalizer 1-arity to close the downstream
apply transform to a stream, returning a transformed stream uses the underlying manifold or core.async feature to transform a stream, but wraps the xform in a safe-chunk-xform which - unrolls chunks for the xform - if the xform throws an exception then immediately errors the returned stream with the exception NOTE connect-via error-handling doesn't work with xform errors, because the error doesn't happen in the connect-via fn, but rather in the manifold/core.async impl, and manifold's (at least) put! impl swallows the error, so connect-via sees no error. sidestepping this problem with the safe-chunk-xform and erroring the returned stream directly propagates the error, but also leads to some transformed values before the error going missing from the downstream, or arriving after the error, because of implicit buffering. an alternative might be to not use the core.async/manifold native transforms, but i've also never used exceptions for control flow, so i can't see this being a problem atm, so i'm sticking with the native transforms for now NOTE2 - maybe errors can be flowed downstream by immediately wrapping them and sending them downstream, with the 2-arity, followed by calling the transducer finalizer 1-arity to close the downstream
(zip a)
(zip a & rest)
zip streams S<a> S<b> ... -> S<[a b ...]>
the output stream will terminate with the first input stream which terminates
zip streams S<a> S<b> ... -> S<[a b ...]> the output stream will terminate with the first input stream which terminates
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close