Multiplexer implementation for broadcasting channel values.
Provides a mechanism to distribute values from a single source channel to multiple tap channels concurrently.
Key Concepts for New Developers:
Algorithm Overview:
Called by: csp-clj.channels/multiplex, csp-clj.core/multiplex
Multiplexer implementation for broadcasting channel values. Provides a mechanism to distribute values from a single source channel to multiple tap channels concurrently. Key Concepts for New Developers: - Source channel: The single input channel being read - Tap channels: Multiple output channels receiving all values - Backpressure: Applied via Phaser - mult waits for all taps - Concurrency: Each tap dispatch runs in its own virtual thread Algorithm Overview: 1. Virtual thread continuously takes from source 2. For each value, snapshots current taps (TOCTOU-safe) 3. Each tap gets its own virtual thread via ExecutorService 4. Phaser synchronizes completion - applies backpressure 5. Failed/closed taps are automatically removed Called by: csp-clj.channels/multiplex, csp-clj.core/multiplex
(create source-ch)Creates and returns a mult(iplexer) for the given source channel.
A mult runs a background virtual thread that continually reads from the source channel and distributes each value concurrently to all registered taps (channels).
BACKPRESSURE BEHAVIOR
If the source channel blocks, the mult thread blocks. If a tap channel blocks (buffer full or unbuffered with no taker), the mult blocks from taking the next value from the source channel until all taps have accepted the current value (backpressure).
This ensures that the slowest tap controls the throughput - useful for load balancing and preventing memory overflow.
LIFECYCLE MANAGEMENT
If the source channel is closed, the mult thread exits and will close all taps that were registered with close? = true.
THREAD SAFETY
All operations (tap!, untap!, untap-all!) are thread-safe and may be called from any virtual thread. The taps map uses ConcurrentHashMap for lock-free concurrent access.
Parameters:
Returns: A Multiplexer implementing csp-clj.protocols.multiplexer/Multiplexer
Example: (def m (create ch)) (tap! m tap-ch1 true) ; tap-ch1 closes when mult closes (tap! m tap-ch2 false) ; tap-ch2 stays open when mult closes
See also:
Creates and returns a mult(iplexer) for the given source channel. A mult runs a background virtual thread that continually reads from the source channel and distributes each value concurrently to all registered taps (channels). BACKPRESSURE BEHAVIOR If the source channel blocks, the mult thread blocks. If a tap channel blocks (buffer full or unbuffered with no taker), the mult blocks from taking the next value from the source channel until all taps have accepted the current value (backpressure). This ensures that the slowest tap controls the throughput - useful for load balancing and preventing memory overflow. LIFECYCLE MANAGEMENT If the source channel is closed, the mult thread exits and will close all taps that were registered with close? = true. THREAD SAFETY All operations (tap!, untap!, untap-all!) are thread-safe and may be called from any virtual thread. The taps map uses ConcurrentHashMap for lock-free concurrent access. Parameters: - source-ch: the source channel to read from Returns: A Multiplexer implementing csp-clj.protocols.multiplexer/Multiplexer Example: (def m (create ch)) (tap! m tap-ch1 true) ; tap-ch1 closes when mult closes (tap! m tap-ch2 false) ; tap-ch2 stays open when mult closes See also: - csp-clj.protocols.multiplexer for protocol definition - csp-clj.channels.pubsub for topic-based routing (alternative) - csp-clj.core/multiplex, csp-clj.core/tap! for convenience API
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |