Core API for csp-clj - Communicating Sequential Processes for Clojure.
A CSP implementation leveraging JDK 24+ virtual threads for blocking operations. Provides channels, select operations, pub/sub, and pipeline processing with a simple, explicit API.
Key Concepts for New Developers:
- Channels: Buffered and unbuffered queues for communication
- Blocking: Operations block the current virtual thread (cheap in JDK 24+)
- Select: Choose between multiple channel operations
- Pub/Sub: Topic-based message distribution
- Pipeline: Parallel transducer processing
Primary exports: channel, put!, take!, close!, select!, pipeline, pub!, sub!, into-chan!
Requires JDK 24 or later for virtual thread support.
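The concepts above can be tried in a few lines. This is a minimal sketch, assuming the API is exposed from a namespace named `csp-clj.core` (the namespace name is not stated on this page, so adjust the `require` if it differs) and that buffered channels deliver values in FIFO order.

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

;; Buffered channel: puts complete without a waiting consumer
;; as long as buffer space remains.
(def ch (csp/channel 2))

(csp/put! ch :hello)
(csp/put! ch :world)

(def first-val (csp/take! ch))  ; :hello, assuming FIFO delivery
(def second-val (csp/take! ch))

;; Closing signals EOF to any later takers.
(csp/close! ch)
```

An unbuffered `(csp/channel)` would need a consumer running on another thread before the first `put!` could complete.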
Creates a new channel.
Without arguments, creates an unbuffered channel with synchronous
rendezvous semantics. With a capacity argument, creates a buffered
channel of that size.
Parameters:
- capacity: positive integer for buffer size (optional)
Returns:
A channel implementing csp-clj.protocols.channel/Channel
Example:
(channel) ; unbuffered
(channel 10) ; buffered with capacity 10
See also: csp-clj.channels/create,
csp-clj.channels.unbuffered/create,
csp-clj.channels.buffered/create
Closes a channel.
Idempotent operation. Pending takes receive nil (EOF). Pending puts receive false. No further values can be put.
Parameters:
- ch: the channel to close
Returns:
nil
Example:
(close! ch)
=> nil
See also: csp-clj.channels/close!, closed?
Returns true if the channel is closed.
Parameters:
- ch: the channel
Returns:
true if closed, false otherwise
Example:
(closed? ch)
=> false
See also: csp-clj.channels/closed?, close!
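The closing rules described for close! and closed? can be checked together. This is a sketch only, assuming the API is reachable through a namespace named `csp-clj.core` (an assumption, not stated on this page).

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

(def ch (csp/channel 1))

(def before (csp/closed? ch)) ; a fresh channel is open
(csp/close! ch)
(csp/close! ch)               ; idempotent: a second close is harmless
(def after (csp/closed? ch))

;; Puts to a closed channel are rejected with false.
(def put-result (csp/put! ch :late))
;; A closed, empty channel reports EOF as nil.
(def take-result (csp/take! ch))
```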
Creates a fixed-size buffer for use with channels.
Parameters:
- capacity: positive integer for buffer size
Returns:
A buffer implementing csp-clj.protocols.buffer/Buffer
Example:
(fixed-buffer 10)
=> #csp_clj.buffers.fixed.FixedBuffer{...}
See also: csp-clj.buffers.fixed/create
Puts all items from a collection into a channel.
The putting happens in a background virtual thread. Returns the channel immediately (non-blocking).
Parameters:
- coll: collection of values to put
- ch: channel (optional, creates buffered channel if omitted)
- close?: if true (default), closes channel when done
Returns:
The channel
Example:
(into-chan! [1 2 3]) ; creates channel, closes when done
(into-chan! ch [1 2 3] false) ; uses existing channel, keeps open
See also: csp-clj.channels/into-chan!
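Because into-chan! closes the channel by default, a consumer can read until take! returns nil. The sketch below assumes a namespace named `csp-clj.core`; `drain!` is a hypothetical helper written for this example, not part of the library.

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

(defn drain!
  "Hypothetical helper: takes from ch until EOF (nil) and returns
  the values collected so far as a vector."
  [ch]
  (loop [acc []]
    (let [v (csp/take! ch)]
      (if (nil? v)
        acc
        (recur (conj acc v))))))

;; The single-argument form creates the channel and closes it when done.
(def result (drain! (csp/into-chan! [1 2 3])))
```

With `close?` set to false the loop above would block after the last item, since EOF never arrives.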
Processes values through a transducer with parallelism.
Takes values from 'from' channel, applies transducer xf with
parallelism n, and puts results to 'to' channel. Maintains
order relative to inputs.
Parameters:
- n: parallelism level (number of concurrent operations)
- to: output channel
- xf: transducer to apply
- from: input channel
- opts: optional map with:
  - :close? - close 'to' when 'from' closes (default true)
  - :executor - :cpu (work-stealing) or :io (virtual threads)
  - :ex-handler - function to handle exceptions
Returns:
nil (launches background processing)
Example:
(pipeline 4 out-ch (map inc) in-ch)
(pipeline 8 out-ch (filter odd?) in-ch {:close? false})
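A fuller sketch of the first call, with the input and output channels spelled out. It assumes a namespace named `csp-clj.core` (not stated on this page) and relies on the documented default that 'to' is closed once 'from' closes.

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

(def in-ch (csp/into-chan! [1 2 3 4 5])) ; closed after the last item
(def out-ch (csp/channel 10))            ; large enough to hold every result

;; Parallelism of 4; returns nil and processes in the background.
(csp/pipeline 4 out-ch (map inc) in-ch)

;; Read until out-ch reports EOF.
(def results
  (loop [acc []]
    (let [v (csp/take! out-ch)]
      (if (nil? v)
        acc
        (recur (conj acc v))))))
```

Since output order follows input order, `results` should line up element by element with the input collection.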
See also: csp-clj.channels/pipeline
Creates a publisher for topic-based message distribution.
Reads from source channel and routes values to subscribers based on topic-fn. Each topic gets its own internal multiplexer.
Parameters:
- source-ch: channel to read from
- topic-fn: function to extract topic from value
- buf-fn: optional function (topic -> capacity) for buffer sizes
Returns:
A publisher implementing csp-clj.protocols.publisher/Publisher
Example:
(def p (pub! ch :type)) ; topic is :type field
(def p (pub! ch :type #(if % 10 1))) ; custom buffer per topic
See also: csp-clj.channels/pub!, sub!, unsub!
Puts a value onto a channel.
Blocks the current virtual thread until:
- Unbuffered: a consumer takes the value
- Buffered: space is available in the buffer
Parameters:
- ch: the channel
- value: the value to put (cannot be nil)
- timeout-ms: optional timeout in milliseconds
Returns:
- true: value was successfully transferred
- false: channel is closed
- :timeout: timeout elapsed (if timeout-ms provided)
Example:
(put! ch :value) ; blocks indefinitely
(put! ch :value 1000) ; times out after 1 second
=> true
See also: csp-clj.channels/put!, take!
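The three return values can be seen on a single unbuffered channel. This is a sketch under the assumption that the API lives in a namespace named `csp-clj.core`; the consumer runs on a JDK virtual thread started with `Thread/startVirtualThread`.

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

(def ch (csp/channel)) ; unbuffered: a put completes only when a taker arrives

;; No consumer is waiting, so a bounded put gives up after 50 ms.
(def timed-out (csp/put! ch :ping 50))

;; Start a consumer on a virtual thread, then put again.
(def received (promise))
(Thread/startVirtualThread #(deliver received (csp/take! ch)))
(def delivered (csp/put! ch :ping 1000))

;; After closing, further puts are rejected.
(csp/close! ch)
(def rejected (csp/put! ch :ping))
```

Checking for `true` explicitly, rather than for truthiness, matters here because `:timeout` is also a truthy value.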
Completes one of several channel operations.
Takes a vector of operations and completes exactly one:
- [channel :take] - attempt to take a value
- [channel :put value] - attempt to put a value
If multiple operations are ready, one is chosen pseudo-randomly for fairness.
Parameters:
- operations: vector of operation vectors
- opts: optional map with :timeout key (milliseconds)
Returns vector [channel op value]:
- [ch :take val] on successful take
- [ch :take nil] if taking from closed channel
- [ch :put true] on successful put
- [ch :put false] if putting to closed channel
- [nil :other :timeout] if timeout elapsed
- [nil :other :interrupted] if thread interrupted
- [nil :other :shutdown] if all channels closed
Example:
(select! [[ch1 :take] [ch2 :put :value]])
=> [ch1 :take :data]
See also: csp-clj.channels/select!
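Callers usually destructure the returned vector and branch on the op. The sketch below assumes a namespace named `csp-clj.core`; `describe` is a hypothetical helper for this example only.

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

(defn describe
  "Hypothetical helper: turns a select! result into a small tagged vector."
  [[ch op value]]
  (case op
    :take  (if (nil? value) [:closed ch] [:received value])
    :put   (if value [:sent ch] [:closed ch])
    :other [:no-op value])) ; value is :timeout, :interrupted, or :shutdown

(def a (csp/channel 1))
(def b (csp/channel 1))

;; Neither channel holds a value, so the timeout result comes back.
(def idle (describe (csp/select! [[a :take] [b :take]] {:timeout 50})))

;; Make exactly one operation ready, then select again.
(csp/put! b :job)
(def ready (describe (csp/select! [[a :take] [b :take]] {:timeout 50})))
```

When more than one operation is ready the winner is chosen pseudo-randomly, so code should not rely on the order of the operations vector.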
Subscribes a channel to a publisher topic.
When the publisher receives a value matching the topic, it will be put onto the subscribed channel.
Parameters:
- p: the publisher
- topic: the topic to subscribe to
- ch: the channel to receive values
- close?: if true (default), closes ch when publisher closes
Returns:
The subscribed channel
Example:
(sub! p :orders order-ch)
(sub! p :events event-ch false) ; don't close with publisher
See also: csp-clj.channels/sub!, pub!, unsub!
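Putting pub! and sub! together, a small routing sketch might look like the following. It assumes a namespace named `csp-clj.core`; the message shapes and channel sizes are illustrative choices, not requirements of the library.

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

(def source (csp/channel 10))
(def p (csp/pub! source :type)) ; the topic is each value's :type

;; Buffered subscriber channels, so routing does not wait on slow takers.
(def order-ch (csp/channel 10))
(def event-ch (csp/channel 10))
(csp/sub! p :orders order-ch)
(csp/sub! p :events event-ch)

(csp/put! source {:type :orders :id 1})
(csp/put! source {:type :events :name "login"})

;; Bounded takes: a routing mistake shows up as :timeout instead of a hang.
(def order (csp/take! order-ch 1000))
(def event (csp/take! event-ch 1000))
```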
Takes a value from a channel.
Blocks the current virtual thread until a value is available. Returns nil when channel is closed and empty (EOF).
Parameters:
- ch: the channel
- timeout-ms: optional timeout in milliseconds
Returns:
- value: the taken value
- nil: channel is closed and empty, or thread interrupted
- :timeout: timeout elapsed (if timeout-ms provided)
Example:
(take! ch) ; blocks until value available
(take! ch 1000) ; times out after 1 second
=> :value
See also: csp-clj.channels/take!, put!
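A consumer loop has to tell the three outcomes apart. The sketch below assumes a namespace named `csp-clj.core`; `consume!` is a hypothetical helper for this example, not a library function.

```clojure
;; Assumption: the core API lives in `csp-clj.core`.
(require '[csp-clj.core :as csp])

(defn consume!
  "Hypothetical helper: takes from ch until EOF, calling (on-value v) for
  each value. Returns the number of takes that timed out."
  [ch timeout-ms on-value]
  (loop [timeouts 0]
    (let [v (csp/take! ch timeout-ms)]
      (cond
        (nil? v)       timeouts                 ; closed and empty, or interrupted
        (= :timeout v) (recur (inc timeouts))   ; nothing arrived in time; retry
        :else          (do (on-value v) (recur timeouts))))))

(def seen (atom []))
(def ch (csp/into-chan! [:a :b :c]))
(def timeouts (consume! ch 1000 #(swap! seen conj %)))
```

One consequence of this return convention is that a channel carrying the keyword `:timeout` as data would be ambiguous when a timeout is supplied, so such payloads are better wrapped in a map or vector.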
Unsubscribes a channel from a publisher topic.
Parameters:
- p: the publisher
- topic: the topic to unsubscribe from
- ch: the channel to remove
Returns:
nil
Example:
(unsub! p :orders order-ch)
=> nil
See also: csp-clj.channels/unsub!, sub!
Unsubscribes all channels from a publisher.
With one argument, unsubscribes from all topics. With two arguments, unsubscribes from specific topic only.
Parameters:
- p: the publisher
- topic: specific topic (optional)
Returns:
nil
Example:
(unsub-all! p) ; unsubscribe all from all topics
(unsub-all! p :orders) ; unsubscribe all from :orders topic
=> nil
See also: csp-clj.channels/unsub-all!, sub!