Liking cljdoc? Tell your friends :D

manifold.stream

Methods for creating, transforming, and interacting with asynchronous streams of values.

Methods for creating, transforming, and interacting with asynchronous streams of values.
raw docstring

->sinkclj

(->sink x)
(->sink x default-val)

Converts, if possible, the object to a Manifold sink, or default-val if it cannot. If no default value is given, an IllegalArgumentException is thrown.

Converts, if possible, the object to a Manifold sink, or `default-val` if it cannot.  If no
default value is given, an `IllegalArgumentException` is thrown.
raw docstring

->sourceclj

(->source x)
(->source x default-val)

Converts, if possible, the object to a Manifold source, or default-val if it cannot. If no default value is given, an IllegalArgumentException is thrown.

Converts, if possible, the object to a Manifold source, or `default-val` if it cannot.  If no
default value is given, an `IllegalArgumentException` is thrown.
raw docstring

batchclj

(batch batch-size s)
(batch max-size max-latency s)
(batch metric max-size max-latency s)

Batches messages, either into groups of fixed size, or according to upper bounds on size and latency, in milliseconds. By default, each message is of size 1, but a custom metric function that returns the size of each message may be defined.

Batches messages, either into groups of fixed size, or according to upper bounds on size and
latency, in milliseconds.  By default, each message is of size `1`, but a custom `metric` function that
returns the size of each message may be defined.
raw docstring

bufferclj

(buffer limit s)
(buffer metric limit s)

Takes a stream, and returns a stream which is a buffered view of that stream. The buffer size may either be measured in messages, or if a metric is defined, by the sum of metric mapped over all messages currently buffered.

Takes a stream, and returns a stream which is a buffered view of that stream.  The buffer
size may either be measured in messages, or if a `metric` is defined, by the sum of `metric`
mapped over all messages currently buffered.
raw docstring

buffered-streamclj

(buffered-stream buffer-size)
(buffered-stream metric limit)
(buffered-stream metric limit description)

A stream which will buffer at most limit data, where the size of each message is defined by (metric message).

description is a fn that takes the existing description map and returns a new one.

A stream which will buffer at most `limit` data, where the size of each message
is defined by `(metric message)`.

`description` is a fn that takes the existing description map and returns a new one.
raw docstring

close!clj

(close! sink)

Closes a sink, so you can't put! any more messages onto it.

Note that if it's also a source, take!s will still work until the source is drained.

Closes a sink, so you can't `put!` any more messages onto it.

Note that if it's also a source, `take!`s will still work until the source is drained.
raw docstring

closed?clj

(closed? sink)

Returns true if the event sink is closed.

Returns true if the event sink is closed.
raw docstring

concatclj

(concat s)

Takes a stream of streams, and flattens it into a single stream.

Takes a stream of streams, and flattens it into a single stream.
raw docstring

connectclj

(connect source sink)
(connect source
         sink
         {:keys [upstream? downstream? timeout description]
          :or {upstream? false downstream? true}})

Connects a source to a sink, propagating all messages from the former into the latter.

Optionally takes a map of parameters:

|:---|:--- | upstream? | Whether closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to false. Note that if the sink is the only thing downstream of the source, the source will eventually be closed, unless it is permanent. | downstream? | Whether closing the source will close the sink. Defaults to true. | timeout | If defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed-up sink from blocking all the others. | description | Describes the connection, useful for traversing the stream topology via downstream.

Connects a source to a sink, propagating all messages from the former into the latter.

Optionally takes a map of parameters:

|:---|:---
| `upstream?` | Whether closing the sink should always close the source, even if there are other sinks downstream of the source.  Defaults to `false`.  Note that if the sink is the only thing downstream of the source, the source will eventually be closed, unless it is permanent.
| `downstream?` | Whether closing the source will close the sink.  Defaults to `true`.
| `timeout` | If defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it.  Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed-up sink from blocking all the others.
| `description` | Describes the connection, useful for traversing the stream topology via `downstream`.
raw docstring

connect-viaclj

(connect-via src callback dst)
(connect-via src callback dst options)

Feeds all messages from src into callback, with the understanding that they will eventually be propagated into dst in some form. The return value of callback should be a deferred yielding either true or false. When false, the downstream sink is assumed to be closed, and the connection is severed.

Returns a deferred which yields true when src is exhausted or callback yields false.

Feeds all messages from `src` into `callback`, with the understanding that they will
eventually be propagated into `dst` in some form.  The return value of `callback`
should be a deferred yielding either `true` or `false`. When `false`,  the downstream
sink is assumed to be closed, and the connection is severed.

Returns a deferred which yields `true` when `src` is exhausted or `callback` yields `false`.
raw docstring

consumeclj

(consume callback source)

Feeds all messages from source into callback.

Messages will be processed as quickly as the callback can be executed. Returns a deferred which yields true when source is exhausted.

Feeds all messages from `source` into `callback`.

Messages will be processed as quickly as the callback can be executed. Returns
a deferred which yields `true` when `source` is exhausted.
raw docstring

consume-asyncclj

(consume-async callback source)

Feeds all messages from source into callback, which must return a deferred yielding true or false. If the returned value yields false, the consumption will be cancelled.

Messages will be processed only as quickly as the deferred values are realized. Returns a deferred which yields true when source is exhausted or callback yields false.

Feeds all messages from `source` into `callback`, which must return a deferred yielding
`true` or `false`.  If the returned value yields `false`, the consumption will be cancelled.

Messages will be processed only as quickly as the deferred values are realized. Returns a
deferred which yields `true` when `source` is exhausted or `callback` yields `false`.
raw docstring

descriptionclj

(description x)

Returns a description of the stream.

Returns a description of the stream.
raw docstring

downstreamclj

(downstream x)

Returns all sinks downstream of the given source as a sequence of 2-tuples, with the first element containing the connection's description, and the second element containing the sink.

Returns all sinks downstream of the given source as a sequence of 2-tuples, with the
first element containing the connection's description, and the second element containing
the sink.
raw docstring

drain-intoclj

(drain-into src dst)

Takes all messages from src and puts them into dst, and returns a deferred that yields true once src is drained or dst is closed. If src is closed or drained, dst will not be closed.

Takes all messages from `src` and puts them into `dst`, and returns a deferred that
yields `true` once `src` is drained or `dst` is closed.  If `src` is closed or drained,
`dst` will not be closed.
raw docstring

drained?clj

(drained? source)

Returns true if the event source is drained.

Returns true if the event source is drained.
raw docstring

dropping-streamclj

(dropping-stream n)
(dropping-stream n source)

Creates a new stream with a buffer of size n, which will drop incoming items when full.

If source is supplied, inserts a dropping stream after source with the provided capacity.

Creates a new stream with a buffer of size `n`, which will drop
incoming items when full.

If `source` is supplied, inserts a dropping stream after `source`
with the provided capacity.
raw docstring

filterclj

(filter pred s)

Equivalent to Clojure's filter, but for streams instead of sequences.

Equivalent to Clojure's `filter`, but for streams instead of sequences.
raw docstring

lazily-partition-byclj

(lazily-partition-by f s)

Equivalent to Clojure's partition-by, but returns a stream of streams. This means that if a sub-stream is not completely consumed, the next sub-stream will never be emitted.

Use with caution. If you're not totally sure you want a stream of streams, use (transform (partition-by f)) instead.

Equivalent to Clojure's `partition-by`, but returns a stream of streams.  This means that
if a sub-stream is not completely consumed, the next sub-stream will never be emitted.

Use with caution.  If you're not totally sure you want a stream of streams, use
`(transform (partition-by f))` instead.
raw docstring

mapclj

(map f s)
(map f s & rest)

Equivalent to Clojure's map, but for streams instead of sequences.

Equivalent to Clojure's `map`, but for streams instead of sequences.
raw docstring

mapcatclj

(mapcat f s)
(mapcat f s & rest)

Equivalent to Clojure's mapcat, but for streams instead of sequences.

Note that just like clojure.core/mapcat, the provided function f must return a collection and not a stream.

Equivalent to Clojure's `mapcat`, but for streams instead of sequences.

Note that just like `clojure.core/mapcat`, the provided function `f`
must return a collection and not a stream.
raw docstring

on-closedclj

(on-closed sink callback)

Registers a no-arg callback which is invoked when the sink is closed.

Registers a no-arg callback which is invoked when the sink is closed.
raw docstring

on-drainedclj

(on-drained source callback)

Registers a no-arg callback which is invoked when the source is drained.

Registers a no-arg callback which is invoked when the source is drained.
raw docstring

ontoclj

(onto executor s)

Returns an identical stream whose deferred callbacks will be executed on executor.

Returns an identical stream whose deferred callbacks will be executed
on `executor`.
raw docstring

periodicallyclj

(periodically period f)
(periodically period initial-delay f)

Creates a stream which emits the result of invoking (f) every period milliseconds.

Creates a stream which emits the result of invoking `(f)` every `period` milliseconds.
raw docstring

put!clj

(put! sink x)

Puts a value into a sink, returning a deferred that yields true if it succeeds, and false if it fails. Guaranteed to be non-blocking.

Puts a value into a sink, returning a deferred that yields `true` if it succeeds,
and `false` if it fails.  Guaranteed to be non-blocking.
raw docstring

put-all!clj

(put-all! sink msgs)

Puts all values into the sink, returning a deferred that yields true if all puts are successful, or false otherwise. If the sink provides backpressure, will pause. Guaranteed to be non-blocking.

Puts all values into the sink, returning a deferred that yields `true` if all puts
are successful, or `false` otherwise.  If the sink provides backpressure, will
pause. Guaranteed to be non-blocking.
raw docstring

realize-eachclj

(realize-each s)

Takes a stream of potentially deferred values, and returns a stream of realized values.

Takes a stream of potentially deferred values, and returns a stream of realized values.
raw docstring

reduceclj

(reduce f s)
(reduce f initial-value s)

Equivalent to Clojure's reduce, but returns a deferred representing the return value.

The deferred will be realized once the stream is closed or if the accumulator functions returns a reduced value.

Equivalent to Clojure's `reduce`, but returns a deferred representing the return value.

The deferred will be realized once the stream is closed or if the accumulator
functions returns a `reduced` value.
raw docstring

reductionsclj

(reductions f s)
(reductions f initial-value s)

Equivalent to Clojure's reductions, but for streams instead of sequences.

Equivalent to Clojure's `reductions`, but for streams instead of sequences.
raw docstring

sink-onlyclj

(sink-only s)

Returns a view of the stream which is only a sink.

Returns a view of the stream which is only a sink.
raw docstring

sink?clj

(sink? x)

Returns true if the object is a Manifold sink.

Returns true if the object is a Manifold sink.
raw docstring

sinkable?clj

(sinkable? x)

sliding-streamclj

(sliding-stream n)
(sliding-stream n source)

Creates a new stream with a buffer of size n, which will drop the oldest items when full to make room for new items.

If source is supplied, inserts a sliding stream after source with the provided capacity.

Creates a new stream with a buffer of size `n`, which will drop
the oldest items when full to make room for new items.

If `source` is supplied, inserts a sliding stream after `source`
with the provided capacity.
raw docstring

source-onlyclj

(source-only s)

Returns a view of the stream which is only a source.

Returns a view of the stream which is only a source.
raw docstring

source?clj

(source? x)

Returns true if the object is a Manifold source.

Returns true if the object is a Manifold source.
raw docstring

sourceable?clj

(sourceable? x)

spliceclj

(splice sink source)

Splices together two halves of a stream, such that all messages enqueued via put! go into sink, and all messages dequeued via take! come from source.

Splices together two halves of a stream, such that all messages enqueued via `put!` go
into `sink`, and all messages dequeued via `take!` come from `source`.
raw docstring

streamclj

(stream)
(stream buffer-size)
(stream buffer-size xform)
(stream buffer-size xform executor)

Returns a Manifold stream with a configurable buffer-size. If a capacity is specified, put! will yield true when the message is in the buffer. Otherwise, it will only yield true once it has been consumed.

xform is an optional transducer, which will transform all messages that are enqueued via put! before they are dequeued via take!.

executor, if defined, specifies which java.util.concurrent.Executor will be used to handle the deferreds returned by put! and take!.

Returns a Manifold stream with a configurable `buffer-size`.  If a capacity is specified,
`put!` will yield `true` when the message is in the buffer.  Otherwise, it will only yield
`true` once it has been consumed.

`xform` is an optional transducer, which will transform all messages that are enqueued
via `put!` before they are dequeued via `take!`.

`executor`, if defined, specifies which java.util.concurrent.Executor will be used to
handle the deferreds returned by `put!` and `take!`.
raw docstring

stream*clj

(stream* {:keys [permanent? buffer-size description executor xform]})

An alternate way to build a stream, via a map of parameters.

|:---|:--- | permanent? | if true, the channel cannot be closed | buffer-size | the number of messages that can accumulate in the channel before backpressure is applied | description | the description of the channel, which is a single arg function that takes the base properties and returns an enriched map. | executor | the java.util.concurrent.Executor that will execute all callbacks registered on the deferreds returns by put! and take! | xform | a transducer which will transform all messages that are enqueued via put! before they are dequeued via take!.

An alternate way to build a stream, via a map of parameters.

|:---|:---
| `permanent?` | if `true`, the channel cannot be closed
| `buffer-size` | the number of messages that can accumulate in the channel before backpressure is applied
| `description` | the description of the channel, which is a single arg function that takes the base properties and returns an enriched map.
| `executor` | the `java.util.concurrent.Executor` that will execute all callbacks registered on the deferreds returns by `put!` and `take!`
| `xform` | a transducer which will transform all messages that are enqueued via `put!` before they are dequeued via `take!`.
raw docstring

stream->seqclj

(stream->seq s)
(stream->seq s timeout-interval)

Transforms a stream into a lazy sequence. If a timeout-interval is defined, the sequence will terminate if timeout-interval milliseconds elapses without a new event.

Transforms a stream into a lazy sequence.  If a `timeout-interval` is defined, the sequence
will terminate if `timeout-interval` milliseconds elapses without a new event.
raw docstring

stream?clj

(stream? x)

Returns true if the object is a Manifold stream.

Returns true if the object is a Manifold stream.
raw docstring

synchronous?clj

(synchronous? x)

Returns true if the underlying abstraction behaves synchronously, using thread blocking to provide backpressure.

Returns true if the underlying abstraction behaves synchronously, using thread blocking
to provide backpressure.
raw docstring

take!clj

(take! source)
(take! source default-val)

Takes a value from a stream, returning a deferred that yields the value when it is available, or nil if the take fails. Guaranteed to be non-blocking.

A special default-val may be specified, if it is important to differentiate between actual nil values and failures.

Takes a value from a stream, returning a deferred that yields the value when it
is available, or `nil` if the take fails.  Guaranteed to be non-blocking.

A special `default-val` may be specified, if it is important to differentiate
between actual `nil` values and failures.
raw docstring

throttleclj

(throttle max-rate s)
(throttle max-rate max-backlog s)

Limits the max-rate that messages are emitted, per second.

The max-backlog dictates how much "memory" the throttling mechanism has, or how many messages it will emit immediately after a long interval without any messages. By default, this is set to one second's worth.

Limits the `max-rate` that messages are emitted, per second.

The `max-backlog` dictates how much "memory" the throttling mechanism has, or how many
messages it will emit immediately after a long interval without any messages.  By default,
this is set to one second's worth.
raw docstring

transformclj

(transform xform s)
(transform xform buffer-size s)

Takes a transducer xform and returns a source which applies it to source s. A buffer-size may optionally be defined for the output source.

Takes a transducer `xform` and returns a source which applies it to source `s`. A buffer-size
may optionally be defined for the output source.
raw docstring

try-put!clj

(try-put! sink x timeout)
(try-put! sink x timeout timeout-val)

Puts a value into a stream if the put can successfully be completed in timeout milliseconds. Returns a promise that yields true if it succeeds, and false if it fails or times out. Guaranteed to be non-blocking.

A special timeout-val may be specified, if it is important to differentiate between failure due to timeout and other failures.

Puts a value into a stream if the put can successfully be completed in `timeout`
milliseconds.  Returns a promise that yields `true` if it succeeds, and `false`
if it fails or times out.  Guaranteed to be non-blocking.

A special `timeout-val` may be specified, if it is important to differentiate
between failure due to timeout and other failures.
raw docstring

try-take!clj

(try-take! source timeout)
(try-take! source default-val timeout timeout-val)

Takes a value from a stream, returning a deferred that yields the value if it is available within timeout milliseconds, or nil if it fails or times out. Guaranteed to be non-blocking.

Special timeout-val and default-val values may be specified, if it is important to differentiate between actual nil values and timeouts/failures.

Takes a value from a stream, returning a deferred that yields the value if it is
available within `timeout` milliseconds, or `nil` if it fails or times out.
Guaranteed to be non-blocking.

Special `timeout-val` and `default-val` values may be specified, if it is
important to differentiate between actual `nil` values and timeouts/failures.
raw docstring

weak-handleclj

(weak-handle x)

Returns a weak reference that can be used to construct topologies of streams.

Returns a weak reference that can be used to construct topologies of streams.
raw docstring

zipclj

(zip a)
(zip a & rest)

Takes n-many streams, and returns a single stream which will emit n-tuples representing a message from each stream.

Takes n-many streams, and returns a single stream which will emit n-tuples representing
a message from each stream.
raw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close