Parallel pipeline processing for channels.
Provides parallel transducer application with ordering guarantees. Values flow from input channel through transducer to output channel with configurable parallelism.
Key Concepts for New Developers:
Algorithm Overview:
Called by: csp-clj.channels/pipeline, csp-clj.core/pipeline
Parallel pipeline processing for channels. Provides parallel transducer application with ordering guarantees. Values flow from input channel through transducer to output channel with configurable parallelism. Key Concepts for New Developers: - Parallelism: Multiple values processed concurrently (n workers) - Ordering: Output order matches input order (via CompletableFuture) - Backpressure: Limited by buffer size of jobs channel (size n) - Executor choice: :cpu for computation, :io for I/O-bound work Algorithm Overview: 1. Two virtual threads: ingress (takes) and egress (puts) 2. Jobs channel (buffered size n) limits parallelism 3. Each job wrapped in CompletableFuture for ordering 4. Transducer applied in executor threads 5. Egress thread puts results in order, maintains backpressure Called by: csp-clj.channels/pipeline, csp-clj.core/pipeline
(pipeline n to xf from)(pipeline n
to
xf
from
{:keys [close? executor ex-handler]
:or {close? true executor :cpu ex-handler default-ex-handler}})Takes elements from the from channel and supplies them to the to channel, subject to the transducer xf, with parallelism n.
Because it is parallel, the transducer will be applied independently to each element. Outputs will be returned in order relative to the inputs.
Algorithm:
ORDERING GUARANTEE
Output order matches input order via CompletableFuture. Each input value is wrapped in a future, and the egress thread waits for futures in order. Even if v2 computes faster than v1, v1's result is emitted first.
BACKPRESSURE
The jobs channel (buffered with size n) provides natural backpressure. When full, the ingress thread blocks on put!, stalling the input.
TRANSDUCER INPUT FORMAT
The transducer xf is applied to SINGLE-ELEMENT VECTORS: [v] This matters for stateful transducers (partition-all, dedupe, etc.) that maintain state across inputs.
EXCEPTION HANDLING
Both ingress and egress threads catch Throwable (not just Exception). This ensures Error types (OutOfMemoryError, StackOverflowError) are reported via the :ex-handler and channels are properly cleaned up.
SHARED EXECUTORS WARNING
Executors are shared across pipelines and NEVER shutdown. To stop a pipeline, close its input or output channels.
Options:
Parameters:
Returns: The output channel 'to'
Example: (pipeline 4 out-ch (map inc) in-ch) (pipeline 8 out-ch (filter odd?) in-ch {:close? false}) (pipeline 4 out-ch (map heavy-calc) in-ch {:executor :io})
See also:
Takes elements from the from channel and supplies them to the to
channel, subject to the transducer xf, with parallelism n.
Because it is parallel, the transducer will be applied independently
to each element. Outputs will be returned in order relative to the inputs.
Algorithm:
- Ingress thread: Takes from 'from', submits work as CompletableFuture
- Jobs channel (size n): Limits in-flight work, provides backpressure
- Executor threads: Apply transducer to each value
- Egress thread: Takes futures in order, puts results to 'to'
ORDERING GUARANTEE
Output order matches input order via CompletableFuture. Each input value
is wrapped in a future, and the egress thread waits for futures in order.
Even if v2 computes faster than v1, v1's result is emitted first.
BACKPRESSURE
The jobs channel (buffered with size n) provides natural backpressure.
When full, the ingress thread blocks on put!, stalling the input.
TRANSDUCER INPUT FORMAT
The transducer xf is applied to SINGLE-ELEMENT VECTORS: [v]
This matters for stateful transducers (partition-all, dedupe, etc.)
that maintain state across inputs.
EXCEPTION HANDLING
Both ingress and egress threads catch Throwable (not just Exception).
This ensures Error types (OutOfMemoryError, StackOverflowError) are
reported via the :ex-handler and channels are properly cleaned up.
SHARED EXECUTORS WARNING
Executors are shared across pipelines and NEVER shutdown.
To stop a pipeline, close its input or output channels.
Options:
- :close? (default true) - Close the to channel when from channel closes
- :executor (default :cpu) - :cpu (work stealing) or :io (virtual threads)
- :ex-handler - Function to handle exceptions during transduction
Parameters:
- n: parallelism level (number of concurrent operations)
- to: output channel
- xf: transducer to apply
- from: input channel
- opts: optional configuration map
Returns:
The output channel 'to'
Example:
(pipeline 4 out-ch (map inc) in-ch)
(pipeline 8 out-ch (filter odd?) in-ch {:close? false})
(pipeline 4 out-ch (map heavy-calc) in-ch {:executor :io})
See also:
- csp-clj.channels/pipeline for high-level API
- csp-clj.core/pipeline for convenience wrapper
- java.util.concurrent.CompletableFuture for ordering mechanismcljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |