prpr3.stream.transport

low-level stream transport operations - covering creating streams, put!ing onto and take!ing from a stream, error propagation, special value wrapping/unwrapping and stream connections


close! clj/s

(close! s)

close a stream


closed? clj/s

(closed? s)

test if a stream is closed. returns true|false

not in the public API, because relying on it often leads to a race condition, but it may sometimes be useful for inspection (e.g. in tests)


connect-via clj/s

(connect-via source f sink)
(connect-via source f sink opts)

feed all messages from stream source into the callback f on the understanding that they will eventually propagate into stream sink

the return value of f should be a boolean|Promise<boolean>. when false the downstream sink is assumed to be closed and the connection is severed

| key                           | description |
| ------------------------------|-------------|
| `source`                      | source stream
| `sink`                        | destination stream
| `f`                           | 1-arity callback returning `boolean|Promise<boolean>`
| `opts`                        | map of connection options - will be passed to Manifold on clj, but non-default options may misbehave on cljs
|   `:prpr3.stream/downstream?` | whether the source closing will close the sink, defaults to `true`
|   `:prpr3.stream/upstream?`   | whether the sink closing will close the source, even if there are other sinks downstream of the source, defaults to `true`
|   `:prpr3.stream/timeout`     | timeout in ms before the connection is severed. defaults to `nil`
|   `:prpr3.stream/description` | description of the connection. useful for introspection
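A minimal sketch of a connection, using only the signatures documented on this page. The names `connect-inc` and `run-connect-inc` are hypothetical, and the `@` deref assumes clj, where the returned promises can be blocked on (on cljs you would chain on the promise instead). The callback returns the result of `put!`, which is already a `Promise<true|false>`.

```clojure
(require '[prpr3.stream.transport :as t])

;; hypothetical helper: forward (inc v) to sink for every v taken from source
(defn connect-inc
  [source sink]
  (t/connect-via
   source
   ;; put! yields false once the sink is closed, which severs the connection
   (fn [v] (t/put! sink (inc v)))
   sink))

;; hypothetical usage, clj only: buffered streams so no consumer is needed
;; while the values are being put
(defn run-connect-inc
  []
  (let [source (t/stream 3)
        sink (t/stream 3)]
    (connect-inc source sink)
    @(t/put-all! source [1 2 3])
    ;; the timeout guards against blocking forever if nothing arrives
    (vec (repeatedly 3 #(deref (t/take! sink ::closed 1000 ::timeout))))))
```

The callback sees raw messages from the source, so a real callback may also have to deal with chunks and error markers; `safe-connect-via-fn` covers the error case.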

error! clj/s

(error! sink err)

mark a stream as errored

puts a marker wrapper containing the error onto the stream, and then closes it. consuming fns will throw an error when they encounter it, so errors are always propagated

it would be nicer if the underlying stream/channel had an error state, but this is the best to be done without wrapping the underlying stream/channel

  • sink - a stream
  • err - the error value

put! clj/s

(put! sink val)
(put! sink val timeout timeout-val)

put a value onto stream with backpressure

  • sink - a stream
  • val - the value
  • timeout - optional timeout in ms
  • timeout-val - optional value to return in case of timeout. default nil

returns Promise<true|false> which eventually resolves to:

  • true when the value was accepted onto the stream
  • false if the stream was closed
  • timeout-val if the put! timed out
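The three outcomes can be seen with a small sketch, assuming clj so that the promise can be dereferenced with `@`. `put-with-timeout` is a hypothetical name, and the 50 ms timeout is arbitrary.

```clojure
(require '[prpr3.stream.transport :as t])

;; hypothetical helper: put v onto s, giving up after 50 ms
(defn put-with-timeout
  [s v]
  @(t/put! s v 50 ::timed-out))

(comment
  ;; buffered stream: the value is accepted straight away
  (put-with-timeout (t/stream 1) :x)

  ;; unbuffered stream with no consumer: the put! cannot complete,
  ;; so the timeout-val should come back instead
  (put-with-timeout (t/stream) :x))
```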

put-all! clj/s

(put-all! sink vals)

put all values individually onto a stream with backpressure. returns Promise<true|false> yielding true if all values were accepted onto the stream, false otherwise

  • sink - a stream
  • vals - a sequence of values

put-all-and-close! clj/s

(put-all-and-close! sink vals)

a convenience fn to put-all! values onto a stream and then close the stream. returns Promise<true|false>

  • sink - a stream
  • vals - a sequence of values
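A sketch of filling a stream and then draining it, assuming clj (blocking `@` deref). `drain` and `put-then-drain` are hypothetical helpers, not part of the library. The stream is given a buffer large enough for all values so the puts can complete before anything consumes them.

```clojure
(require '[prpr3.stream.transport :as t])

;; hypothetical helper: take values until the stream closes or a take! times out
(defn drain
  [s]
  (loop [acc []]
    (let [v @(t/take! s ::closed 1000 ::timeout)]
      (if (contains? #{::closed ::timeout} v)
        acc
        (recur (conj acc v))))))

;; hypothetical helper: put all vals, close the stream, then read everything back
(defn put-then-drain
  [vals]
  (let [s (t/stream (count vals))]
    @(t/put-all-and-close! s vals)
    (drain s)))
```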

safe-connect-via-fn clj/s

(safe-connect-via-fn f sink)

return a new connect-via fn which handles errors in the connect fn or from the source and error!s the sink


stream clj/s

clj
(stream)
(stream buffer)
(stream buffer xform)
(stream buffer xform executor)
cljs
(stream)
(stream buffer)
(stream buffer xform)

create a stream

  • buffer : optional buffer size. default 0
  • xform : optional transducer applied to values put on the stream
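A sketch of a stream with a transducer, assuming clj (blocking `@` deref) and that the `xform` is applied before the value becomes available to `take!`. `put-through-xform` is a hypothetical name.

```clojure
(require '[prpr3.stream.transport :as t])

;; hypothetical helper: put v through a stream which increments each value
(defn put-through-xform
  [v]
  (let [s (t/stream 1 (map inc))] ; buffer of 1, transducer (map inc)
    @(t/put! s v)
    (let [r @(t/take! s ::closed 1000 ::timeout)]
      (t/close! s)
      r)))
```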

stream-factory clj/s

stream? clj/s

(stream? v)

test if v is a stream. returns true|false


take! clj/s

(take! source)
(take! source default-val)
(take! source default-val timeout timeout-val)

take a value from a stream.

  • source - a stream
  • default-val - optional value to return if the stream closes
  • timeout - optional timeout in ms
  • timeout-val - optional value to return if the stream times out

returns Promise<value|error> which eventually resolves to:

  • a value when one becomes available
  • nil or default-val if the stream closes
  • timeout-val if no value becomes available in timeout ms
  • an error if the stream errored (i.e. an error occurred during some upstream operation)

NOTE take! would ideally not return chunks, but it currently does. there is currently no good way of using a consumer/ChunkConsumer in the public API, since i don't want to wrap the underlying stream or channel in something else


unwrap-platform-error clj/s

(unwrap-platform-error x)

unwrap a platform error to get at the cause

