Publish/Subscribe implementation for topic-based message distribution.
Provides a mechanism to route values from a source channel to subscriber channels based on a topic function.
Key Concepts for New Developers:
- Topic function: Extracts topic from values (e.g., :type field)
- Per-topic multiplexers: Each unique topic gets its own mult
- Lazy creation: Internal channels created on first subscription
- Auto-cleanup: Empty topics are removed automatically
Algorithm Overview:
1. Virtual thread continuously takes from source
2. Applies topic-fn to extract topic from each value
3. Routes value to appropriate topic's multiplexer
4. Each topic maintains its own internal channel + mult
5. Subscribers tap into the topic's multiplexer
Called by: csp-clj.channels/pub!, csp-clj.core/pub!
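The routing idea in the algorithm overview can be modelled with plain Clojure data, without channels or threads. The following is only an illustrative sketch: `route`, `subscriptions`, and the subscriber keywords are hypothetical names, not part of csp-clj, and dropping values whose topic has no subscribers is an assumption of this model rather than documented library behavior.

```clojure
(defn route
  "Returns a map of subscriber -> vector of values it would receive.
  `subscriptions` maps topic -> set of subscriber ids (hypothetical model).
  Values whose topic has no subscribers are dropped (an assumption)."
  [subscriptions topic-fn values]
  (reduce (fn [delivered v]
            (let [topic (topic-fn v)]
              ;; fan the value out to every subscriber of its topic
              (reduce (fn [d sub] (update d sub (fnil conj []) v))
                      delivered
                      (get subscriptions topic))))
          {}
          values))

(def subscriptions
  {:orders #{:order-ch :admin-ch}
   :alerts #{:pager-ch}})

(def delivered
  (route subscriptions :type [{:type :orders :id 1}
                              {:type :alerts :id 2}
                              {:type :unknown :id 3}]))
```

Here both `:order-ch` and `:admin-ch` see the `:orders` value, `:pager-ch` sees only the `:alerts` value, and the `:unknown` value reaches nobody.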
(create source-ch topic-fn)
(create source-ch topic-fn buf-fn)
Creates and returns a pub(lisher) for the given source channel.
A pub runs a background virtual thread that continually reads from
the source channel and distributes each value to all channels
subscribed to the value's topic.
The topic is determined by calling topic-fn on the value.
TOPIC LIFECYCLE EXAMPLE
;; Create publisher with topic function
(def p (create ch :type)) ; route by :type field
;; Step 1: Subscribe (creates topic lazily)
(sub! p :orders order-ch) ; :orders topic created
(sub! p :orders admin-ch) ; second subscriber to :orders
;; Step 2: Publish (routes to :orders subscribers)
(put! ch {:type :orders :data "xyz"}) ; both chs receive
;; Step 3: Unsubscribe (auto-cleanup when last subscriber leaves)
(unsub! p :orders order-ch) ; still has admin-ch
(unsub! p :orders admin-ch) ; last subscriber, :orders removed
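The lazy-creation and auto-cleanup bookkeeping in the lifecycle example can be pictured as updates to an immutable map of topic -> subscriber set. This is a sketch under assumptions: `subscribe` and `unsubscribe` are hypothetical helpers written for illustration, and the library's internal representation may differ.

```clojure
(defn subscribe
  "Adds `ch` to `topic`, creating the topic entry on first use."
  [topics topic ch]
  (update topics topic (fnil conj #{}) ch))

(defn unsubscribe
  "Removes `ch` from `topic`; drops the topic entry once it is empty."
  [topics topic ch]
  (let [remaining (disj (get topics topic #{}) ch)]
    (if (empty? remaining)
      (dissoc topics topic)
      (assoc topics topic remaining))))

;; Mirrors the steps in the lifecycle example, using keywords as stand-in channels.
(def after-subs (-> {}
                    (subscribe :orders :order-ch)
                    (subscribe :orders :admin-ch)))
(def after-one (unsubscribe after-subs :orders :order-ch))
(def after-all (unsubscribe after-one :orders :admin-ch))
```

After the last unsubscribe the map is empty again, which is the auto-cleanup behavior described in Step 3.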
BUFFER FUNCTION
buf-fn is an optional function (topic -> capacity) for per-topic buffering:
(create ch topic-fn) ; no buf-fn, all unbuffered
(create ch topic-fn #(if % 10 1)) ; capacity 10 for truthy topics, 1 for a nil/false topic
(create ch topic-fn (constantly 4)) ; all topics buffered with 4
Edge cases:
- buf-fn returns nil or 0 → unbuffered channel created
- buf-fn returns < 0 or non-number → undefined behavior
- topic-fn returns nil → routed to nil topic (valid but unusual)
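One way to read the buf-fn edge cases is as a normalization step applied before a topic's internal channel is created. The helper below is hypothetical (`topic-capacity` is not a csp-clj function). It treats nil and 0 as unbuffered, as listed above; throwing on negative or non-integer results is a choice made for this sketch only, since the documentation leaves that case undefined.

```clojure
(defn topic-capacity
  "Hypothetical helper: returns a positive buffer capacity for `topic`,
  or nil when the topic's channel should be unbuffered."
  [buf-fn topic]
  (let [n (when buf-fn (buf-fn topic))]
    (cond
      (nil? n)                       nil   ; no buf-fn, or buf-fn returned nil
      (and (integer? n) (zero? n))   nil   ; 0 also means unbuffered
      (and (integer? n) (pos? n))    n
      ;; Undefined in the docs; this sketch chooses to fail loudly.
      :else (throw (IllegalArgumentException.
                    (str "invalid capacity for topic " (pr-str topic)
                         ": " (pr-str n)))))))
```

With `#(if % 10 1)` as buf-fn, a topic such as `:orders` gets capacity 10, while the nil topic gets capacity 1.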
THREAD SAFETY
All operations (sub!, unsub!, unsub-all!) are thread-safe.
Multiple threads can subscribe/unsubscribe concurrently.
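The documentation does not say how thread safety is achieved. As an illustration of what the guarantee means for callers, the sketch below models the subscription table as an atom updated with `swap!`, so that concurrent subscribers never lose each other's updates. The atom, `sub-model!`, and the vector "channels" are assumptions for this example, not csp-clj internals.

```clojure
(def topics (atom {}))

(defn sub-model!
  "Hypothetical stand-in for sub!: atomically adds `ch` to `topic`."
  [topic ch]
  (swap! topics update topic (fnil conj #{}) ch))

;; 8 threads each register 100 distinct subscribers on the same topic.
(def workers
  (mapv (fn [t]
          (doto (Thread. ^Runnable (fn []
                                     (doseq [i (range 100)]
                                       (sub-model! :orders [t i]))))
            (.start)))
        (range 8)))

;; Wait for every worker to finish before inspecting the result.
(run! #(.join ^Thread %) workers)

(def subscriber-count (count (get @topics :orders)))
```

Because `swap!` retries on contention, all 800 registrations survive regardless of how the threads interleave.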
COMPARISON TO MULTIPLEXER
Multiplexer (csp-clj.channels.multiplexer):
- Broadcast ALL values to ALL taps
- Use case: Log replication, event broadcasting
Pubsub (this namespace):
- Route values BY TOPIC to interested subscribers
- Use case: Order processing by type, chat rooms by channel
Parameters:
- source-ch: the source channel to read from
- topic-fn: function to extract topic from values
- buf-fn: optional function (topic -> capacity) for buffer sizes
Returns:
A Publisher implementing csp-clj.protocols.publisher/Publisher
Example:
(def p (create ch :type)) ; topic is :type field
(def p (create ch :type #(if % 10 1))) ; custom buffer per topic
(sub! p :orders order-ch)
See also:
- csp-clj.channels/pub! for high-level API
- csp-clj.core/pub! for convenience wrapper
- csp-clj.channels.multiplexer for broadcast alternative