Liking cljdoc? Tell your friends :D
Clojure only.

csp-clj.channels.pipeline

Parallel pipeline processing for channels.

Provides parallel transducer application with ordering guarantees. Values flow from input channel through transducer to output channel with configurable parallelism.

Key Concepts for New Developers:

  • Parallelism: Multiple values processed concurrently (n workers)
  • Ordering: Output order matches input order (via CompletableFuture)
  • Backpressure: Limited by buffer size of jobs channel (size n)
  • Executor choice: :cpu for computation, :io for I/O-bound work

Algorithm Overview:

  1. Two virtual threads: ingress (takes) and egress (puts)
  2. Jobs channel (buffered size n) limits parallelism
  3. Each job wrapped in CompletableFuture for ordering
  4. Transducer applied in executor threads
  5. Egress thread puts results in order, maintains backpressure

Called by: csp-clj.channels/pipeline, csp-clj.core/pipeline

Parallel pipeline processing for channels.

Provides parallel transducer application with ordering guarantees.
Values flow from input channel through transducer to output channel
with configurable parallelism.

Key Concepts for New Developers:
- Parallelism: Multiple values processed concurrently (n workers)
- Ordering: Output order matches input order (via CompletableFuture)
- Backpressure: Limited by buffer size of jobs channel (size n)
- Executor choice: :cpu for computation, :io for I/O-bound work

Algorithm Overview:
1. Two virtual threads: ingress (takes) and egress (puts)
2. Jobs channel (buffered size n) limits parallelism
3. Each job wrapped in CompletableFuture for ordering
4. Transducer applied in executor threads
5. Egress thread puts results in order, maintains backpressure

Called by: csp-clj.channels/pipeline, csp-clj.core/pipeline
raw docstring

pipelineclj

(pipeline n to xf from)
(pipeline n
          to
          xf
          from
          {:keys [close? executor ex-handler]
           :or {close? true executor :cpu ex-handler default-ex-handler}})

Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n.

Because it is parallel, the transducer will be applied independently to each element. Outputs will be returned in order relative to the inputs.

Algorithm:

  • Ingress thread: Takes from 'from', submits work as CompletableFuture
  • Jobs channel (size n): Limits in-flight work, provides backpressure
  • Executor threads: Apply transducer to each value
  • Egress thread: Takes futures in order, puts results to 'to'

ORDERING GUARANTEE

Output order matches input order via CompletableFuture. Each input value is wrapped in a future, and the egress thread waits for futures in order. Even if v2 computes faster than v1, v1's result is emitted first.

BACKPRESSURE

The jobs channel (buffered with size n) provides natural backpressure. When full, the ingress thread blocks on put!, stalling the input.

TRANSDUCER INPUT FORMAT

The transducer xf is applied to SINGLE-ELEMENT VECTORS: [v] This matters for stateful transducers (partition-all, dedupe, etc.) that maintain state across inputs.

EXCEPTION HANDLING

Both ingress and egress threads catch Throwable (not just Exception). This ensures Error types (OutOfMemoryError, StackOverflowError) are reported via the :ex-handler and channels are properly cleaned up.

SHARED EXECUTORS WARNING

Executors are shared across pipelines and NEVER shutdown. To stop a pipeline, close its input or output channels.

Options:

  • :close? (default true) - Close the to channel when from channel closes
  • :executor (default :cpu) - :cpu (work stealing) or :io (virtual threads)
  • :ex-handler - Function to handle exceptions during transduction

Parameters:

  • n: parallelism level (number of concurrent operations)
  • to: output channel
  • xf: transducer to apply
  • from: input channel
  • opts: optional configuration map

Returns: The output channel 'to'

Example: (pipeline 4 out-ch (map inc) in-ch) (pipeline 8 out-ch (filter odd?) in-ch {:close? false}) (pipeline 4 out-ch (map heavy-calc) in-ch {:executor :io})

See also:

  • csp-clj.channels/pipeline for high-level API
  • csp-clj.core/pipeline for convenience wrapper
  • java.util.concurrent.CompletableFuture for ordering mechanism
Takes elements from the from channel and supplies them to the to
channel, subject to the transducer xf, with parallelism n.

Because it is parallel, the transducer will be applied independently
to each element. Outputs will be returned in order relative to the inputs.

Algorithm:
- Ingress thread: Takes from 'from', submits work as CompletableFuture
- Jobs channel (size n): Limits in-flight work, provides backpressure
- Executor threads: Apply transducer to each value
- Egress thread: Takes futures in order, puts results to 'to'

ORDERING GUARANTEE

Output order matches input order via CompletableFuture. Each input value
is wrapped in a future, and the egress thread waits for futures in order.
Even if v2 computes faster than v1, v1's result is emitted first.

BACKPRESSURE

The jobs channel (buffered with size n) provides natural backpressure.
When full, the ingress thread blocks on put!, stalling the input.

TRANSDUCER INPUT FORMAT

The transducer xf is applied to SINGLE-ELEMENT VECTORS: [v]
This matters for stateful transducers (partition-all, dedupe, etc.)
that maintain state across inputs.

EXCEPTION HANDLING

Both ingress and egress threads catch Throwable (not just Exception).
This ensures Error types (OutOfMemoryError, StackOverflowError) are
reported via the :ex-handler and channels are properly cleaned up.

SHARED EXECUTORS WARNING

Executors are shared across pipelines and NEVER shutdown.
To stop a pipeline, close its input or output channels.

Options:
- :close? (default true) - Close the to channel when from channel closes
- :executor (default :cpu) - :cpu (work stealing) or :io (virtual threads)
- :ex-handler - Function to handle exceptions during transduction

Parameters:
  - n: parallelism level (number of concurrent operations)
  - to: output channel
  - xf: transducer to apply
  - from: input channel
  - opts: optional configuration map

Returns:
  The output channel 'to'

Example:
  (pipeline 4 out-ch (map inc) in-ch)
  (pipeline 8 out-ch (filter odd?) in-ch {:close? false})
  (pipeline 4 out-ch (map heavy-calc) in-ch {:executor :io})

See also:
  - csp-clj.channels/pipeline for high-level API
  - csp-clj.core/pipeline for convenience wrapper
  - java.util.concurrent.CompletableFuture for ordering mechanism
raw docstring

cljdoc builds & hosts documentation for Clojure/Script libraries

Keyboard shortcuts
Ctrl+kJump to recent docs
Move to previous article
Move to next article
Ctrl+/Jump to the search field
× close