csp-clj.channels.pubsub

Publish/Subscribe implementation for topic-based message distribution.

Provides a mechanism to route values from a source channel to subscriber channels based on a topic function.

Key Concepts for New Developers:

  • Topic function: Extracts topic from values (e.g., :type field)
  • Per-topic multiplexers: Each unique topic gets its own mult
  • Lazy creation: Internal channels created on first subscription
  • Auto-cleanup: Empty topics are removed automatically

Algorithm Overview:

  1. Virtual thread continuously takes from source
  2. Applies topic-fn to extract topic from each value
  3. Routes value to appropriate topic's multiplexer
  4. Each topic maintains its own internal channel + mult
  5. Subscribers tap into the topic's multiplexer
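
The steps above can be modelled in plain Clojure without the library. This is a minimal sketch under stated assumptions, not csp-clj's implementation: java.util.concurrent.LinkedBlockingQueue stands in for channels, a platform thread stands in for the virtual thread, a set of subscriber queues stands in for each topic's internal channel and mult, and the names start-router!, subscribers and ::done are hypothetical.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn start-router!
  "Hypothetical stand-in for the pub's background loop. Takes values from
  source until the ::done sentinel arrives and puts each value on every
  queue registered under (topic-fn value) in the subscribers atom."
  [^LinkedBlockingQueue source topic-fn subscribers]
  (doto (Thread.
         (fn []
           (loop []
             (let [v (.take source)]                  ; step 1: take from source
               (when-not (= ::done v)
                 (let [topic (topic-fn v)]            ; step 2: extract the topic
                   ;; steps 3-5: deliver to every subscriber of that topic
                   (doseq [^LinkedBlockingQueue q (get @subscribers topic)]
                     (.put q v)))
                 (recur))))))
    (.start)))

(def source (LinkedBlockingQueue.))
(def order-q (LinkedBlockingQueue.))
(def admin-q (LinkedBlockingQueue.))
(def subscribers (atom {:orders #{order-q admin-q}}))

(def router (start-router! source :type subscribers))

(.put source {:type :orders :data "xyz"})   ; reaches order-q and admin-q
(.put source {:type :refunds :data "abc"})  ; no subscribers: dropped in this sketch
(.put source ::done)                        ; stop the loop
(.join ^Thread router)
```

How the real pub treats values whose topic has no subscribers is not stated in this page; dropping them here is a choice made for the sketch only.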

Called by: csp-clj.channels/pub!, csp-clj.core/pub!


create

(create source-ch topic-fn)
(create source-ch topic-fn buf-fn)

Creates and returns a pub(lisher) for the given source channel.

A pub runs a background virtual thread that continually reads from the source channel and distributes each value to all channels subscribed to the value's topic.

The topic is determined by calling topic-fn on the value.

TOPIC LIFECYCLE EXAMPLE

  ;; Create publisher with topic function
  (def p (create ch :type))  ; route by :type field

  ;; Step 1: Subscribe (creates topic lazily)
  (sub! p :orders order-ch)  ; :orders topic created
  (sub! p :orders admin-ch)  ; second subscriber to :orders

  ;; Step 2: Publish (routes to :orders subscribers)
  (put! ch {:type :orders :data "xyz"})  ; both chs receive

  ;; Step 3: Unsubscribe (auto-cleanup when last subscriber leaves)
  (unsub! p :orders order-ch)  ; still has admin-ch
  (unsub! p :orders admin-ch)  ; last subscriber, :orders removed
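
The lazy creation and auto-cleanup in this lifecycle can be illustrated with an ordinary map. This is a sketch under assumptions, not the library's internals: the functions subscribe and unsubscribe and the registry atom are hypothetical, and keywords stand in for channels.

```clojure
(defn subscribe
  "Returns topics with ch added under topic. The topic entry is created
  on first use (lazy creation)."
  [topics topic ch]
  (update topics topic (fnil conj #{}) ch))

(defn unsubscribe
  "Returns topics with ch removed from topic. The topic entry is dropped
  once its last subscriber leaves (auto-cleanup)."
  [topics topic ch]
  (let [remaining (disj (get topics topic #{}) ch)]
    (if (empty? remaining)
      (dissoc topics topic)
      (assoc topics topic remaining))))

(def registry (atom {}))

(swap! registry subscribe :orders :order-ch)    ; :orders entry created
(swap! registry subscribe :orders :admin-ch)    ; second subscriber
(swap! registry unsubscribe :orders :order-ch)  ; :admin-ch still subscribed
(def after-first-unsub @registry)
(swap! registry unsubscribe :orders :admin-ch)  ; last subscriber, :orders removed
```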

BUFFER FUNCTION

buf-fn is an optional function (topic -> capacity) for per-topic buffering:

  (create ch topic-fn)                 ; no buf-fn, all unbuffered
  (create ch topic-fn #(if % 10 1))    ; custom buffer per topic
  (create ch topic-fn (constantly 4))  ; all topics buffered with 4

Edge cases:

  • buf-fn returns nil or 0 → unbuffered channel created
  • buf-fn returns < 0 or non-number → undefined behavior
  • topic-fn returns nil → routed to nil topic (valid but unusual)
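
The edge cases above can be written out as a small classifier. This is a sketch only: capacity->kind is a hypothetical helper that restates the documented rules and is not part of csp-clj. It also evaluates the example buf-fn from the text to show what it returns for a nil topic.

```clojure
(defn capacity->kind
  "Hypothetical helper: classifies a buf-fn result according to the edge
  cases listed above. Negative and non-numeric results are reported as
  :undefined because the documentation gives them no defined behavior."
  [capacity]
  (cond
    (nil? capacity)                           :unbuffered
    (and (number? capacity) (zero? capacity)) :unbuffered
    (and (number? capacity) (pos? capacity))  :buffered
    :else                                     :undefined))

;; The example buf-fn from the text: 10 for any truthy topic,
;; 1 when the topic is nil or false.
(def buf-fn #(if % 10 1))

(capacity->kind (buf-fn :orders))        ; :buffered (capacity 10)
(capacity->kind (buf-fn nil))            ; :buffered (capacity 1)
(capacity->kind ((constantly nil) :any)) ; :unbuffered

;; A keyword topic-fn yields a nil topic when the key is missing.
(:type {:data "no type key"})            ; nil
```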

THREAD SAFETY

All operations (sub!, unsub!, unsub-all!) are thread-safe. Multiple threads can subscribe/unsubscribe concurrently.

COMPARISON TO MULTIPLEXER

Multiplexer (csp-clj.channels.multiplexer):

  • Broadcast ALL values to ALL taps
  • Use case: Log replication, event broadcasting

Pubsub (this namespace):

  • Route values BY TOPIC to interested subscribers
  • Use case: Order processing by type, chat rooms by channel
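
The difference between the two can be reduced to which recipients a value reaches. The functions below are hypothetical and use keywords in place of channels; they illustrate the contrast and do not reflect either namespace's actual code.

```clojure
(defn broadcast-targets
  "Multiplexer-style: every tap receives every value."
  [taps _value]
  taps)

(defn topic-targets
  "Pubsub-style: only subscribers of (topic-fn value) receive the value."
  [subscribers topic-fn value]
  (get subscribers (topic-fn value) #{}))

(def taps #{:log-ch :audit-ch})
(def subscribers {:orders #{:order-ch :admin-ch}
                  :chat   #{:room-ch}})
(def msg {:type :chat :text "hi"})

(broadcast-targets taps msg)            ; both taps
(topic-targets subscribers :type msg)   ; only the :chat subscriber
```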

Parameters:

  • source-ch: the source channel to read from
  • topic-fn: function to extract topic from values
  • buf-fn: optional function (topic -> capacity) for buffer sizes

Returns: A Publisher implementing csp-clj.protocols.publisher/Publisher

Example:

  (def p (create ch :type))              ; topic is :type field
  (def p (create ch :type #(if % 10 1))) ; custom buffer per topic
  (sub! p :orders order-ch)

See also:

  • csp-clj.channels/pub! for high-level API
  • csp-clj.core/pub! for convenience wrapper
  • csp-clj.channels.multiplexer for broadcast alternative