Main channel API implementation for csp-clj.
This namespace provides the primary functions for working with channels, including:
Key Concepts for New Developers:
Most users should use csp-clj.core instead of this namespace directly, as core provides convenient aliases.
Main channel API implementation for csp-clj. This namespace provides the primary functions for working with channels, including: - Channel creation (buffered and unbuffered) - Core operations (put!, take!, close!) - Select operations (select!) - Utilities (into-chan!) - Mult/pub-sub patterns - Pipeline processing Key Concepts for New Developers: - create: Factory for buffered and unbuffered channels - put!/take!: Blocking operations on virtual threads - select!: Choose between multiple pending operations - multiplex/pub!: Fan-out patterns - pipeline: Parallel transducer processing Most users should use csp-clj.core instead of this namespace directly, as core provides convenient aliases.
(close! ch)Closes a channel.
Idempotent operation - multiple calls have no additional effect. After closing:
Parameters:
Returns: nil
Example: (close! ch) => nil
See also: closed?
Closes a channel. Idempotent operation - multiple calls have no additional effect. After closing: - Pending takes receive nil (EOF) - Pending puts receive false - No further values can be put onto the channel Parameters: - ch: the channel to close Returns: nil Example: (close! ch) => nil See also: closed?
(closed? ch)Returns true if the channel is closed.
Examples: (closed? ch) ; true or false
Returns true if the channel is closed. Examples: (closed? ch) ; true or false
(create)(create capacity-or-buffer)Creates a new channel.
Without arguments, creates an unbuffered channel with synchronous rendezvous semantics. With a capacity or buffer, creates a buffered channel of that size.
Parameters:
Returns: A channel implementing csp-clj.protocols.channel/Channel
Example: (create) ; unbuffered (create 10) ; buffered with capacity 10 (create (csp-clj.buffers/fixed-buffer 5))
See also: csp-clj.core/channel, csp-clj.channels.unbuffered/create, csp-clj.channels.buffered/create
Creates a new channel.
Without arguments, creates an unbuffered channel with synchronous
rendezvous semantics. With a capacity or buffer, creates a buffered
channel of that size.
Parameters:
- capacity-or-buffer: positive integer for buffer size, or a
Buffer instance (optional)
Returns:
A channel implementing csp-clj.protocols.channel/Channel
Example:
(create) ; unbuffered
(create 10) ; buffered with capacity 10
(create (csp-clj.buffers/fixed-buffer 5))
See also: csp-clj.core/channel,
csp-clj.channels.unbuffered/create,
csp-clj.channels.buffered/create(into-chan! coll)(into-chan! ch coll)(into-chan! ch coll close?)Puts all items from coll into the channel. If no channel is provided, creates a buffered channel with capacity (max 1 (count coll)).
The putting happens in a background virtual thread. Returns the channel immediately.
If close? is true (the default), the channel will be closed after all items are put.
Puts all items from coll into the channel. If no channel is provided, creates a buffered channel with capacity (max 1 (count coll)). The putting happens in a background virtual thread. Returns the channel immediately. If close? is true (the default), the channel will be closed after all items are put.
(multiplex source-ch)Creates and returns a mult(iplexer) for the given source channel. A mult runs a background virtual thread that continually reads from the source channel and distributes each value concurrently to all registered taps (channels).
If the source channel blocks, the mult thread blocks. If a tap channel blocks (buffer full or unbuffered with no taker), the mult blocks from taking the next value from the source channel until all taps have accepted the current value (backpressure).
If the source channel is closed, the mult thread exits and will close all taps that were registered with close? = true.
Creates and returns a mult(iplexer) for the given source channel. A mult runs a background virtual thread that continually reads from the source channel and distributes each value concurrently to all registered taps (channels). If the source channel blocks, the mult thread blocks. If a tap channel blocks (buffer full or unbuffered with no taker), the mult blocks from taking the next value from the source channel until all taps have accepted the current value (backpressure). If the source channel is closed, the mult thread exits and will close all taps that were registered with close? = true.
(pipeline n to xf from)(pipeline n to xf from opts)Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n.
Because it is parallel, the transducer will be applied independently to each element. Outputs will be returned in order relative to the inputs.
Options:
Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n. Because it is parallel, the transducer will be applied independently to each element. Outputs will be returned in order relative to the inputs. Options: - :close? (default true) - Close the to channel when from channel closes - :executor (default :cpu) - :cpu (work stealing) or :io (virtual threads) - :ex-handler - Function to handle exceptions during transduction
(pub! source-ch topic-fn)(pub! source-ch topic-fn buf-fn)Creates and returns a pub(lisher) for the given source channel. A pub runs a background virtual thread that continually reads from the source channel and distributes each value to all channels subscribed to the value's topic.
The topic is determined by calling topic-fn on the value.
buf-fn is an optional function that takes a topic and returns a buffer capacity. If not provided, unbuffered channels are used internally.
Creates and returns a pub(lisher) for the given source channel. A pub runs a background virtual thread that continually reads from the source channel and distributes each value to all channels subscribed to the value's topic. The topic is determined by calling topic-fn on the value. buf-fn is an optional function that takes a topic and returns a buffer capacity. If not provided, unbuffered channels are used internally.
(put! ch value)(put! ch value timeout-ms)Put a value onto a channel. Blocks until:
Returns:
Two-arity form blocks indefinitely. Three-arity form times out after specified milliseconds.
If the calling thread is interrupted during the operation, the thread's interrupt flag will be set and the operation will return false.
Examples: (put! ch :value) ; blocks indefinitely (put! ch :value 1000) ; times out after 1000ms
Put a value onto a channel. Blocks until: - Unbuffered: a consumer takes the value - Buffered: space is available in the buffer Returns: - true: value was successfully transferred - false: channel is closed (or thread interrupted) - :timeout: timeout was specified and elapsed Two-arity form blocks indefinitely. Three-arity form times out after specified milliseconds. If the calling thread is interrupted during the operation, the thread's interrupt flag will be set and the operation will return false. Examples: (put! ch :value) ; blocks indefinitely (put! ch :value 1000) ; times out after 1000ms
(select! operations)(select! operations {:keys [timeout]})Complete one of several channel operations.
operations is a vector of:
Returns a vector [channel op value]:
Options:
Returns [nil :other :timeout] if timeout elapses. Returns [nil :other :interrupted] if thread is interrupted. Returns [nil :other :shutdown] if ALL channels are closed.
Complete one of several channel operations. operations is a vector of: - [channel :take] - attempt to take a value - [channel :put value] - attempt to put a value Returns a vector [channel op value]: - [ch :take val] on successful take - [ch :take nil] if taking from a closed channel - [ch :put true] on successful put - [ch :put false] if putting to a closed channel Options: - :timeout - maximum milliseconds to wait Returns [nil :other :timeout] if timeout elapses. Returns [nil :other :interrupted] if thread is interrupted. Returns [nil :other :shutdown] if ALL channels are closed.
(sub! p topic ch)(sub! p topic ch close?)Subscribes a channel to a topic of a pub(lisher). When the pub's source channel receives a value whose topic matches this topic, it will be put! onto the subscribed channel.
If close? is true (the default), closing the pub's source channel will also close the subscribed channel.
Subscribes a channel to a topic of a pub(lisher). When the pub's source channel receives a value whose topic matches this topic, it will be put! onto the subscribed channel. If close? is true (the default), closing the pub's source channel will also close the subscribed channel.
(take! ch)(take! ch timeout-ms)Take a value from a channel. Blocks until a value is available.
Returns:
One-arity form blocks indefinitely. Two-arity form times out after specified milliseconds.
If the calling thread is interrupted during the operation, the thread's interrupt flag will be set and the operation will return nil.
Examples: (take! ch) ; blocks indefinitely (take! ch 1000) ; times out after 1000ms
Take a value from a channel. Blocks until a value is available. Returns: - value: the taken value - nil: channel is closed and empty (EOF), or thread was interrupted - :timeout: timeout was specified and elapsed One-arity form blocks indefinitely. Two-arity form times out after specified milliseconds. If the calling thread is interrupted during the operation, the thread's interrupt flag will be set and the operation will return nil. Examples: (take! ch) ; blocks indefinitely (take! ch 1000) ; times out after 1000ms
(tap! mult ch)(tap! mult ch close?)Registers a tap channel on a mult. When the source channel of the mult receives a value, it will be put! onto the tap channel.
If close? is true (the default), closing the source channel will also close the tap channel.
Registers a tap channel on a mult. When the source channel of the mult receives a value, it will be put! onto the tap channel. If close? is true (the default), closing the source channel will also close the tap channel.
(unsub! p topic ch)Unsubscribes a channel from a topic of a pub(lisher).
Unsubscribes a channel from a topic of a pub(lisher).
(unsub-all! p)(unsub-all! p topic)Unsubscribes all channels from a pub(lisher), or from a specific topic.
Unsubscribes all channels from a pub(lisher), or from a specific topic.
(untap! mult ch)Removes a tap channel from a mult.
Removes a tap channel from a mult.
(untap-all! mult)Removes all tap channels from a mult.
Removes all tap channels from a mult.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |