Core API for csp-clj - Communicating Sequential Processes for Clojure.
A CSP implementation leveraging JDK 24+ virtual threads for blocking operations. Provides channels, select operations, pub/sub, and pipeline processing with a simple, explicit API.
Key Concepts for New Developers:
- Channels: Buffered and unbuffered queues for communication
- Blocking: Operations block the current virtual thread (cheap in JDK 24+)
- Select: Choose between multiple channel operations
- Pub/Sub: Topic-based message distribution
- Pipeline: Parallel transducer processing
Primary exports: channel, put!, take!, close!, select!, pipeline, pub!, sub!, into-chan!
Requires JDK 24 or later for virtual thread support.
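As a rough orientation, the sketch below shows one producer and one consumer exchanging a value over an unbuffered channel. The namespace `csp-clj.core` and the alias `csp` are assumptions (the source does not name the namespace); the functions and return values are the ones documented on this page.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def ch (csp/channel)) ; unbuffered: each put! waits for a matching take!

;; The producer runs on its own virtual thread.
(Thread/startVirtualThread
 (fn []
   (csp/put! ch :hello)
   (csp/close! ch)))

(def first-val (csp/take! ch)) ; :hello
(def eof (csp/take! ch))       ; nil, because the channel is closed and empty
```

The consumer here is the calling thread; in an application it would more typically be a second virtual thread.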
Creates a new channel.
Without arguments, creates an unbuffered channel with synchronous
rendezvous semantics. With a capacity argument, creates a buffered
channel of that size.
Parameters:
- capacity: positive integer for buffer size (optional)
Returns:
A channel implementing csp-clj.protocols.channel/Channel
Example:
(channel) ; unbuffered
(channel 10) ; buffered with capacity 10
See also: csp-clj.channels/create,
csp-clj.channels.unbuffered/create,
csp-clj.channels.buffered/create
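The difference between the two kinds of channel created by `channel` can be observed with a timed `put!` when no consumer is present. This is a minimal sketch, assuming the API is exposed from a namespace called `csp-clj.core` (an assumption, aliased here as `csp`) and relying on the `put!` return values documented on this page.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def buffered (csp/channel 2))  ; capacity 2
(def unbuffered (csp/channel))  ; synchronous rendezvous

;; A buffered put succeeds without a consumer while space remains.
(def buffered-result (csp/put! buffered :a 50))     ; true

;; An unbuffered put needs a consumer; with none present the timeout elapses.
(def unbuffered-result (csp/put! unbuffered :a 50)) ; :timeout
```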
Closes a channel.
Idempotent operation. Pending takes receive nil (EOF). Pending puts receive false. No further values can be put.
Parameters:
- ch: the channel to close
Returns:
nil
Example:
(close! ch)
=> nil
See also: csp-clj.channels/close!, closed?
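The closing rules for `close!` can be exercised as follows. This is a sketch only: the namespace `csp-clj.core` and the alias `csp` are assumptions, and the `pending` promise is a hypothetical helper used to carry the taker's result back to the calling thread.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def ch (csp/channel))
(def pending (promise)) ; hypothetical helper holding the taker's result

;; A taker that waits on the channel from a virtual thread.
(Thread/startVirtualThread #(deliver pending (csp/take! ch)))

(csp/close! ch)
(csp/close! ch) ; idempotent: closing twice is harmless

(def take-result (deref pending 1000 :no-answer)) ; nil (EOF)
(def put-result (csp/put! ch :late))              ; false, channel is closed
(def closed-now (csp/closed? ch))                 ; true
```

The taker receives nil whether it was already waiting when `close!` ran or arrived afterwards, so the example does not depend on thread timing.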
Returns true if the channel is closed.
Parameters:
- ch: the channel
Returns:
true if closed, false otherwise
Example:
(closed? ch)
=> false
See also: csp-clj.channels/closed?, close!
Creates a fixed-size buffer for use with channels.
Parameters:
- capacity: positive integer for buffer size
Returns:
A buffer implementing csp-clj.protocols.buffer/Buffer
Example:
(fixed-buffer 10)
=> #csp_clj.buffers.fixed.FixedBuffer{...}
See also: csp-clj.buffers.fixed/create
Puts all items from a collection into a channel.
The putting happens in a background virtual thread. Returns the channel immediately (non-blocking).
Parameters:
- coll: collection of values to put
- ch: channel (optional, creates buffered channel if omitted)
- close?: if true (default), closes channel when done
Returns:
The channel
Example:
(into-chan! [1 2 3]) ; creates channel, closes when done
(into-chan! ch [1 2 3] false) ; uses existing channel, keeps open
See also: csp-clj.channels/into-chan!
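Because `into-chan!` closes the channel by default once the collection is exhausted, a consumer can read until `take!` returns nil. The sketch below assumes the namespace `csp-clj.core` (aliased `csp`); `drain` is a hypothetical helper, not part of the library.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(defn drain
  "Hypothetical helper: takes from ch until EOF (nil) and returns a vector."
  [ch]
  (loop [acc []]
    (if-some [v (csp/take! ch)]
      (recur (conj acc v))
      acc)))

(def items (drain (csp/into-chan! [1 2 3]))) ; [1 2 3]
```

With `close?` set to false the channel stays open, so an unbounded loop like `drain` would block after the last item.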
Creates a multiplexer for fan-out from a source channel.
The multiplexer runs a background virtual thread that continually reads
from the source channel and distributes each value to all registered
tap channels. If a tap channel blocks (buffer full or unbuffered), the
mult does not take the next value from the source until all taps have
accepted the current one (backpressure).
When the source channel closes, the mult thread exits and will close
all taps that were registered with close? = true.
Parameters:
- source-ch: the source channel to multiplex from
Returns:
A multiplexer implementing csp-clj.protocols.multiplexer/Multiplexer
Example:
(def source (channel))
(def m (multiplex source))
(def tap-ch (channel 10))
(tap! m tap-ch)
=> #csp_clj.channels.multiplexer.Multiplexer{...}
See also: csp-clj.channels/multiplex, tap!, untap!
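The fan-out behaviour of `multiplex` can be sketched with two taps on one source. The namespace `csp-clj.core` and the alias `csp` are assumptions; the tap names `a` and `b` are illustrative.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def source (csp/channel))
(def m (csp/multiplex source))
(def a (csp/channel 10))
(def b (csp/channel 10))

;; Register both taps before any value is sent.
(csp/tap! m a)
(csp/tap! m b)

(csp/put! source :event)

(def from-a (csp/take! a)) ; :event
(def from-b (csp/take! b)) ; :event

;; Closing the source closes taps registered with close? = true (the default).
(csp/close! source)
(def a-eof (csp/take! a)) ; nil
```

Buffered taps are used so that a slow reader on one tap does not immediately stall delivery to the other.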
Processes values through a transducer with parallelism.
Takes values from 'from' channel, applies transducer xf with
parallelism n, and puts results to 'to' channel. Maintains
order relative to inputs.
Parameters:
- n: parallelism level (number of concurrent operations)
- to: output channel
- xf: transducer to apply
- from: input channel
- opts: optional map with:
  - :close? - close 'to' when 'from' closes (default true)
  - :executor - :cpu (work-stealing) or :io (virtual threads)
  - :ex-handler - function to handle exceptions
Returns:
nil (launches background processing)
Example:
(pipeline 4 out-ch (map inc) in-ch)
(pipeline 8 out-ch (filter odd?) in-ch {:close? false})
See also: csp-clj.channels/pipeline
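A small end-to-end run of `pipeline` might look like the sketch below, which relies on the documented ordering guarantee and on the default `:close? true`. The namespace `csp-clj.core` (aliased `csp`) is an assumption, and `results` is collected with an ad hoc loop rather than any library helper.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def in-ch (csp/into-chan! (range 5))) ; 0..4, closed when exhausted
(def out-ch (csp/channel 10))

;; Four concurrent workers apply (map inc); output order follows input order.
(csp/pipeline 4 out-ch (map inc) in-ch)

;; Read until out-ch is closed, which happens after in-ch closes.
(def results
  (loop [acc []]
    (if-some [v (csp/take! out-ch)]
      (recur (conj acc v))
      acc)))
;; results is [1 2 3 4 5]
```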
Creates a publisher for topic-based message distribution.
Reads from source channel and routes values to subscribers based on topic-fn. Each topic gets its own internal multiplexer.
Parameters:
- source-ch: channel to read from
- topic-fn: function to extract topic from value
- buf-fn: optional function (topic -> capacity) for buffer sizes
Returns:
A publisher implementing csp-clj.protocols.publisher/Publisher
Example:
(def p (pub! ch :type)) ; topic is :type field
(def p (pub! ch :type #(if % 10 1))) ; custom buffer per topic
See also: csp-clj.channels/pub!, sub!, unsub!
Puts a value onto a channel.
Blocks the current virtual thread until:
- Unbuffered: a consumer takes the value
- Buffered: space is available in the buffer
Parameters:
- ch: the channel
- value: the value to put (cannot be nil)
- timeout-ms: optional timeout in milliseconds
Returns:
- true: value was successfully transferred
- false: channel is closed
- :timeout: timeout elapsed (if timeout-ms provided)
Example:
(put! ch :value) ; blocks indefinitely
(put! ch :value 1000) ; times out after 1 second
=> true
See also: csp-clj.channels/put!, take!
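The three return values of `put!` can be seen on a single buffered channel. This is a minimal sketch, assuming the namespace `csp-clj.core` (aliased `csp`), which the source does not name.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def ch (csp/channel 1)) ; room for exactly one value

(def accepted (csp/put! ch :a))     ; true, the buffer had space
(def timed-out (csp/put! ch :b 50)) ; :timeout, buffer full and nobody taking

(csp/close! ch)
(def rejected (csp/put! ch :c))     ; false, the channel is closed
```

Callers that pass a timeout generally need to handle all three outcomes, since `:timeout` is truthy and would pass a plain `(when (put! ...))` check.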
Completes one of several channel operations.
Takes a vector of operations and completes exactly one:
- [channel :take] - attempt to take a value
- [channel :put value] - attempt to put a value
If multiple operations are ready, one is chosen pseudo-randomly for fairness.
Parameters:
- operations: vector of operation vectors
- opts: optional map with :timeout key (milliseconds)
Returns vector [channel op value]:
- [ch :take val] on successful take
- [ch :take nil] if taking from closed channel
- [ch :put true] on successful put
- [ch :put false] if putting to closed channel
- [nil :other :timeout] if timeout elapsed
- [nil :other :interrupted] if thread interrupted
- [nil :other :shutdown] if all channels closed
Example:
(select! [[ch1 :take] [ch2 :put :value]])
=> [ch1 :take :data]
See also: csp-clj.channels/select!
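The result vector of `select!` is convenient to destructure. The sketch below covers the case where only one operation is ready and the timeout case; it assumes the namespace `csp-clj.core` (aliased `csp`).

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def ch1 (csp/channel 1)) ; stays empty
(def ch2 (csp/channel 1))
(csp/put! ch2 :data)      ; ch2 now has a value ready

;; Only the take on ch2 can complete, so it is the one selected.
(def ready (csp/select! [[ch1 :take] [ch2 :take]]))
(let [[ch op v] ready]
  (println "selected" op "with value" v "from ch2?" (identical? ch ch2)))

;; Nothing is ready on ch1, so the timeout branch is reported.
(def timed-out (csp/select! [[ch1 :take]] {:timeout 50}))
;; timed-out is [nil :other :timeout]
```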
Subscribes a channel to a publisher topic.
When the publisher receives a value matching the topic, it will be put onto the subscribed channel.
Parameters:
- p: the publisher
- topic: the topic to subscribe to
- ch: the channel to receive values
- close?: if true (default), closes ch when publisher closes
Returns:
The subscribed channel
Example:
(sub! p :orders order-ch)
(sub! p :events event-ch false) ; don't close with publisher
See also: csp-clj.channels/sub!, pub!, unsub!
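Putting `pub!` and `sub!` together, topic routing might be sketched as follows. The namespace `csp-clj.core` (aliased `csp`) is an assumption, and the message maps and topic keywords are made up for illustration.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(def source (csp/channel))
(def p (csp/pub! source :type)) ; the topic is the :type field of each message

(def order-ch (csp/channel 10))
(def event-ch (csp/channel 10))
(csp/sub! p :orders order-ch)
(csp/sub! p :events event-ch)

;; Each message is routed only to the subscribers of its topic.
(csp/put! source {:type :orders :id 1})
(csp/put! source {:type :events :name "login"})

(def order (csp/take! order-ch)) ; {:type :orders :id 1}
(def event (csp/take! event-ch)) ; {:type :events :name "login"}
```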
Takes a value from a channel.
Blocks the current virtual thread until a value is available. Returns nil when channel is closed and empty (EOF).
Parameters:
- ch: the channel
- timeout-ms: optional timeout in milliseconds
Returns:
- value: the taken value
- nil: channel is closed and empty, or thread interrupted
- :timeout: timeout elapsed (if timeout-ms provided)
Example:
(take! ch) ; blocks until value available
(take! ch 1000) ; times out after 1 second
=> :value
See also: csp-clj.channels/take!, put!
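Since `take!` signals a timeout and EOF with different values, a caller can tell an idle channel from a finished one. A minimal sketch, assuming the namespace `csp-clj.core` (aliased `csp`); `describe` is a hypothetical helper.

```clojure
(require '[csp-clj.core :as csp]) ; ASSUMPTION: namespace name is not stated in the source

(defn describe
  "Hypothetical helper: classifies the result of a timed take!."
  [result]
  (cond
    (nil? result)         :closed
    (= :timeout result)   :idle
    :else                 :value))

(def ch (csp/channel))

(def while-open (describe (csp/take! ch 50)))   ; :idle, no producer yet
(csp/close! ch)
(def after-close (describe (csp/take! ch 50)))  ; :closed
```

One consequence of this encoding is that the keyword `:timeout` itself is best avoided as a channel value when timed takes are in use.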
Registers a tap channel on a multiplexer.
When the multiplexer's source channel receives a value, it will be put! onto all registered tap channels concurrently.
Parameters:
- mult: the multiplexer
- ch: the channel to receive values
- close?: if true (default), closes ch when source closes
Returns:
The tap channel
Example:
(tap! m tap-ch)
(tap! m tap-ch false) ; don't close with source
=> tap-ch
See also: csp-clj.channels/tap!, multiplex, untap!
Unsubscribes a channel from a publisher topic.
Parameters:
- p: the publisher
- topic: the topic to unsubscribe from
- ch: the channel to remove
Returns:
nil
Example:
(unsub! p :orders order-ch)
=> nil
See also: csp-clj.channels/unsub!, sub!
Unsubscribes all channels from a publisher.
With one argument, unsubscribes from all topics. With two arguments, unsubscribes from the specified topic only.
Parameters:
- p: the publisher
- topic: specific topic (optional)
Returns:
nil
Example:
(unsub-all! p) ; unsubscribe all from all topics
(unsub-all! p :orders) ; unsubscribe all from :orders topic
=> nil
See also: csp-clj.channels/unsub-all!, sub!
Removes a tap channel from a multiplexer.
The tap channel will no longer receive values from the multiplexer.
Parameters:
- mult: the multiplexer
- ch: the tap channel to remove
Returns:
nil
Example:
(untap! m tap-ch)
=> nil
See also: csp-clj.channels/untap!, tap!, untap-all!
Removes all tap channels from a multiplexer.
Parameters:
- mult: the multiplexer
Returns:
nil
Example:
(untap-all! m)
=> nil
See also: csp-clj.channels/untap-all!, tap!, untap!