csp-clj.channels

Main channel API implementation for csp-clj.

This namespace provides the primary functions for working with channels, including:

  • Channel creation (buffered and unbuffered)
  • Core operations (put!, take!, close!)
  • Select operations (select!)
  • Utilities (into-chan!)
  • Mult/pub-sub patterns
  • Pipeline processing

Key Concepts for New Developers:

  • create: Factory for buffered and unbuffered channels
  • put!/take!: Blocking operations on virtual threads
  • select!: Choose between multiple pending operations
  • multiplex/pub!: Fan-out patterns
  • pipeline: Parallel transducer processing

Most users should use csp-clj.core instead of this namespace directly, as core provides convenient aliases.

close!

(close! ch)

Closes a channel.

Idempotent operation - multiple calls have no additional effect. After closing:

  • Pending takes receive nil (EOF)
  • Pending puts receive false
  • No further values can be put onto the channel

Parameters:

  • ch: the channel to close

Returns: nil

Example:
  (close! ch)
  => nil

See also: closed?
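
The closing rules above can be sketched as follows. This is a minimal illustration, not taken from the library's own examples: the alias csp and the var names are chosen here, and a buffered channel is used so that nothing needs a second thread.

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

;; Buffered channel, so the calls below never wait on another thread.
(def c (csp/create 1))

(csp/close! c) ; returns nil
(csp/close! c) ; idempotent: a second call has no additional effect

(def closed-now  (csp/closed? c))    ; true once the channel is closed
(def put-result  (csp/put! c :late)) ; false: no further values can be put
(def take-result (csp/take! c))      ; nil: channel is closed and empty (EOF)
```

Checking the return value of put! is the documented way for a producer to notice that its channel has been closed.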

closed?

(closed? ch)

Returns true if the channel is closed, false otherwise.

Examples: (closed? ch) ; true or false

create

(create)
(create capacity-or-buffer)

Creates a new channel.

Without arguments, creates an unbuffered channel with synchronous rendezvous semantics. With a capacity or buffer, creates a buffered channel of that size.

Parameters:

  • capacity-or-buffer: positive integer for buffer size, or a Buffer instance (optional)

Returns: A channel implementing csp-clj.protocols.channel/Channel

Example:
  (create)        ; unbuffered
  (create 10)     ; buffered with capacity 10
  (create (csp-clj.buffers/fixed-buffer 5))

See also: csp-clj.core/channel, csp-clj.channels.unbuffered/create, csp-clj.channels.buffered/create

into-chan!

(into-chan! coll)
(into-chan! ch coll)
(into-chan! ch coll close?)

Puts all items from coll into the channel. If no channel is provided, creates a buffered channel with capacity (max 1 (count coll)).

The putting happens in a background virtual thread. Returns the channel immediately.

If close? is true (the default), the channel will be closed after all items are put.
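
A small sketch of the one-argument form, assuming the alias csp (chosen here) and a hypothetical helper named drain that is not part of the library. Because the channel is closed after the last item, a consumer can loop until take! returns nil.

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

;; Returns immediately; the puts happen on a background virtual thread.
(def c (csp/into-chan! [1 2 3]))

;; Hypothetical helper: take until the channel reports EOF (nil).
(defn drain [channel]
  (loop [acc []]
    (let [v (csp/take! channel)]
      (if (nil? v)
        acc
        (recur (conj acc v))))))

;; Holds the three items once the background thread has put them all.
(def items (drain c))
```

Passing false as close? would leave the channel open, in which case a loop like drain would block after the last item instead of returning.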

multiplex

(multiplex source-ch)

Creates and returns a mult(iplexer) for the given source channel. A mult runs a background virtual thread that continually reads from the source channel and distributes each value concurrently to all registered taps (channels).

If the source channel has no value available, the mult thread blocks. If a tap channel blocks (buffer full, or unbuffered with no taker), the mult does not take the next value from the source channel until all taps have accepted the current value (backpressure).

If the source channel is closed, the mult thread exits and will close all taps that were registered with close? = true.
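
The fan-out described above can be sketched with two taps. The alias csp and all var names are assumptions of this sketch; the taps are given capacity 1 so the mult can hand over the value without waiting for the takers, and the takes use a timeout only as a safety net.

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

(def source (csp/create))   ; unbuffered source
(def tap-a  (csp/create 1))
(def tap-b  (csp/create 1))

(def m (csp/multiplex source))
(csp/tap! m tap-a)
(csp/tap! m tap-b)

;; The mult's background thread acts as the taker for this put.
(def put-ok (csp/put! source :event))

;; Each tap receives its own copy of the value.
(def from-a (csp/take! tap-a 1000))
(def from-b (csp/take! tap-b 1000))

;; Closing the source lets the mult thread exit; taps registered with
;; close? = true (the default) are closed as well.
(csp/close! source)
```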

pipeline

(pipeline n to xf from)
(pipeline n to xf from opts)

Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n.

Because it is parallel, the transducer will be applied independently to each element. Outputs will be returned in order relative to the inputs.

Options:

  • :close? (default true) - Close the to channel when from channel closes
  • :executor (default :cpu) - :cpu (work stealing) or :io (virtual threads)
  • :ex-handler - Function to handle exceptions during transduction
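
A hedged sketch of a two-worker pipeline. The alias csp and the var names are chosen here. The documentation does not say whether pipeline returns before processing finishes, so the output channel is given enough capacity to hold every result and the sketch works either way.

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

(def from (csp/into-chan! [1 2 3 4])) ; closed after the last item
(def to   (csp/create 4))             ; room for every result

;; Apply (map inc) to each element with parallelism 2.
(csp/pipeline 2 to (map inc) from)

;; Collect until `to` is closed; the timeout is only a safety net.
(def results
  (loop [acc []]
    (let [v (csp/take! to 1000)]
      (if (or (nil? v) (= :timeout v))
        acc
        (recur (conj acc v))))))
```

With the default :close? true, the to channel is closed once from closes, which is what ends the collecting loop.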

pub!

(pub! source-ch topic-fn)
(pub! source-ch topic-fn buf-fn)

Creates and returns a pub(lisher) for the given source channel. A pub runs a background virtual thread that continually reads from the source channel and distributes each value to all channels subscribed to the value's topic.

The topic is determined by calling topic-fn on the value.

buf-fn is an optional function that takes a topic and returns a buffer capacity. If not provided, unbuffered channels are used internally.

put!

(put! ch value)
(put! ch value timeout-ms)

Put a value onto a channel. Blocks until:

  • Unbuffered: a consumer takes the value
  • Buffered: space is available in the buffer

Returns:

  • true: value was successfully transferred
  • false: channel is closed (or thread interrupted)
  • :timeout: timeout was specified and elapsed

The two-arity form blocks indefinitely. The three-arity form gives up after timeout-ms milliseconds and returns :timeout.

If the calling thread is interrupted during the operation, the thread's interrupt flag will be set and the operation will return false.

Examples:
  (put! ch :value)           ; blocks indefinitely
  (put! ch :value 1000)      ; times out after 1000ms
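
Since put! has three distinct return values, callers usually branch on the result. A minimal sketch, assuming the alias csp and the outcome keywords (both chosen here for illustration):

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

;; Unbuffered channel with no consumer, so the put cannot complete.
(def c (csp/create))

(def result (csp/put! c :value 50)) ; waits up to 50 ms

;; Map each documented return value to a label of our own.
(def outcome
  (case result
    true     :delivered
    false    :closed-or-interrupted
    :timeout :timed-out))
```

Here result is :timeout because nothing ever takes from the channel.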

select!

(select! operations)
(select! operations {:keys [timeout]})

Complete one of several channel operations.

operations is a vector of:

  • [channel :take] - attempt to take a value
  • [channel :put value] - attempt to put a value

Returns a vector [channel op value]:

  • [ch :take val] on successful take
  • [ch :take nil] if taking from a closed channel
  • [ch :put true] on successful put
  • [ch :put false] if putting to a closed channel

Options:

  • :timeout - maximum milliseconds to wait

Special results:

  • [nil :other :timeout] if the timeout elapses
  • [nil :other :interrupted] if the thread is interrupted
  • [nil :other :shutdown] if ALL channels are closed
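
The result vector is convenient to destructure. In this sketch the alias csp and the var names are assumptions; channel b already holds a value, so the first select! completes with a take from b, while the second waits on an empty channel and times out.

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

(def a (csp/create 1)) ; stays empty
(def b (csp/create 1))
(csp/put! b :ready)    ; buffer has room, so this does not block

;; Only the take from b can complete.
(def hit (csp/select! [[a :take] [b :take]] {:timeout 1000}))

(def description
  (let [[channel op value] hit]
    (cond
      (= op :other)         [:no-operation value]
      (identical? channel b) [:from-b value]
      :else                  [:from-a value])))

;; Nothing to take from a, so the timeout elapses.
(def miss (csp/select! [[a :take]] {:timeout 50}))
```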

sub!

(sub! p topic ch)
(sub! p topic ch close?)

Subscribes a channel to a topic of a pub(lisher). When the pub's source channel receives a value whose topic matches this topic, it will be put! onto the subscribed channel.

If close? is true (the default), closing the pub's source channel will also close the subscribed channel.
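
A minimal publish/subscribe sketch using pub! and sub! together. The alias csp, the var names, and the message shape are assumptions made here; the keyword :type serves as topic-fn, so each message's topic is the value under its :type key.

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

(def source (csp/create))
(def p      (csp/pub! source :type)) ; topic-fn: (:type message)

(def orders (csp/create 1))
(csp/sub! p :order orders)           ; subscribe before publishing

(def message  {:type :order :id 1})
(def put-ok   (csp/put! source message))
(def received (csp/take! orders 1000)) ; timeout is only a safety net

;; With close? = true (the default), this also closes `orders`.
(csp/close! source)
```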

take!

(take! ch)
(take! ch timeout-ms)

Take a value from a channel. Blocks until a value is available.

Returns:

  • value: the taken value
  • nil: channel is closed and empty (EOF), or thread was interrupted
  • :timeout: timeout was specified and elapsed

The one-arity form blocks indefinitely. The two-arity form gives up after timeout-ms milliseconds and returns :timeout.

If the calling thread is interrupted during the operation, the thread's interrupt flag will be set and the operation will return nil.

Examples:
  (take! ch)           ; blocks indefinitely
  (take! ch 1000)      ; times out after 1000ms
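
The three documented outcomes of take! can be seen on a single buffered channel. This is a sketch with an alias (csp) and var names chosen here:

```clojure
(require '[csp-clj.channels :as csp]) ; alias `csp` is an assumption of this sketch

(def c (csp/create 1))

(def timed-out (csp/take! c 50)) ; :timeout: channel is open but empty

(csp/put! c 42)
(def value (csp/take! c 50))     ; 42: a value was available

(csp/close! c)
(def eof (csp/take! c 50))       ; nil: channel is closed and empty
```

Because nil signals EOF and :timeout signals an elapsed timeout, a consumer loop needs to check for both before treating a result as data.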

tap!

(tap! mult ch)
(tap! mult ch close?)

Registers a tap channel on a mult. When the source channel of the mult receives a value, it will be put! onto the tap channel.

If close? is true (the default), closing the source channel will also close the tap channel.

unsub!

(unsub! p topic ch)

Unsubscribes a channel from a topic of a pub(lisher).

unsub-all!

(unsub-all! p)
(unsub-all! p topic)

Unsubscribes all channels from a pub(lisher), or from a specific topic.

untap!

(untap! mult ch)

Removes a tap channel from a mult.

untap-all!

(untap-all! mult)

Removes all tap channels from a mult.
