immutant.messaging.pipeline

Provides functions for creating and managing pipelines. A pipeline is a composition of functions ("steps"), where each function is passed the result of the previous function, dereferenced if needed. It is built on top of Immutant's messaging subsystem, allowing each step to have multiple processing threads, and to be automatically load balanced across a cluster.

The pipeline function takes a unique (within the scope of the application) name, one or more single-arity functions, and optional kwarg options, returning a function that places its argument onto the pipeline when called. The resulting pipeline-fn optionally returns a delay that can be used to retrieve the result of the pipeline execution.

Each function can optionally be wrapped with metadata that provides options for how that particular function is handled (see the 'step' fn below).

Example:

(require '[immutant.messaging.pipeline :as pl])

(defn calculate-foo [m]
  ...
  (assoc m :foo value))

(defn save-data [m]
  ...)

;; create a pipeline
(defonce foo-pipeline
  (pl/pipeline "foo" ;; pipelines must be named
    (pl/step calculate-foo :concurrency 5) ;; run this step with 5 threads
    (pl/step #(update-in % [:bar] + (:foo %)) :name :update-bar) ;; give this step a name
    save-data ;; a 'vanilla' step
    :concurrency 2 ;; run all steps with 2 threads (unless overridden)
    :error-handler (fn [ex m] ;; do something special with errors, and retry
                     ...
                     (pl/retry (update-in m [:retry-count] (fnil inc 0))))))

;; put data onto the front of the pipeline
(foo-pipeline {:bar 1 :ham "biscuit"})

;; put data onto the pipeline at a given step
(foo-pipeline {:bar 1 :foo 42 :ham "gravy"} :step :update-bar)

;; get the result
(deref (foo-pipeline {:bar 1 :ham "biscuit"}) 1000 ::timeout!)

;; optional - it will automatically be stopped on undeploy
(pl/stop foo-pipeline)
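
The composition described at the top of this page can be modelled without any messaging at all. The following is a minimal in-memory sketch, not Immutant's implementation: `run-steps` is a hypothetical helper, and treating every `IDeref` result as something to dereference is an assumption about what "dereferenced if needed" means.

```clojure
;; Toy model only: no queues, threads, or clustering are involved.
(defn run-steps
  "Hypothetical helper: passes x through each step fn in order,
  dereferencing a step's result when it is deref-able (e.g. a delay)."
  [steps x]
  (reduce (fn [acc step]
            (let [result (step acc)]
              (if (instance? clojure.lang.IDeref result)
                (deref result)
                result)))
          x
          steps))

(run-steps [#(assoc % :foo 41)
            #(delay (update-in % [:foo] inc)) ; a step returning a delay
            #(select-keys % [:foo])]
           {:bar 1})
;; => {:foo 42}
```

A real pipeline differs in that each step consumes from its own messaging destination, so steps can run on several threads and on different cluster nodes.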

*current-step*

The name of the current pipeline step. Will be bound within the pipeline steps and error handlers.

*next-step*

The name of the next pipeline step. Will be bound within the pipeline steps and error handlers.

*pipeline*

The currently active pipeline fn. Will be bound within the pipeline steps and error handlers.

fanout

(fanout xs)

A function that takes a seq and places each item in it on the pipeline at the next step.

This halts pipeline execution for the current message, but continues execution for each seq element. Note that a pipeline that uses this function cannot be correctly dereferenced, since the deref will only get the first value to finish the pipeline.
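
As a rough illustration, a step can be marked with the `:fanout?` option documented under `step`, so that each element of its result continues down the pipeline as its own message. This is a sketch only: it assumes the namespace loads as `immutant.messaging.pipeline` (per this page's title), and `split-order`, `ship-item`, and the "orders" pipeline name are hypothetical.

```clojure
(require '[immutant.messaging.pipeline :as pl])

;; Hypothetical step fns for illustration.
(defn split-order [order]
  ;; returns a seq; with :fanout? each element is placed on the next step
  (map #(assoc % :order-id (:id order)) (:items order)))

(defn ship-item [item]
  (assoc item :shipped? true))

(defonce order-pipeline
  (pl/pipeline "orders"
    (pl/step split-order :fanout? true) ; apply fanout to this step's result
    ship-item
    :result-ttl -1)) ; a deref would only see the first item to finish

(order-pipeline {:id 7 :items [{:sku "a"} {:sku "b"}]})
```

Setting `:result-ttl` to -1 here is a design choice rather than a requirement: it avoids handing callers a result that reflects only one of the fanned-out items.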

halt

Halts pipeline processing for a given message if returned from any handler function.
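
One plausible use is a validation step that drops messages it cannot handle. The sketch below assumes the namespace loads as `immutant.messaging.pipeline`; `drop-invalid` and the "intake" pipeline are hypothetical names.

```clojure
(require '[immutant.messaging.pipeline :as pl])

;; Hypothetical validation step.
(defn drop-invalid [m]
  (if (contains? m :id)
    m
    pl/halt)) ; returning halt stops processing for this message only

(defonce intake-pipeline
  (pl/pipeline "intake"
    drop-invalid
    #(assoc % :accepted? true)))

(intake-pipeline {:id 1})        ; continues to the second step
(intake-pipeline {:name "anon"}) ; halted by drop-invalid
```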

pipeline

(pipeline pl-name & args)

Creates a pipeline function.

It takes a unique (within the scope of the application) name, one or more single-arity functions, and optional kwarg options, and returns a function that places its argument onto the pipeline when called. If :result-ttl is > -1, it returns a delayed value that can be derefed to get the result of the pipeline execution.

The following options are supported, and must follow the step functions as either kwargs or a map (defaults are shown in brackets):

  • :concurrency - the number of threads to use for each step. Can be overridden on a per-step basis - see the 'step' function. [#cores]
  • :error-handler - a function that will be called when any step raises an exception. It will be passed the exception and the argument to the step. Without an error-handler, the default HornetQ retry semantics will be used. Can be overridden on a per-step basis - see the 'step' function. [nil]
  • :result-ttl - the time-to-live for the final pipeline result, in millis. Set to 0 for "forever", -1 to disable returning the result via a delay. [1 hour]
  • :step-deref-timeout - the amount of time to wait when dereferencing the result of a step that returns a delay, in millis. Can be overridden on a per-step basis - see the 'step' function. [10 seconds]
  • :durable? - whether messages persist across restarts. [true]

During the execution of each step and each error-handler call, the following vars are bound:

  • *pipeline* - the pipeline (as a fn) that is being executed
  • *current-step* - the name of the currently executing step
  • *next-step* - the name of the next step in the pipeline

This function is not idempotent. Attempting to create a pipeline with the same name as an existing pipeline will raise an error.
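
The options and bound vars can be seen together in a small sketch. It assumes the namespace loads as `immutant.messaging.pipeline`; `log-step`, the "audit" name, and the option values are illustrative only.

```clojure
(require '[immutant.messaging.pipeline :as pl])

;; Hypothetical step that reads the vars bound during step execution.
(defn log-step [m]
  (println "running" pl/*current-step* "next is" pl/*next-step*)
  m)

(defonce audit-pipeline
  (pl/pipeline "audit"
    log-step
    #(assoc % :audited? true)
    ;; options passed as a map after the step functions
    {:concurrency 1
     :result-ttl 60000 ; keep the final result for one minute
     :durable? false
     :error-handler (fn [ex m]
                      (println "step" pl/*current-step* "failed:"
                               (.getMessage ^Throwable ex)))}))

;; :result-ttl is > -1, so the call returns a value that can be derefed
(deref (audit-pipeline {:id 1}) 5000 ::timeout)
```

Using `defonce` keeps a namespace reload from calling `pipeline` a second time with the same name, which would raise an error.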

retry

(retry m)
(retry m step)

Retries the message at the current step, or at the given step.

Can only be called from within an :error-handler.
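
An error handler will usually want to bound the number of retries. The following sketch assumes the namespace loads as `immutant.messaging.pipeline` and that messages are maps; `max-retries`, `retry-handler`, and the "import" pipeline are hypothetical.

```clojure
(require '[immutant.messaging.pipeline :as pl])

(def max-retries 3) ; hypothetical limit

(defn retry-handler [ex m]
  (let [attempts (:retry-count m 0)]
    (if (< attempts max-retries)
      ;; retry at the current step with an incremented counter
      (pl/retry (assoc m :retry-count (inc attempts)))
      ;; otherwise just log and return
      (println "giving up after" attempts "retries:"
               (.getMessage ^Throwable ex)))))

(defonce import-pipeline
  (pl/pipeline "import"
    (pl/step #(update-in % [:n] inc) :name :bump)
    :error-handler retry-handler))
```

The two-argument form, for example `(pl/retry m :bump)`, would send the message to the named step instead of the one that failed.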

step

(step f & opts)

Wraps the given function with the given options, returning a function.

The following options can be passed as kwargs or as a map (defaults are shown in brackets):

  • :name - a name to use for the step [the current index of the fn]
  • :concurrency - the number of threads to use, overriding the pipeline setting [#cores]
  • :decode? - if false, the raw message object will be passed to this step [true]
  • :error-handler - an error handler function that can override the pipeline setting [nil]
  • :fanout? - applies the fanout fn to the result of the step. See fanout for more details [false]
  • :step-deref-timeout - the amount of time to wait when dereferencing the result of the step if it returns a delay, in ms. Overrides the pipeline setting [10 seconds]
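
The two ways of passing options can be sketched side by side. This assumes the namespace loads as `immutant.messaging.pipeline`; the pipeline names and step bodies are hypothetical, and calling one pipeline from a step of another is shown only as one plausible case of a step that returns a delay.

```clojure
(require '[immutant.messaging.pipeline :as pl])

(defonce enrich-pipeline
  (pl/pipeline "enrich"
    #(assoc % :enriched? true)))

(defonce main-pipeline
  (pl/pipeline "main"
    ;; options as kwargs
    (pl/step #(assoc % :parsed? true) :name :parse :concurrency 1)
    ;; options as a map; this step returns the delay produced by
    ;; enrich-pipeline, and allows up to 30 seconds for the deref
    (pl/step (fn [m] (enrich-pipeline m))
             {:name :enrich :step-deref-timeout 30000})
    identity))

;; enter the pipeline at the named step
(main-pipeline {:id 1 :parsed? true} :step :enrich)
```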

stop

(stop pl)

Destroys a pipeline.

pl can either be the pipeline fn returned by pipeline, or the name of the pipeline.

Typically not necessary since it will be done for you when your app is undeployed.
