Methods for creating, transforming, and interacting with asynchronous streams of values.
Methods for creating, transforming, and interacting with asynchronous streams of values.
(->sink x)
(->sink x default-val)
Converts, if possible, the object to a Manifold sink, or default-val
if it cannot. If no
default value is given, an IllegalArgumentException
is thrown.
Converts, if possible, the object to a Manifold sink, or `default-val` if it cannot. If no default value is given, an `IllegalArgumentException` is thrown.
(->source x)
(->source x default-val)
Converts, if possible, the object to a Manifold source, or default-val
if it cannot. If no
default value is given, an IllegalArgumentException
is thrown.
Converts, if possible, the object to a Manifold source, or `default-val` if it cannot. If no default value is given, an `IllegalArgumentException` is thrown.
(batch batch-size s)
(batch max-size max-latency s)
(batch metric max-size max-latency s)
Batches messages, either into groups of fixed size, or according to upper bounds on size and
latency, in milliseconds. By default, each message is of size 1
, but a custom metric
function that
returns the size of each message may be defined.
Batches messages, either into groups of fixed size, or according to upper bounds on size and latency, in milliseconds. By default, each message is of size `1`, but a custom `metric` function that returns the size of each message may be defined.
(buffer limit s)
(buffer metric limit s)
Takes a stream, and returns a stream which is a buffered view of that stream. The buffer
size may either be measured in messages, or if a metric
is defined, by the sum of metric
mapped over all messages currently buffered.
Takes a stream, and returns a stream which is a buffered view of that stream. The buffer size may either be measured in messages, or if a `metric` is defined, by the sum of `metric` mapped over all messages currently buffered.
(buffered-stream buffer-size)
(buffered-stream metric limit)
(buffered-stream metric limit description)
A stream which will buffer at most limit
data, where the size of each message
is defined by (metric message)
.
description
is a fn that takes the existing description map and returns a new one.
A stream which will buffer at most `limit` data, where the size of each message is defined by `(metric message)`. `description` is a fn that takes the existing description map and returns a new one.
(close! sink)
Closes a sink, so you can't put!
any more messages onto it.
Note that if it's also a source, take!
s will still work until the source is drained.
Closes a sink, so you can't `put!` any more messages onto it. Note that if it's also a source, `take!`s will still work until the source is drained.
(closed? sink)
Returns true if the event sink is closed.
Returns true if the event sink is closed.
(concat s)
Takes a stream of streams, and flattens it into a single stream.
Takes a stream of streams, and flattens it into a single stream.
(connect source sink)
(connect source
sink
{:keys [upstream? downstream? timeout description]
:or {upstream? false downstream? true}})
Connects a source to a sink, propagating all messages from the former into the latter.
Optionally takes a map of parameters:
|:---|:---
| upstream?
| Whether closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to false
. Note that if the sink is the only thing downstream of the source, the source will eventually be closed, unless it is permanent.
| downstream?
| Whether closing the source will close the sink. Defaults to true
.
| timeout
| If defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed-up sink from blocking all the others.
| description
| Describes the connection, useful for traversing the stream topology via downstream
.
Connects a source to a sink, propagating all messages from the former into the latter. Optionally takes a map of parameters: |:---|:--- | `upstream?` | Whether closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to `false`. Note that if the sink is the only thing downstream of the source, the source will eventually be closed, unless it is permanent. | `downstream?` | Whether closing the source will close the sink. Defaults to `true`. | `timeout` | If defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed-up sink from blocking all the others. | `description` | Describes the connection, useful for traversing the stream topology via `downstream`.
(connect-via src callback dst)
(connect-via src callback dst options)
Feeds all messages from src
into callback
, with the understanding that they will
eventually be propagated into dst
in some form. The return value of callback
should be a deferred yielding either true
or false
. When false
, the downstream
sink is assumed to be closed, and the connection is severed.
Returns a deferred which yields true
when src
is exhausted or callback
yields false
.
Feeds all messages from `src` into `callback`, with the understanding that they will eventually be propagated into `dst` in some form. The return value of `callback` should be a deferred yielding either `true` or `false`. When `false`, the downstream sink is assumed to be closed, and the connection is severed. Returns a deferred which yields `true` when `src` is exhausted or `callback` yields `false`.
(consume callback source)
Feeds all messages from source
into callback
.
Messages will be processed as quickly as the callback can be executed. Returns
a deferred which yields true
when source
is exhausted.
Feeds all messages from `source` into `callback`. Messages will be processed as quickly as the callback can be executed. Returns a deferred which yields `true` when `source` is exhausted.
(consume-async callback source)
Feeds all messages from source
into callback
, which must return a deferred yielding
true
or false
. If the returned value yields false
, the consumption will be cancelled.
Messages will be processed only as quickly as the deferred values are realized. Returns a
deferred which yields true
when source
is exhausted or callback
yields false
.
Feeds all messages from `source` into `callback`, which must return a deferred yielding `true` or `false`. If the returned value yields `false`, the consumption will be cancelled. Messages will be processed only as quickly as the deferred values are realized. Returns a deferred which yields `true` when `source` is exhausted or `callback` yields `false`.
(description x)
Returns a description of the stream.
Returns a description of the stream.
(downstream x)
Returns all sinks downstream of the given source as a sequence of 2-tuples, with the first element containing the connection's description, and the second element containing the sink.
Returns all sinks downstream of the given source as a sequence of 2-tuples, with the first element containing the connection's description, and the second element containing the sink.
(drain-into src dst)
Takes all messages from src
and puts them into dst
, and returns a deferred that
yields true
once src
is drained or dst
is closed. If src
is closed or drained,
dst
will not be closed.
Takes all messages from `src` and puts them into `dst`, and returns a deferred that yields `true` once `src` is drained or `dst` is closed. If `src` is closed or drained, `dst` will not be closed.
(drained? source)
Returns true if the event source is drained.
Returns true if the event source is drained.
(dropping-stream n)
(dropping-stream n source)
Creates a new stream with a buffer of size n
, which will drop
incoming items when full.
If source
is supplied, inserts a dropping stream after source
with the provided capacity.
Creates a new stream with a buffer of size `n`, which will drop incoming items when full. If `source` is supplied, inserts a dropping stream after `source` with the provided capacity.
(filter pred s)
Equivalent to Clojure's filter
, but for streams instead of sequences.
Equivalent to Clojure's `filter`, but for streams instead of sequences.
(lazily-partition-by f s)
Equivalent to Clojure's partition-by
, but returns a stream of streams. This means that
if a sub-stream is not completely consumed, the next sub-stream will never be emitted.
Use with caution. If you're not totally sure you want a stream of streams, use
(transform (partition-by f))
instead.
Equivalent to Clojure's `partition-by`, but returns a stream of streams. This means that if a sub-stream is not completely consumed, the next sub-stream will never be emitted. Use with caution. If you're not totally sure you want a stream of streams, use `(transform (partition-by f))` instead.
(map f s)
(map f s & rest)
Equivalent to Clojure's map
, but for streams instead of sequences.
Equivalent to Clojure's `map`, but for streams instead of sequences.
(mapcat f s)
(mapcat f s & rest)
Equivalent to Clojure's mapcat
, but for streams instead of sequences.
Note that just like clojure.core/mapcat
, the provided function f
must return a collection and not a stream.
Equivalent to Clojure's `mapcat`, but for streams instead of sequences. Note that just like `clojure.core/mapcat`, the provided function `f` must return a collection and not a stream.
(on-closed sink callback)
Registers a no-arg callback which is invoked when the sink is closed.
Registers a no-arg callback which is invoked when the sink is closed.
(on-drained source callback)
Registers a no-arg callback which is invoked when the source is drained.
Registers a no-arg callback which is invoked when the source is drained.
(onto executor s)
Returns an identical stream whose deferred callbacks will be executed
on executor
.
Returns an identical stream whose deferred callbacks will be executed on `executor`.
(periodically period f)
(periodically period initial-delay f)
Creates a stream which emits the result of invoking (f)
every period
milliseconds.
Creates a stream which emits the result of invoking `(f)` every `period` milliseconds.
(put! sink x)
Puts a value into a sink, returning a deferred that yields true
if it succeeds,
and false
if it fails. Guaranteed to be non-blocking.
Puts a value into a sink, returning a deferred that yields `true` if it succeeds, and `false` if it fails. Guaranteed to be non-blocking.
(put-all! sink msgs)
Puts all values into the sink, returning a deferred that yields true
if all puts
are successful, or false
otherwise. If the sink provides backpressure, will
pause. Guaranteed to be non-blocking.
Puts all values into the sink, returning a deferred that yields `true` if all puts are successful, or `false` otherwise. If the sink provides backpressure, will pause. Guaranteed to be non-blocking.
(realize-each s)
Takes a stream of potentially deferred values, and returns a stream of realized values.
Takes a stream of potentially deferred values, and returns a stream of realized values.
(reduce f s)
(reduce f initial-value s)
Equivalent to Clojure's reduce
, but returns a deferred representing the return value.
The deferred will be realized once the stream is closed or if the accumulator
functions returns a reduced
value.
Equivalent to Clojure's `reduce`, but returns a deferred representing the return value. The deferred will be realized once the stream is closed or if the accumulator functions returns a `reduced` value.
(reductions f s)
(reductions f initial-value s)
Equivalent to Clojure's reductions
, but for streams instead of sequences.
Equivalent to Clojure's `reductions`, but for streams instead of sequences.
(sink-only s)
Returns a view of the stream which is only a sink.
Returns a view of the stream which is only a sink.
(sink? x)
Returns true if the object is a Manifold sink.
Returns true if the object is a Manifold sink.
(sinkable? x)
(sliding-stream n)
(sliding-stream n source)
Creates a new stream with a buffer of size n
, which will drop
the oldest items when full to make room for new items.
If source
is supplied, inserts a sliding stream after source
with the provided capacity.
Creates a new stream with a buffer of size `n`, which will drop the oldest items when full to make room for new items. If `source` is supplied, inserts a sliding stream after `source` with the provided capacity.
(source-only s)
Returns a view of the stream which is only a source.
Returns a view of the stream which is only a source.
(source? x)
Returns true if the object is a Manifold source.
Returns true if the object is a Manifold source.
(sourceable? x)
(splice sink source)
Splices together two halves of a stream, such that all messages enqueued via put!
go
into sink
, and all messages dequeued via take!
come from source
.
Splices together two halves of a stream, such that all messages enqueued via `put!` go into `sink`, and all messages dequeued via `take!` come from `source`.
(stream)
(stream buffer-size)
(stream buffer-size xform)
(stream buffer-size xform executor)
Returns a Manifold stream with a configurable buffer-size
. If a capacity is specified,
put!
will yield true
when the message is in the buffer. Otherwise, it will only yield
true
once it has been consumed.
xform
is an optional transducer, which will transform all messages that are enqueued
via put!
before they are dequeued via take!
.
executor
, if defined, specifies which java.util.concurrent.Executor will be used to
handle the deferreds returned by put!
and take!
.
Returns a Manifold stream with a configurable `buffer-size`. If a capacity is specified, `put!` will yield `true` when the message is in the buffer. Otherwise, it will only yield `true` once it has been consumed. `xform` is an optional transducer, which will transform all messages that are enqueued via `put!` before they are dequeued via `take!`. `executor`, if defined, specifies which java.util.concurrent.Executor will be used to handle the deferreds returned by `put!` and `take!`.
(stream* {:keys [permanent? buffer-size description executor xform]})
An alternate way to build a stream, via a map of parameters.
|:---|:---
| permanent?
| if true
, the channel cannot be closed
| buffer-size
| the number of messages that can accumulate in the channel before backpressure is applied
| description
| the description of the channel, which is a single arg function that takes the base properties and returns an enriched map.
| executor
| the java.util.concurrent.Executor
that will execute all callbacks registered on the deferreds returns by put!
and take!
| xform
| a transducer which will transform all messages that are enqueued via put!
before they are dequeued via take!
.
An alternate way to build a stream, via a map of parameters. |:---|:--- | `permanent?` | if `true`, the channel cannot be closed | `buffer-size` | the number of messages that can accumulate in the channel before backpressure is applied | `description` | the description of the channel, which is a single arg function that takes the base properties and returns an enriched map. | `executor` | the `java.util.concurrent.Executor` that will execute all callbacks registered on the deferreds returns by `put!` and `take!` | `xform` | a transducer which will transform all messages that are enqueued via `put!` before they are dequeued via `take!`.
(stream->seq s)
(stream->seq s timeout-interval)
Transforms a stream into a lazy sequence. If a timeout-interval
is defined, the sequence
will terminate if timeout-interval
milliseconds elapses without a new event.
Transforms a stream into a lazy sequence. If a `timeout-interval` is defined, the sequence will terminate if `timeout-interval` milliseconds elapses without a new event.
(stream? x)
Returns true if the object is a Manifold stream.
Returns true if the object is a Manifold stream.
(synchronous? x)
Returns true if the underlying abstraction behaves synchronously, using thread blocking to provide backpressure.
Returns true if the underlying abstraction behaves synchronously, using thread blocking to provide backpressure.
(take! source)
(take! source default-val)
Takes a value from a stream, returning a deferred that yields the value when it
is available, or nil
if the take fails. Guaranteed to be non-blocking.
A special default-val
may be specified, if it is important to differentiate
between actual nil
values and failures.
Takes a value from a stream, returning a deferred that yields the value when it is available, or `nil` if the take fails. Guaranteed to be non-blocking. A special `default-val` may be specified, if it is important to differentiate between actual `nil` values and failures.
(throttle max-rate s)
(throttle max-rate max-backlog s)
Limits the max-rate
that messages are emitted, per second.
The max-backlog
dictates how much "memory" the throttling mechanism has, or how many
messages it will emit immediately after a long interval without any messages. By default,
this is set to one second's worth.
Limits the `max-rate` that messages are emitted, per second. The `max-backlog` dictates how much "memory" the throttling mechanism has, or how many messages it will emit immediately after a long interval without any messages. By default, this is set to one second's worth.
(transform xform s)
(transform xform buffer-size s)
Takes a transducer xform
and returns a source which applies it to source s
. A buffer-size
may optionally be defined for the output source.
Takes a transducer `xform` and returns a source which applies it to source `s`. A buffer-size may optionally be defined for the output source.
(try-put! sink x timeout)
(try-put! sink x timeout timeout-val)
Puts a value into a stream if the put can successfully be completed in timeout
milliseconds. Returns a promise that yields true
if it succeeds, and false
if it fails or times out. Guaranteed to be non-blocking.
A special timeout-val
may be specified, if it is important to differentiate
between failure due to timeout and other failures.
Puts a value into a stream if the put can successfully be completed in `timeout` milliseconds. Returns a promise that yields `true` if it succeeds, and `false` if it fails or times out. Guaranteed to be non-blocking. A special `timeout-val` may be specified, if it is important to differentiate between failure due to timeout and other failures.
(try-take! source timeout)
(try-take! source default-val timeout timeout-val)
Takes a value from a stream, returning a deferred that yields the value if it is
available within timeout
milliseconds, or nil
if it fails or times out.
Guaranteed to be non-blocking.
Special timeout-val
and default-val
values may be specified, if it is
important to differentiate between actual nil
values and timeouts/failures.
Takes a value from a stream, returning a deferred that yields the value if it is available within `timeout` milliseconds, or `nil` if it fails or times out. Guaranteed to be non-blocking. Special `timeout-val` and `default-val` values may be specified, if it is important to differentiate between actual `nil` values and timeouts/failures.
(weak-handle x)
Returns a weak reference that can be used to construct topologies of streams.
Returns a weak reference that can be used to construct topologies of streams.
(zip a)
(zip a & rest)
Takes n-many streams, and returns a single stream which will emit n-tuples representing a message from each stream.
Takes n-many streams, and returns a single stream which will emit n-tuples representing a message from each stream.
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close