csp-clj.channels.multiplexer

Multiplexer implementation for broadcasting channel values.

Provides a mechanism to distribute values from a single source channel to multiple tap channels concurrently.

Key Concepts for New Developers:

  • Source channel: The single input channel being read
  • Tap channels: Multiple output channels receiving all values
  • Backpressure: Applied via a Phaser; the mult waits for every tap to accept the current value before taking the next one
  • Concurrency: Each tap dispatch runs in its own virtual thread

Algorithm Overview:

  1. A virtual thread continuously takes values from the source channel
  2. For each value, it snapshots the current set of taps (TOCTOU-safe)
  3. Each tap is served by its own virtual thread, started through an ExecutorService
  4. A Phaser waits for all tap threads to finish, which applies backpressure
  5. Failed or closed taps are removed automatically
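
The five steps above can be sketched with plain java.util.concurrent classes. This is an illustration under stated assumptions, not the library's implementation: a LinkedBlockingQueue stands in for a channel, the :done sentinel stands in for a closed source, and the names broadcast! and start-mult! are hypothetical. It needs JDK 21 or later for virtual threads.

```clojure
(import '(java.util.concurrent ConcurrentHashMap Executors ExecutorService
                               LinkedBlockingQueue Phaser))

;; Assumptions: a LinkedBlockingQueue stands in for a channel and :done marks
;; a closed source. Neither is part of csp-clj. Requires JDK 21+.

(defn broadcast!
  "Puts v on every tap in a snapshot of taps, one virtual thread per tap,
  and returns only after all of them have finished."
  [^ExecutorService exec ^ConcurrentHashMap taps v]
  (let [phaser (Phaser. 1)]                      ; party 1 is this dispatcher
    (doseq [^LinkedBlockingQueue tap (vec (.keySet taps))] ; step 2: snapshot
      (.register phaser)                         ; one more party per tap
      (.execute exec                             ; step 3: thread per tap
                (fn []
                  (try
                    (.put tap v)
                    (catch Throwable _
                      (.remove taps tap))        ; step 5: drop a failed tap
                    (finally
                      (.arriveAndDeregister phaser))))))
    (.arriveAndAwaitAdvance phaser)))            ; step 4: wait for all taps

(defn start-mult!
  "Starts the dispatch loop on a virtual thread and returns that thread."
  [^LinkedBlockingQueue source ^ConcurrentHashMap taps]
  (let [exec (Executors/newVirtualThreadPerTaskExecutor)]
    (Thread/startVirtualThread
     (fn []
       (loop [v (.take source)]                  ; step 1: take from source
         (if (= v :done)
           (.shutdown exec)
           (do (broadcast! exec taps v)
               (recur (.take source)))))))))
```

Because broadcast! returns only after the Phaser advances, the loop cannot take the next source value while any tap is still busy with the current one.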

Called by: csp-clj.channels/multiplex, csp-clj.core/multiplex


create

(create source-ch)

Creates and returns a mult(iplexer) for the given source channel.

A mult runs a background virtual thread that continually reads from the source channel and distributes each value concurrently to all registered taps (channels).

BACKPRESSURE BEHAVIOR

If the source channel blocks, the mult thread blocks. If a tap channel blocks (its buffer is full, or it is unbuffered with no taker), the mult does not take the next value from the source channel until all taps have accepted the current value (backpressure).

This ensures that the slowest tap determines the throughput, which is useful for flow control and for preventing unbounded memory growth.
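
The blocking condition described above can be seen in isolation with a bounded queue. This is only a sketch: an ArrayBlockingQueue with capacity 1 is an assumed stand-in for a tap channel with a one-slot buffer, and the var names are hypothetical. A timed offer is used instead of a blocking put so the example terminates.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue TimeUnit))

;; Hypothetical stand-in for a tap channel with a one-slot buffer.
(def slow-tap (ArrayBlockingQueue. 1))

(.put slow-tap :v1)            ; the buffer is now full

;; Nobody has taken :v1, so the next value is not accepted within 50 ms.
;; A dispatcher using a blocking put would be stuck here.
(def accepted? (.offer slow-tap :v2 50 TimeUnit/MILLISECONDS))
;; accepted? => false

(.take slow-tap)               ; the consumer finally takes :v1

;; With a free slot, the same offer now succeeds.
(def accepted-after-take? (.offer slow-tap :v2 50 TimeUnit/MILLISECONDS))
;; accepted-after-take? => true
```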

LIFECYCLE MANAGEMENT

When the source channel is closed, the mult thread exits and closes every tap that was registered with close? = true. Taps registered with close? = false are left open.
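
As a rough sketch of that shutdown rule, assume the registered taps are held as a map from tap to its close? flag. The function name close-flagged-taps! and the close-fn argument are hypothetical, and atoms stand in for channels here.

```clojure
;; Hypothetical helper: taps is a map of tap -> close? flag, and close-fn
;; is whatever closes a single tap.
(defn close-flagged-taps!
  "Calls close-fn on every tap whose close? flag is truthy."
  [taps close-fn]
  (doseq [[tap close?] taps
          :when close?]
    (close-fn tap)))

;; Usage with atoms standing in for channels:
(def tap-ch1 (atom :open))
(def tap-ch2 (atom :open))

(close-flagged-taps! {tap-ch1 true, tap-ch2 false}
                     #(reset! % :closed))
;; @tap-ch1 => :closed
;; @tap-ch2 => :open
```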

THREAD SAFETY

All operations (tap!, untap!, untap-all!) are thread-safe and may be called from any thread, including virtual threads. The taps map is a ConcurrentHashMap, so taps can be added and removed concurrently without external locking.
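
A small sketch of why a snapshot of a ConcurrentHashMap is safe to dispatch against: the copy is an immutable vector, so a later removal does not change it. The keywords :tap-a and :tap-b are placeholders for real tap channels, and the removal is done on the same thread only to keep the example deterministic.

```clojure
(import '(java.util.concurrent ConcurrentHashMap))

;; Placeholder taps; in the mult these would be channels mapped to close? flags.
(def taps (ConcurrentHashMap.))
(.put taps :tap-a true)
(.put taps :tap-b false)

;; Copy the current keys into an immutable vector before dispatching.
(def snapshot (vec (.keySet taps)))

;; An untap that arrives during dispatch changes the map, not the snapshot.
(.remove taps :tap-a)

(set snapshot)         ; => #{:tap-a :tap-b}
(set (.keySet taps))   ; => #{:tap-b}
```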

Parameters:

  • source-ch: the source channel to read from

Returns: A Multiplexer implementing csp-clj.protocols.multiplexer/Multiplexer

Example:

  (def m (create ch))
  (tap! m tap-ch1 true)   ; tap-ch1 closes when mult closes
  (tap! m tap-ch2 false)  ; tap-ch2 stays open when mult closes

See also:

  • csp-clj.protocols.multiplexer for protocol definition
  • csp-clj.channels.pubsub for topic-based routing (alternative)
  • csp-clj.core/multiplex, csp-clj.core/tap! for convenience API
