low-level stream transport operations - covering creating streams, put!ing onto and take!ing from a stream, error propagation, special value wrapping/unwrapping and stream connections
low-level stream transport operations - covering creating streams, put!ing onto and take!ing from a stream, error propagation, special value wrapping/unwrapping and stream connections
(closed? s)
test if a stream is closed. returns true|false
not in the public API, because it's often a race condition, but may be sometimes useful for inspection (e.g. in tests)
test if a stream is closed. returns `true|false` not in the public API, because it's often a race condition, but may be sometimes useful for inspection (e.g. in tests)
(connect-via source f sink)
(connect-via source f sink opts)
feed all messages from stream source
into the callback f
on the
understanding that they will eventually propagate into
stream sink
the return value of f
should be a boolean|Promise<boolean>
.
when false the downstream sink
is assumed to be closed and the connection is severed
key | description |
---|---|
source | source stream |
sink | destination stream |
f | 1-arity callback returning boolean|Promise<boolean> |
opts | map of connection options - will be passed to Manifold on clj, but non-default options may misbehave on cljs |
:prpr3.stream/downstream? | whether the source closing will close the sink, defaults to true |
:prpr3.stream/upstream? | whether the sink closing will close the source, even if there are other sinks downstream of the source, defaults to true |
:prpr3.stream/timeout | timeout in ms before the connection is severed. defaults to nil |
:prpr3.stream/description | description of the connection. useful for introspection |
feed all messages from stream `source` into the callback `f` on the understanding that they will eventually propagate into stream `sink` the return value of `f` should be a `boolean|Promise<boolean>`. when false the downstream `sink` is assumed to be closed and the connection is severed | key | description | | ------------------------------|-------------| | `source` | source stream | `sink` | destination stream | `f` | 1-arity callback returning `boolean|Promise<boolean>` | `opts` | map of connection options - will be passed to Manifold on clj, but non-default options may misbehave on cljs | `:prpr3.stream/downstream?` | whether the source closing will close the sink, defaults to `true` | `:prpr3.stream/upstream?` | whether the sink closing will close the source, even if there are other sinks downstream of the source, defaults to `true` | `:prpr3.stream/timeout` | timeout in ms before the connection is severed. defaults to `nil` | `:prpr3.stream/description` | description of the connection. useful for introspection
(error! sink err)
mark a stream as errored
puts an marker wrapper with the error on to the stream, and then closes it. consuming fns will throw an error when they encounter it, so errors are always propagated
it would be nicer if the underlying stream/channel had an error state, but this is the best to be done without wrapping the underlying stream/channel
sink
- a streamerr
- the error valuemark a stream as errored puts an marker wrapper with the error on to the stream, and then closes it. consuming fns will throw an error when they encounter it, so errors are always propagated it would be nicer if the underlying stream/channel had an error state, but this is the best to be done without wrapping the underlying stream/channel - `sink` - a stream - `err` - the error value
(put! sink val)
(put! sink val timeout timeout-val)
put a value onto stream with backpressure
sink
- a streamval
- the valuetimeout
- optional timeout in mstimeout
- optional value to return in case of timeout. default nil
returns Promise<true|false>
which eventually resolves to:
true
when the value was accepted onto the streamfalse
if the stream was closedtimeout-val
if the put!
timed output a value onto stream with backpressure - `sink` - a stream - `val` - the value - `timeout` - optional timeout in ms - `timeout` - optional value to return in case of timeout. default `nil` returns `Promise<true|false>` which eventually resolves to: - `true` when the value was accepted onto the stream - `false` if the stream was closed - `timeout-val` if the `put!` timed out
(put-all! sink vals)
put all values individually onto a stream with backpressure
returns Promise<true|false>
yielding true
if all
values were accepted onto the stream, false
otherwise
sink
- a streamvals
- a sequence of valuesput all values individually onto a stream with backpressure returns `Promise<true|false>` yielding `true` if all values were accepted onto the stream, `false` otherwise - `sink` - a stream - `vals` - a sequence of values
(put-all-and-close! sink vals)
a convenience fn to put-all!
values onto a stream and
then close the stream. returns Promise<true|false>
sink
- a streamvals
- a sequence of valuesa convenience fn to `put-all!` values onto a stream and then close the stream. returns `Promise<true|false>` - `sink` - a stream - `vals` - a sequence of values
(safe-connect-via-fn f sink)
return a new connect-via fn which handles errors in the connect fn or from the source and error!s the sink
return a new connect-via fn which handles errors in the connect fn or from the source and error!s the sink
(stream)
(stream buffer)
(stream buffer xform)
(stream buffer xform executor)
(stream)
(stream buffer)
(stream buffer xform)
create a stream
buffer
: optional buffer size. default 0
xform
: optional transducer applied to values put on the streamcreate a stream - `buffer` : optional buffer size. default `0` - `xform` : optional transducer applied to values put on the stream
(stream? v)
test if v
is a stream. returns true|false
test if `v` is a stream. returns `true|false`
(take! source)
(take! source default-val)
(take! source default-val timeout timeout-val)
take a value from a stream.
source
- a streamdefault-val
- optional value to return if the stream closestimeout
- optional timeout in mstimeout-val
- optional value to return if the stream times outreturns Promise<value|error>
which evantually resolves to:
nil
or default-val
if the stream closestimeout-val
if no value becomes available in timeout
msNOTE take!
would ideally not return chunks, but it curently does...
there is not currently a good way of using a consumer/ChunkConsumer
in the public API, since i don't want to wrap the underlying stream
or channel in something else
take a value from a stream. - `source` - a stream - `default-val` - optional value to return if the stream closes - `timeout` - optional timeout in ms - `timeout-val` - optional value to return if the stream times out returns `Promise<value|error>` which evantually resolves to: - a value when one becomes available - `nil` or `default-val` if the stream closes - `timeout-val` if no value becomes available in `timeout` ms - an error if the stream errored (i.e. an error occurred during some upstream operation) NOTE `take!` would ideally not return chunks, but it curently does... there is not currently a good way of using a consumer/ChunkConsumer in the public API, since i don't want to wrap the underlying stream or channel in something else
(unwrap-platform-error x)
unwrap a platform error to get at the cause
unwrap a platform error to get at the cause
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close