riemann.streams

The streams namespace aims to provide a comprehensive set of widely applicable, combinable tools for building more complex streams.

Streams are functions which accept events or, in some cases, lists of events.

Streams typically do one or more of the following.

  • Filter events.
  • Transform events.
  • Combine events over time.
  • Apply events to other streams.
  • Forward events to other services.

Most streams accept, after their initial arguments, any number of streams as children, also called "child streams". Children are typically invoked sequentially; any exceptions thrown are caught, logged, and optionally forwarded to *exception-stream*. Return values of children are ignored.

Events are backed by a map (e.g. {:service "foo" :metric 3.5}), so any function that accepts maps will work with events. Common functions like prn can be used as a child stream.

Some common patterns for defining child streams are (fn [e] (println e)) and (partial log :info).
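
The calling convention above can be modelled in a few lines of plain Clojure. This is only a sketch under stated assumptions: call-children and over-sketch are hypothetical names standing in for Riemann's call-rescue and a simple filter stream, and forwarding to *exception-stream* is left out.

```clojure
;; Plain-Clojure sketch; does not require Riemann on the classpath.

(defn call-children
  "Calls each child with event, in order. An exception in one child is
  reported and does not prevent later children from running."
  [event children]
  (doseq [child children]
    (try
      (child event)
      (catch Exception e
        (println "child stream failed:" (.getMessage e))))))

(defn over-sketch
  "Hypothetical filter stream: passes on events whose :metric exceeds x."
  [x & children]
  (fn stream [event]
    (when (and (:metric event) (> (:metric event) x))
      (call-children event children))))

;; Children are ordinary functions of one event.
(def seen (atom []))

(def filtered
  (over-sketch 3
               (fn [e] (throw (ex-info "boom" {})))
               (fn [e] (swap! seen conj e))))

(filtered {:service "foo" :metric 3.5}) ; first child throws, second still runs
(filtered {:service "foo" :metric 1})   ; filtered out, no child is called
```

After both calls, seen holds only the first event, which shows that the failing child did not block its sibling.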

*exception-stream* (clj)

When an exception is caught, it's converted to an event and sent here.

-infinity (clj)

adjust (clj)

(adjust & args)

Passes on a changed version of each event by applying a function to a particular field or to the event as a whole.

Passing a vector of [field function & args] to adjust will modify the given field in incoming events by applying the function to it along with the given arguments. For example:

(adjust [:service str " rate"] ...)

takes {:service "foo"} and emits {:service "foo rate"}.

If a function is passed to adjust instead of a vector, adjust behaves like smap: the entire event will be given to the function and the result will be passed along to the children. For example:

(adjust #(assoc % :metric (count (:tags %))) ...)

takes {:tags ["foo" "bar"]} and emits {:tags ["foo" "bar"] :metric 2}.

Prefer (smap f & children) to (adjust f & children) where possible.
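
The two argument forms can be illustrated with a small plain-Clojure sketch. adjust-sketch is a hypothetical helper, not Riemann's implementation: it returns only the event transformation and leaves out the child streams.

```clojure
(defn adjust-sketch
  "Returns a function of one event. A vector [field f & args] rewrites
  that field with (apply f old-value args); a plain function is applied
  to the whole event."
  [spec]
  (if (vector? spec)
    (let [[field f & args] spec]
      (fn [event] (apply update event field f args)))
    spec))

;; Vector form: modify a single field.
(def add-rate-suffix (adjust-sketch [:service str " rate"]))

;; Function form: transform the whole event.
(def count-tags (adjust-sketch #(assoc % :metric (count (:tags %)))))

(add-rate-suffix {:service "foo"})
(count-tags {:tags ["foo" "bar"]})
```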

apdex (clj, macro)

(apdex dt satisfied? tolerated? & children)

A stream which computes Apdex metrics every dt seconds for a stream of events. satisfied? and tolerated? are predicates as for (where). If satisfied? is truthy for an event, increments the satisfied count for that time window by 1. If tolerated? is truthy, increments the tolerated count for that time window by 1. Any other states are ignored. Every dt seconds (as long as events are arriving), emits an event with a metric between 0 and 1, derived by:

(satisfied count + (tolerated count / 2)) / total count of received events

Ignores expired events.

See http://en.wikipedia.org/wiki/Apdex for details.
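
The formula can be checked by hand with a plain-Clojure sketch over one window of events. apdex-score is a hypothetical helper; it assumes each event is counted at most once, with satisfied taking precedence over tolerated, and it does not filter expired events.

```clojure
(defn apdex-score
  "Computes (satisfied + tolerated / 2) / total for one window of events.
  Returns nil for an empty window."
  [satisfied? tolerated? events]
  (let [total     (count events)
        satisfied (count (filter satisfied? events))
        tolerated (count (filter #(and (not (satisfied? %)) (tolerated? %))
                                 events))]
    (when (pos? total)
      (/ (+ satisfied (/ tolerated 2)) total))))

;; Four request latencies in seconds: two satisfied, one tolerated, one neither.
(def window
  [{:metric 0.1} {:metric 0.2} {:metric 0.8} {:metric 3.0}])

(apdex-score #(< (:metric %) 0.5)
             #(< (:metric %) 2)
             window)
;; (2 + 1/2) / 4 = 5/8
```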

apdex* (clj)

(apdex* dt satisfied? tolerated? & children)

Like apdex, but takes functions of events rather than where-predicates.

A stream which computes Apdex metrics every dt seconds for a stream of events. If (satisfied? event) is truthy, increments the satisfied count for that time window by 1. If (tolerated? event) is truthy, increments the tolerated count for that time window by 1. Any other states are ignored. Every dt seconds (as long as events are arriving), emits an event with a metric between 0 and 1, derived by:

(satisfied count + (tolerated count / 2)) / total count of received events

Ignores expired events.

See http://en.wikipedia.org/wiki/Apdex for details.

append (clj)

(append reference)

Conjs each event onto the given reference.

batch (clj)

(batch n dt & children)

Batches up events into vectors, bounded both by size and by time. Once either n events have accumulated or dt seconds have passed, flushes the current batch to all child streams. Child streams should accept a sequence of events.

bit-bucket (clj)

(bit-bucket args)

Discards arguments.

by (clj, macro)

(by fields & children)

Splits stream by field. Every time an event arrives with a new value of field, this macro invokes its child forms to return a new, distinct set of streams for that particular value.

(rate 5 prn) prints a single rate for all events, once every five seconds.

(by :host (rate 5 prn)) tracks a separate rate for each host, and prints each one every five seconds.

You can pass multiple fields too:

(by [:host :service])

Note that field can be a keyword like :host or :state, but you can also use any unary function for more complex sharding.

Be aware that (by) over unbounded values can result in many substreams being created, so you wouldn't want to write (by :metric prn): you'd get a separate prn for every unique metric that came in. Also, (by) streams are never garbage-collected.
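
The per-value forking described above can be sketched in plain Clojure with a table of child streams keyed by the field value. by-sketch and counting-child are hypothetical names, and this is not how the macro is implemented: here the children are built by an explicit zero-argument function rather than by re-evaluating child forms. Under contention the builder may run more than once for a key, which a real implementation would need to handle.

```clojure
(defn by-sketch
  "Returns a stream that keeps one set of children per distinct
  (key-fn event). new-children is a zero-argument function returning a
  fresh collection of child streams."
  [key-fn new-children]
  (let [table (atom {})]
    (fn stream [event]
      (let [k (key-fn event)
            t (swap! table
                     #(if (contains? % k) % (assoc % k (new-children))))]
        (doseq [child (get t k)]
          (child event))))))

;; Each fork gets its own private counter, recorded per host.
(def counts (atom {}))

(defn counting-child []
  (let [n (atom 0)]
    (fn [event]
      (swap! counts assoc (:host event) (swap! n inc)))))

(def per-host (by-sketch :host (fn [] [(counting-child)])))

(doseq [host ["a" "b" "a"]]
  (per-host {:host host}))
```

The table only ever grows, which mirrors the warning above about unbounded values.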

by-builder (clj, macro)

(by-builder [sym fields] & forms)

Splits stream by provided function. This is a variation of by where forms are executed when a fork is created to yield the children.

This allows you to perform operations based on the fork name, i.e. the output of the given fields.

(by-builder [host :host] (forward (get relay-by-host host)))

by-fn (clj)

(by-fn fields new-fork)

call-rescue (clj, macro)

(call-rescue event children)

Call each child stream with event, in order. Rescues and logs any failure.

changed (clj)

(changed pred & children)

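Passes on events only when (pred event) differs from its value for the previous event. Options:

  • :init The initial value to assume for (pred event).
  • :pairs? When true, children receive a vector of the previous and current events instead of the current event alone.
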
; Print all state changes
(changed :state prn)

; Assume states *were* ok the first time we see them.
(changed :state {:init "ok"} prn)

; Receive the previous event, in addition to the current event, as a vector.
(changed :state {:pairs? true}
         (fn [[event event']]
           (prn "changed from" (:state event) "to" (:state event'))))

; Note that pred can be an arbitrary function:

(changed (fn [e] (> (:metric e) 2)) ...)

changed-state (clj, macro)

(changed-state & children)

Passes on changes in state for each distinct host and service.

clock-skew (clj)

(clock-skew & children)

Detects clock skew between hosts. Keeps track of what time each host thinks it is, based on their most recent event time. Compares the time of each event to the median clock, and passes on that event with metric equal to the time difference: events ahead of the clock have positive metrics, and events behind the clock have negative metrics.

coalesce (clj)

(coalesce & [dt & children])

Combines events over time. Coalesce remembers the most recent event for each service/host combination that passes through it (limited by :ttl). Every dt seconds (defaults to 1 second), it passes on all events it remembers. When events expire, they are included in the emitted sequence of events once, and removed from the state table thereafter.

Use coalesce to combine states that arrive at different times--for instance, to average the CPU use over several hosts.

Every 10 seconds, print a sequence of events including all the events which share the same :foo and :bar attributes:

(by [:foo :bar]
  (coalesce 10 prn))

coalesce-with-event (clj)

(coalesce-with-event keyfn child)

Helper for coalesce: calls (child current-event all-events) every time an event is received.

counter (clj)

(counter & children)

Counts things. The first argument may be an initial counter value, which defaults to zero.

; Starts at zero
(counter index)

; Starts at 500
(counter 500 index)

Events without metrics are passed through unchanged. Events with metrics increment the counter, and are passed on with their metric set to the current count.

You can reset the counter by passing it an event with a metric, tagged "reset"; the count will be reset to that metric.

ddt (clj)

(ddt & args)

Differentiate metrics with respect to time. Takes an optional number followed by child streams. If the first argument is a number n, emits a rate-of-change event every n seconds, until expired. If the first argument is not a number, emits an event for each event received, but with metric equal to the difference between the current event and the previous one, divided by the difference in their times. Skips events without metrics.

(ddt 5 graph index)
(ddt graph index)
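
The per-event form of the calculation can be sketched in plain Clojure. rate-of-change and ddt-pairs are hypothetical helpers working on a finished sequence of events; dropping pairs that share a timestamp is a choice made here to avoid dividing by zero, not something the docstring specifies.

```clojure
(defn rate-of-change
  "Returns curr with :metric replaced by the rate of change since prev,
  or nil when the two events share a timestamp."
  [prev curr]
  (let [dt (- (:time curr) (:time prev))]
    (when-not (zero? dt)
      (assoc curr :metric (/ (- (:metric curr) (:metric prev)) dt)))))

(defn ddt-pairs
  "Applies rate-of-change to each consecutive pair of events, skipping
  events without metrics."
  [events]
  (->> events
       (filter :metric)
       (partition 2 1)
       (keep (fn [[prev curr]] (rate-of-change prev curr)))))

(def samples
  [{:time 0 :metric 0}
   {:time 5 :metric 50}
   {:time 6}              ; no metric: skipped
   {:time 10 :metric 40}])

(ddt-pairs samples)
;; rates: (50 - 0) / 5 = 10, then (40 - 50) / 5 = -2
```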

ddt-events (clj)

(ddt-events & children)

(ddt) between each pair of events.

ddt-real (clj)

(ddt-real n & children)

(ddt) in real time.

default (clj)

(default & args)

Like with, but does not override existing (i.e. non-nil) values. Useful when you want to fill in default values for events that might come in without them.

(default :ttl 300 index)
(default {:service "jrecursive" :state "chicken"} index)

dual (clj)

(dual pred true-stream false-stream)

A stream which splits events into two mirror-image streams, based on (pred e).

If (pred e) is true, calls (true-stream e) and (false-stream (expire e)).

If (pred e) is false, does the opposite. Expired events are forwarded to both streams.

(pred e) is always called once per incoming event.
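
The mirror-image behaviour can be sketched in plain Clojure as follows. dual-sketch and expire-sketch are hypothetical: expire-sketch merely sets :state to "expired" as a stand-in for Riemann's own expiry helper, and only the "expired" state (not :ttl) is checked.

```clojure
(defn expire-sketch
  "Hypothetical stand-in for expiring an event: marks it as expired."
  [event]
  (assoc event :state "expired"))

(defn dual-sketch
  [pred true-stream false-stream]
  (fn stream [event]
    (let [p (pred event)] ; evaluated exactly once per event
      (cond
        ;; Already-expired events go to both streams unchanged.
        (= "expired" (:state event))
        (do (true-stream event) (false-stream event))

        p
        (do (true-stream event) (false-stream (expire-sketch event)))

        :else
        (do (true-stream (expire-sketch event)) (false-stream event))))))

(def high (atom []))
(def low (atom []))

(def split-at-5
  (dual-sketch #(> (:metric %) 5)
               #(swap! high conj %)
               #(swap! low conj %)))

(split-at-5 {:service "cpu" :metric 9})
(split-at-5 {:service "cpu" :metric 1})
```

Each stream sees every event, either as it arrived or as an expired copy, so downstream state on the "losing" side can be cleared.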

ewma (clj)

(ewma halflife & children)

Exponentially weighted moving average. Constant space and time overhead. Passes on each event received, but with metric adjusted to the moving average. Takes into account the time between events.

ewma-timeless (clj)

(ewma-timeless r & children)

Exponentially weighted moving average. Constant space and time overhead. Passes on each event received, but with metric adjusted to the moving average. Does not take the time between events into account. r is the ratio between successive events: r=1 means always return the most recent metric; r=1/2 means the current event counts for half, the previous event for 1/4, the one before that for 1/8, and so on.
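
The weighting described above corresponds to the recurrence avg' = r * x + (1 - r) * avg. The plain-Clojure sketch below applies it to a sequence of metrics; ewma-timeless-sketch is a hypothetical helper, and starting the average at 0 is an assumption.

```clojure
(defn ewma-timeless-sketch
  "Returns the running average after each metric, using
  avg' = r * x + (1 - r) * avg with an assumed initial average of 0."
  [r metrics]
  (rest (reductions (fn [avg x] (+ (* r x) (* (- 1 r) avg)))
                    0
                    metrics)))

;; r = 0.5: the current metric counts for half, the previous for 1/4, ...
(ewma-timeless-sketch 0.5 [8 8 8])
;; 0.5*8 = 4.0, then 4.0 + 0.5*4.0 = 6.0, then 4.0 + 0.5*6.0 = 7.0

;; r = 1: always the most recent metric.
(ewma-timeless-sketch 1 [3 5 7])
```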

exception-stream (clj, macro)

(exception-stream exception-stream & children)

Catches exceptions, converts them to events, and sends those events to a special exception stream.

(exception-stream (email "polito@vonbraun.com")
  (async-queue! :graphite {:core-pool-size 128}
    graph))

Streams often take multiple children and send an event to each using call-rescue. Call-rescue will rescue any exception thrown by a child stream, log it, and move on to the next child stream, so that a failure in one child won't prevent others from executing.

exception-stream binds a dynamically scoped thread-local variable, *exception-stream*. When call-rescue encounters an exception, it will also route the error to this exception stream. When switching threads (e.g. when using an executor or Thread), you must use (bound-fn) to preserve this binding.

This is a little more complex than you might think, because we not only need to bind this variable during the runtime execution of child streams, but also during the evaluation of the child streams themselves, e.g. at the invocation time of exception-stream itself. If we write

(exception-stream (email ...)
  (rate 5 index))

then (rate), when invoked, might need access to this variable immediately. Therefore, this macro binds *exception-stream* twice: once when evaluating children, and again every time the returned stream is invoked.

execute-on (clj)

(execute-on executor & children)

Returns a stream which accepts events and executes them using a java.util.concurrent.Executor. Returns immediately. May throw RejectedExecutionException if the underlying executor will not accept the event; e.g. if its queue is full. Use together with riemann.service/executor-service for reloadable asynchronous execution of streams. See also: async-queue!, which may be simpler.

(let [io-pool (service!
                (executor-service
                  #(ThreadPoolExecutor. 1 10 ...)))
      graph (execute-on io-pool (graphite {:host ...}))]
  ...
  (tagged "graph"
    graph))

expired (clj)

(expired & children)

Passes on expired events.

expired? (clj)

(expired? event)

There are two ways an event can be considered expired. First, if it has state "expired". Second, if its :ttl and :time indicate it has expired.
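
Both conditions can be written as a single predicate. The plain-Clojure sketch below is an illustration only: expired-sketch? is a hypothetical name, the current time is passed in as now rather than read from a clock, and falling back to default-ttl when an event has no :ttl is an assumption.

```clojure
(defn expired-sketch?
  "True if event has state \"expired\", or if its age (now minus :time)
  exceeds its :ttl. Events without :time never expire by age."
  [now default-ttl event]
  (or (= "expired" (:state event))
      (boolean
        (when-let [t (:time event)]
          (> (- now t) (or (:ttl event) default-ttl))))))

(expired-sketch? 100 60 {:state "expired"})   ; expired by state
(expired-sketch? 100 60 {:time 10 :ttl 30})   ; age 90 exceeds ttl 30
(expired-sketch? 100 60 {:time 90 :ttl 30})   ; age 10 is within ttl 30
(expired-sketch? 100 60 {:service "no-time"}) ; no :time, not expired
```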

fill-in (clj)

(fill-in interval default-event & children)

Passes on all events. Fills in gaps in event stream with copies of the given event, wherever interval seconds pass without an event arriving. Inserted events have current time. Stops inserting when expired. Uses local times.

fill-in-last (clj)

(fill-in-last interval update & children)

Passes on all events. Fills in gaps in event stream with copies of the last event merged with the given data, wherever interval seconds pass without an event arriving. Inserted events have current time. Stops inserting when expired. Uses local times.

fill-in-last* (clj)

(fill-in-last* interval updater & children)

Passes on all events. Fills in gaps in event stream with copies of the last event updated with the given updater function, wherever interval seconds pass without an event arriving. Inserted events have current time. Stops inserting when expired. Uses local times.

fixed-event-window (clj)

(fixed-event-window n & children)

Passes on fixed-size windows of n events each. Accumulates n events, then calls children with a vector of those events, from oldest to newest. Ignores event times. Example:

(fixed-event-window 5 (smap folds/mean index))
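
On a finished sequence of events, the same grouping can be reproduced with clojure.core/partition. This is a plain-Clojure sketch: fixed-windows and mean-metric are hypothetical helpers, and mean-metric returns a bare number rather than an event as a fold would.

```clojure
(defn fixed-windows
  "Splits events into consecutive vectors of exactly n events. A trailing
  group with fewer than n events is not emitted, like a window that is
  still accumulating."
  [n events]
  (mapv vec (partition n events)))

(defn mean-metric
  "Average :metric of a window of events."
  [window]
  (/ (reduce + (map :metric window)) (count window)))

;; Seven events with metrics 1..7.
(def events
  (map (fn [m] {:service "api" :metric m}) (range 1 8)))

(map mean-metric (fixed-windows 3 events))
;; windows hold metrics [1 2 3] and [4 5 6]; the seventh event is pending
```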

fixed-offset-time-window (clj)

(fixed-offset-time-window n & children)

Like fixed-time-window, but divides wall clock time into discrete windows.

A fixed window over the event stream in time. Emits vectors of events, such that each vector has events from a distinct n-second interval. Windows do not overlap; each event appears at most once in the output stream. Once an event is emitted, all events older or equal to that emitted event are silently dropped.

Events without times accrue in the current window.

fixed-time-window (clj)

(fixed-time-window n & children)

A fixed window over the event stream in time. Emits vectors of events, such that each vector has events from a distinct n-second interval. Windows do not overlap; each event appears at most once in the output stream. Once an event is emitted, all events older or equal to that emitted event are silently dropped.

Events without times accrue in the current window.

fold-interval (clj)

(fold-interval interval event-key folder & children)

Applies the folder function to all event-key values of events during interval seconds.

fold-interval-metric (clj)

(fold-interval-metric interval folder & children)

Wrapper for fold-interval that assumes :metric as event-key.

forward (clj)

(forward client)

Sends an event or a collection of events through a Riemann client.

infinity (clj)

interpolate-constant (clj)

(interpolate-constant interval & children)

Emits a constant stream of events every interval seconds, starting when an event is received, and ending when an expired event is received. Times are set to Riemann's time. The first and last events are forwarded immediately.

Note: ignores event times currently--will change later.

match (clj)

(match f value & children)

Passes events on to children only when (f event) matches value, using riemann.common/match. For instance:

(match :service nil prn)
(match :state #{"warning" "critical"} prn)
(match :description #"error" prn)
(match :metric 5 prn)
(match expired? true prn)
(match (fn [e] (/ (:metric e) 1000)) 5 prn)

For cases where you only care about whether (f event) is truthy, use (where some-fn) instead of (match some-fn true).

mean-over-time (clj)

(mean-over-time children)

Emits the most recent event each time this stream is called, but with the average of all received metrics.

moving-event-window (clj)

(moving-event-window n & children)

A sliding window of the last few events. Every time an event arrives, calls children with a vector of the last n events, from oldest to newest. Ignores event times. Example:

(moving-event-window 5 (smap folds/mean index))
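
The sliding behaviour can be sketched in plain Clojure over a finished sequence. moving-windows is a hypothetical helper; it assumes, as the description suggests, that a window is emitted for every event, so the first few windows hold fewer than n events.

```clojure
(defn moving-windows
  "Returns, for each event in order, a vector of the last n events seen
  up to and including that event, from oldest to newest."
  [n events]
  (rest (reductions (fn [window event]
                      (vec (take-last n (conj window event))))
                    []
                    events)))

(moving-windows 2 [1 2 3 4])
;; one window per input: [1], [1 2], [2 3], [3 4]
```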

moving-time-window (clj)

(moving-time-window n & children)

A sliding window of all events with times within the last n seconds. Uses the maximum event time as the present-time horizon. Every time a new event arrives within the window, emits a vector of events in the window to children.

Events without times accrue in the current window.

not-expired (clj)

(not-expired & children)

Passes on events that have not expired.

over (clj)

(over x & children)

Passes on events only when their metric is greater than x.

part-time-fast (clj)

(part-time-fast interval create add finish)

Partitions events by time (fast variant). Each <interval> seconds, creates a new bin by calling (create). Applies each received event to the current bin with (add bin event). When the time interval is over, calls (finish bin start-time elapsed-time).

Concurrency guarantees:

  • (create) may be called multiple times for a given time slice.
  • (add) when called, will receive exactly one distinct bucket in each time slice.
  • (finish) will be called exactly once for each time slice.

part-time-simple (clj)

(part-time-simple dt reset add finish)
(part-time-simple dt reset add side-effects finish)

Divides wall clock time into discrete windows. Returns a stream, composed of four functions:

(reset previous-state) Given the state for the previous window, returns a fresh state for a new window. Reset must be a pure function, as it will be invoked in a compare-and-set loop. Reset may be invoked at any time. Reset will be invoked with nil when no previous state exists.

(add state event) is called every time an event arrives to combine the event and the state together, returning some new state. Add must be a pure function.

(side-effects state event) is called with the resulting state and the event which just arrived, but will be called only once per event, and can be impure. Its return value is used for the return value of the stream.

(finish state start-time end-time) is called once at the end of each time window, and receives the final state for that window, and also the start and end times for that window. Finish will be called exactly once per window, and may be impure.

When no events arrive in a given time window, no functions are called.
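
As an illustration of the roles described above, here is a hedged sketch of callbacks that count events per window. All names are hypothetical, and `simulate-window` only imitates a single window by hand; it does not use Riemann's scheduler or wall clock.

```clojure
;; Hypothetical callbacks for counting events in each window.
(defn reset-count [_previous-state] 0)        ; pure; ignores the old state
(defn add-count [state _event] (inc state))   ; pure
(defn finish-count [state start end]          ; could be impure in real use
  {:count state :start start :end end})

;; Imitates one window: reset, fold each event in with add, then finish.
(defn simulate-window [events start end]
  (finish-count (reduce add-count (reset-count nil) events) start end))

(simulate-window [{:metric 1} {:metric 2} {:metric 3}] 0 10)
```

Keeping reset and add pure is what makes them safe to retry inside a compare-and-set loop.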


percentiles clj

(percentiles interval points & children)

Over each period of interval seconds, aggregates events and selects one event from that period for each point. If point is 0, takes the lowest metric event. If point is 1, takes the highest metric event. 0.5 is the median event, and so forth. Forwards each of these events to children. The service name has the point appended to it; e.g. 'response time' becomes 'response time 0.95'.
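
The selection step might look roughly like the following. This is a sketch in plain Clojure, not Riemann's code: `pick-percentile` is a hypothetical name, and the index rule (floor of n times point, clamped to the last element) is an assumption. It expects a non-empty collection.

```clojure
(defn pick-percentile
  "Selects the event at the given point (0 to 1) from events sorted by
  :metric, and appends the point to its :service."
  [events point]
  (let [sorted (vec (sort-by :metric events))
        n      (count sorted)
        ;; assumption: floor(n * point), clamped to the last index
        idx    (min (dec n) (int (Math/floor (double (* n point)))))]
    (update (nth sorted idx) :service str " " point)))

(def sample
  (for [m [5 1 4 2 3]] {:service "response time" :metric m}))

(pick-percentile sample 0.5)
```

With five events, point 0.5 selects the event whose metric is 3 and renames its service to "response time 0.5".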


periodically-until-expired clj

(periodically-until-expired f)
(periodically-until-expired interval f)
(periodically-until-expired interval delay f)

When an event arrives, begins calling f every interval seconds. Starts after delay. Stops calling f when an expired? event arrives, or the most recent event expires.


pipe clj macro

(pipe marker & stages)

Sometimes, you want to have a stream split into several paths, then recombine those paths after some transformation. Pipe lets you write these topologies easily.

We might express a linear stream in Riemann, in which a -> b -> c -> d, as

(a (b (c d)))

With pipe, we write

(pipe ↧ (a ↧)
        (b ↧)
        (c ↧)
        d)

The first argument ↧ is a marker for points where events should flow down into the next stage. A delightful list of marker symbols you might enjoy is available at http://www.alanwood.net/unicode/arrows.html.

What makes pipe more powerful than the standard Riemann composition rules is that the marker may appear multiple times in a stage, and at any depth in the expression. For instance, we might want to categorize events based on their metric, and send all those events into the same throttled email stream.

(let [throttled-emailer (throttle 100 1 (email "ops@rickenbacker.mil"))]
  (splitp < metric
    0.9 (with :state :critical throttled-emailer)
    0.5 (with :state :warning  throttled-emailer)
        (with :state :ok       throttled-emailer)))

But with pipe, we can write:

(pipe - (splitp < metric
                0.9 (with :state :critical -)
                0.5 (with :state :warning  -)
                    (with :state :ok       -))
        (throttle 100 1 (email "ops@rickenbacker.mil")))

So pipe lets us do three things:

  1. Flatten a deeply nested expression, like Clojure's -> and ->>.

  2. Omit or simplify the names for each stage, when we care more about the structure of the streams than giving them full descriptions.

  3. Write the stream in the order in which events flow.

Pipe rewrites its stages as a let binding in reverse order; binding each stage to the placeholder in turn. The placeholder must be a compile-time symbol, and obeys the usual let-binding rules about variable shadowing; you can rebind the marker lexically within any stage using let, etc. Yep, this is a swiss arrow in disguise; ssshhhhhhh. ;-)
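
The reverse-order let rewrite can be reproduced in a few lines. The macro below is a hypothetical re-creation written from the description above, not a copy of Riemann's source, and the toy streams `inc-metric` and `double-metric` exist only for this example.

```clojure
;; Binds the marker to each stage in reverse order, then returns the marker.
(defmacro pipe-sketch [marker & stages]
  `(let [~@(->> stages reverse (interpose marker) (cons marker))]
     ~marker))

;; Toy streams: each takes a child and returns a function of an event.
(defn inc-metric [child] (fn [e] (child (update e :metric inc))))
(defn double-metric [child] (fn [e] (child (update e :metric * 2))))

(def pipe-seen (atom []))

;; Expands to:
;; (let [>> sink, >> (double-metric >>), >> (inc-metric >>)] >>)
(def piped
  (pipe-sketch >> (inc-metric >>)
                  (double-metric >>)
                  (fn [e] (swap! pipe-seen conj e))))

(piped {:metric 1})
```

The event is incremented first and doubled second, matching the order in which the stages are written.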


predict-linear clj

(predict-linear n s r & children)

Stream that performs OLS regression. Uses a moving-event-window of n events and emits an event with a prediction for :metric s seconds in the future. If the optional model rebuild interval r (in seconds) is specified, the model will be rebuilt periodically rather than on every arriving event.

E.g. predict the metric of service "fs-usage" 30 minutes in the future grouped by host:

(where (service "fs-usage")
  (by :host
    (predict-linear 100 1800
      #(info %))))
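
For readers unfamiliar with OLS, the fit-and-extrapolate step can be sketched in plain Clojure. This is not Riemann's implementation: `predict-sketch` is a hypothetical name, and it assumes the prediction is made s seconds after the newest event time in the window. It needs at least two distinct times, otherwise the slope is undefined.

```clojure
(defn predict-sketch
  "Fits metric = intercept + slope * time by least squares and returns the
  predicted metric s seconds after the newest event."
  [events s]
  (let [n     (count events)
        ts    (map :time events)
        ms    (map :metric events)
        t-bar (/ (reduce + ts) n)
        m-bar (/ (reduce + ms) n)
        sxy   (reduce + (map #(* (- %1 t-bar) (- %2 m-bar)) ts ms))
        sxx   (reduce + (map #(let [d (- % t-bar)] (* d d)) ts))
        slope (/ sxy sxx)
        intercept (- m-bar (* slope t-bar))]
    ;; assumption: extrapolate from the newest event time
    (+ intercept (* slope (+ (apply max ts) s)))))

;; Points on the line metric = 2 * time + 1
(predict-sketch [{:time 0 :metric 1} {:time 1 :metric 3}
                 {:time 2 :metric 5} {:time 3 :metric 7}]
                10)
```

The sample points lie exactly on a line with slope 2 and intercept 1, so the prediction for time 13 is 27.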

project clj macro

(project basis & children)

Projects an event stream into a specific basis--like (coalesce), but where you only want to compare two or three specific events. Takes a vector of predicate expressions, like those used in (where). Project maintains a vector of the most recent event for each predicate. An incoming event is compared against each predicate; if it matches, the event replaces any previous event in that position and the entire vector of events is forwarded to all child streams. Expired events are included in the emitted vector of events once, and removed from the state vector thereafter.

Use project when you want to compare a small number of distinct states over time. For instance, to find the ratio of enqueues to dequeues:

(project [(service "enqueues")
          (service "dequeues")]
  (smap folds/quotient
    (with :service "enqueues per dequeue"
      ...)))

Here we've combined separate events--enqueues and dequeues--into a single event, using the folds/quotient function, which divides the first event's metric by the second. Then we assigned a new service name to that resulting event, and could subsequently filter based on the metric, assign different states, graph, alert, etc.
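
The state update at the heart of project can be modelled as a pure function. This sketch is an assumption-laden simplification: `project-step` is a hypothetical name, predicates are plain functions (as in project*), and expiry handling and forwarding to children are left out.

```clojure
(defn project-step
  "Returns the new state vector after offering event to each predicate.
  A matching event replaces whatever was in that position."
  [predicates state event]
  (mapv (fn [pred current] (if (pred event) event current))
        predicates
        state))

(def preds [#(= "enqueues" (:service %))
            #(= "dequeues" (:service %))])

(def projected
  (reduce (partial project-step preds)
          [nil nil]
          [{:service "enqueues" :metric 10}
           {:service "other"    :metric 99}
           {:service "dequeues" :metric 5}]))

;; Ratio of the first slot's metric to the second's
(apply / (map :metric projected))
```

The unrelated "other" event matches no predicate and leaves the state unchanged.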


project* clj

(project* predicates & children)

Like project, but takes predicate functions instead of where expressions.


rate clj

(rate interval & children)

Take the sum of every event's metric over interval seconds and divide by the interval size. Emits one event every interval seconds. Starts as soon as an event is received, stops when the most recent event expires. Uses the most recently received event with a metric as a template. Event ttls decrease constantly if no new events arrive.
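
The arithmetic for one interval is simple enough to show directly. `rate-of` is a hypothetical helper for illustration only; it ignores timing, templates and TTLs.

```clojure
(defn rate-of
  "Sum of the metrics seen during one interval, divided by the interval
  length in seconds. Events without a metric are skipped."
  [interval events]
  (/ (reduce + (keep :metric events)) interval))

;; 35 units over 5 seconds
(rate-of 5 [{:metric 10} {:metric 20} {:metric 5} {:service "no metric"}])
```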


register clj

(register reference)

Set reference to the most recent event that passes through.


rollup clj

(rollup n dt & children)

Invokes children with events at most n times per dt second interval. Passes vectors of events to children, not a single event at a time. For instance, (rollup 3 1 f) receives five events and forwards three times per second:

  • 1 -> (f [1])
  • 2 -> (f [2])
  • 3 -> (f [3])
  • 4 ->
  • 5 ->

... and events 4 and 5 are rolled over into the next period:

-> (f [4 5])


runs clj

(runs len-run field & children)

Useful for flap detection: runs examines a moving-event-window of len-run events and determines whether field is the same across all of them. If it is, runs passes on the last (newest) event of the window. In practice, this can be used with (changed-state ...) as a child to reduce 'flappiness' for state changes.

(runs 3 :state prn) ; Print events where there are 3-in-a-row of a state.
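
The test applied to each window can be written as a small predicate. This is a sketch, not Riemann's code: `run-complete?` is a hypothetical name, and it assumes nothing is passed on until the window holds a full len-run events.

```clojure
(defn run-complete?
  "True when the last len-run events all share the same value for field."
  [len-run field events]
  (let [window (take-last len-run events)]
    (and (= len-run (count window))          ; assumption: window must be full
         (apply = (map field window)))))

(run-complete? 3 :state [{:state "ok"} {:state "critical"}
                         {:state "critical"} {:state "critical"}])
```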


scale clj

(scale factor & children)

Passes on a modified copy of each event, multiplying its metric by the given scale factor.

; Convert bytes to kilobytes

(scale 1/1024 index)


sdo clj

(sdo)
(sdo child)
(sdo child & children)

Takes a list of functions f1, f2, f3, and returns f such that (f event) calls (f1 event) (f2 event) (f3 event). Useful for binding several streams to a single variable.

(sdo prn (rate 5 index))
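
In spirit, the returned function behaves like the following sketch. `sdo-sketch` is a hypothetical stand-in; it omits the exception handling that real streams apply to their children.

```clojure
(defn sdo-sketch
  "Returns a function that calls every f with the event, in order."
  [& fs]
  (fn [event]
    (doseq [f fs]
      (f event))))

(def sdo-log (atom []))

(def both
  (sdo-sketch #(swap! sdo-log conj [:first (:metric %)])
              #(swap! sdo-log conj [:second (:metric %)])))

(both {:metric 1})
```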


sflatten clj

(sflatten & children)

Streaming flatten. Calls children with each event in events. Events should be a sequence.


smap clj

(smap f & children)

Streaming map. Calls children with (f event), whenever (f event) is non-nil. Prefer this to (adjust f) and (combine f). Example:

(smap :metric prn) ; prints the metric of each event.
(smap #(assoc % :state "ok") index) ; Indexes each event with state "ok"
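
The nil-skipping behaviour can be seen in a minimal model. `smap-sketch` is a hypothetical plain-Clojure stand-in, not Riemann's implementation, and it leaves out exception handling.

```clojure
(defn smap-sketch
  "Calls each child with (f event), unless (f event) is nil."
  [f & children]
  (fn [event]
    (let [result (f event)]
      (when-not (nil? result)
        (doseq [child children]
          (child result))))))

(def smap-seen (atom []))
(def metrics-only (smap-sketch :metric #(swap! smap-seen conj %)))

(metrics-only {:service "a" :metric 3})
(metrics-only {:service "b"})           ; no metric, so nothing is forwarded
```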

smap* clj

(smap* f & children)

Streaming map: less magic. Calls children with (f event). Unlike smap, passes on nil results to children. Example:

(smap* folds/maximum prn) ; Prints the maximum of lists of events.


smapcat clj

(smapcat f & children)

Streaming mapcat. Calls children with each event in (f event), which should return a sequence. For instance, to set the state of any services with metrics deviating from the mode to "warning", one might use coalesce to aggregate all services, and smapcat to find the mode and assoc the proper states; emitting a series of individual events to the index.

(coalesce
  (smapcat (fn [events]
             (let [freqs (frequencies (map :metric events))
                   mode  (apply max-key freqs (keys freqs))]
               (map #(assoc % :state (if (= mode (:metric %))
                                       "ok" "warning"))
                    events)))
    index))

split clj macro

(split & clauses)

Behaves like split*, but expects predicates to be (where) expressions instead of functions. Example:

(split
  (< 0.9  metric) (with :state "critical" index)
  (< 0.75 metric) (with :state "warning" index)
  (with :state "ok" index))

split* clj

(split* & clauses)

Given a list of function and stream pairs, passes the current event onto the stream associated with the first passing condition.

Conditions are functions as for where*. An odd number of forms will make the last form the default stream. For example:

 (split*
   (fn [e] (< 0.9  (:metric e))) (with :state "critical" index)
   (fn [e] (< 0.75 (:metric e))) (with :state "warning" index)
   (with :state "ok" index))
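
The first-match dispatch, including the odd-length default, can be sketched as below. `split-sketch` is a hypothetical name, and for ease of inspection it returns the chosen stream's result, which is a choice made for this example only.

```clojure
(defn split-sketch
  "Takes predicate/stream pairs and an optional trailing default stream.
  Sends each event to the stream of the first predicate that passes."
  [& clauses]
  (let [pairs   (partition 2 clauses)            ; drops an unpaired last form
        default (when (odd? (count clauses)) (last clauses))]
    (fn [event]
      (if-let [[_ stream] (first (filter (fn [[pred _]] (pred event)) pairs))]
        (stream event)
        (when default (default event))))))

(def classify-event
  (split-sketch
    (fn [e] (< 0.9  (:metric e))) #(assoc % :state "critical")
    (fn [e] (< 0.75 (:metric e))) #(assoc % :state "warning")
    #(assoc % :state "ok")))

(classify-event {:metric 0.8})
```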

split*-match clj

(split*-match event [pred stream])

splitp clj macro

(splitp pred expr & clauses)

Takes a binary predicate, an expression and a set of clauses. Each clause takes the form

test-expr stream

splitp returns a stream which accepts an event. Expr is a (where) expression, which will be evaluated against the event to obtain a value for selecting a clause. For each clause, evaluates (pred test-expr value). If the result is logical true, evaluates (stream event) and returns that value.

A single default stream can follow the clauses, and its value will be returned if no clause matches. If no default stream is provided and no clause matches, an IllegalArgumentException is thrown.

Splitp evaluates streams once at invocation time.

Example:

(splitp < metric
  0.9  (with :state "critical" index)
  0.75 (with :state "warning" index)
       (with :state "ok" index))
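
The argument order (pred test-expr value) mirrors Clojure's own condp, so a plain-Clojure analogy may help when reading the example above. `classify` is a hypothetical function that returns a state string instead of calling a stream.

```clojure
(defn classify
  "Analogy for the splitp example: each clause tests (< test-expr metric)."
  [event]
  (condp < (:metric event)
    0.9  "critical"    ; (< 0.9 metric)
    0.75 "warning"     ; (< 0.75 metric)
    "ok"))             ; default

(map classify [{:metric 0.95} {:metric 0.8} {:metric 0.1}])
```

As with splitp, condp throws an IllegalArgumentException when no clause matches and no default is given.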

sreduce clj

(sreduce f & opts)

Streaming reduce. Two forms:

(sreduce f child1 child2 ...)
(sreduce f val child1 child2 ...)

Maintains an internal value, which defaults to the first event received or, if provided, val. When the stream receives an event, calls (f val event) to produce a new value, which is sent to each child. f must be free of side effects. Examples:

Passes on events, but with the maximum of all received metrics:

(sreduce (fn [acc event] (assoc event :metric
                                (max (:metric event) (:metric acc)))) ...)

Or, using riemann.folds, a simple moving average:

(sreduce (fn [acc event] (folds/mean [acc event])) ...)
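
A minimal model of the form that takes an initial value is shown below. `sreduce-sketch` is a hypothetical stand-in built on an atom, which is an assumption about one reasonable way to hold the state rather than a description of Riemann's internals.

```clojure
(defn sreduce-sketch
  "Keeps a running value, starting at init. For each event, replaces the
  value with (f value event) and sends the new value to every child."
  [f init & children]
  (let [acc (atom init)]
    (fn [event]
      ;; swap! may retry f under contention, so f must be pure
      (let [v (swap! acc f event)]
        (doseq [child children]
          (child v))))))

(def maxima (atom []))

(def running-max
  (sreduce-sketch
    (fn [acc event]
      (assoc event :metric (max (:metric event) (:metric acc))))
    {:metric 0}
    #(swap! maxima conj (:metric %))))

(doseq [m [3 1 5]]
  (running-max {:metric m}))
```

The children receive metrics 3, 3 and 5 in turn.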


stable clj

(stable dt f & children)

A stream which detects stable groups of events over time. Takes a time period in seconds, and a function of events. Passes on all events for which (f event1) is equal to (f event2), for each successive pair of events, for at least dt seconds. Use (stable) to filter out transient spikes and flapping states.

In these plots, stable events are shown as =, and unstable events are shown as -. = events are passed to children, and - events are ignored.

     A spike           Flapping           Stable changes
|                 |                    |
|       -         |    -- -   ======   |      =====
|                 |        -           |           ========
|======= ======   |====  -  --         |======
+------------->   +---------------->   +------------------>
      time              time                  time

May buffer events for up to dt seconds when the value of (f event) changes, in order to determine if the new value is stable or not.

; Passes on events where the state remains the same for at least five
; seconds.
(stable 5 :state prn)

stream clj

(stream & args)

sum-over-time clj

(sum-over-time & children)

Sums all metrics together. Emits the most recent event each time this stream is called, but with summed metric.


tag clj

(tag tags & children)

Adds a new tag, or set of tags, to events which flow through.

(tag "foo" index)
(tag ["foo" "bar"] index)


tagged clj

Alias for tagged-all


tagged-all clj

(tagged-all tags & children)

Passes on events where all tags are present. This stream returns true if an event it receives matches those tags, nil otherwise.

Can be used as a predicate in a where form.

(tagged-all "foo" prn)
(tagged-all ["foo" "bar"] prn)

tagged-all? clj

(tagged-all? tags event)

Predicate function to check if a collection of tags is present in the tags of event.


tagged-any clj

(tagged-any tags & children)

Passes on events where any of tags are present. This stream returns true if an event it receives matches those tags, nil otherwise.

Can be used as a predicate in a where form.

(tagged-any "foo" prn)
(tagged-any ["foo" "bar"] prn)

tagged-any? clj

(tagged-any? tags event)

Predicate function to check if any of a collection of tags are present in the tags of event.
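
The difference between the "all" and "any" predicates comes down to subset versus intersection. The functions below are hypothetical sketches that accept only a collection of tags; they are not Riemann's definitions.

```clojure
(require '[clojure.set :as set])

(defn tagged-all-sketch?
  "True when every tag in tags appears in the event's :tags."
  [tags event]
  (set/subset? (set tags) (set (:tags event))))

(defn tagged-any-sketch?
  "True when at least one tag in tags appears in the event's :tags."
  [tags event]
  (boolean (some (set tags) (:tags event))))

(def tagged-event {:service "api" :tags ["foo" "baz"]})

[(tagged-all-sketch? ["foo" "bar"] tagged-event)
 (tagged-any-sketch? ["foo" "bar"] tagged-event)]
```

The sample event carries "foo" but not "bar", so the "all" check fails while the "any" check passes.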


throttle clj

(throttle n dt & children)

Passes on at most n events, or vectors of events, every dt seconds. If more than n events (or vectors) arrive in a dt-second fixed window, drops remaining events. Imposes no additional latency; events are either passed on immediately or dropped.
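
The fixed-window rule can be modelled as a pure function over a batch of events. This sketch makes several assumptions that the text above does not state: it uses each event's :time rather than the wall clock, aligns windows to multiples of dt, and expects events in time order. `throttled` is a hypothetical name.

```clojure
(defn throttled
  "Returns the events that would be passed on: at most n per dt-second
  window. Later events in a full window are dropped."
  [n dt events]
  (:passed
    (reduce (fn [{:keys [window seen] :as st} e]
              (let [w    (quot (:time e) dt)          ; assumed window index
                    seen (if (= w window) seen 0)]    ; new window resets count
                (if (< seen n)
                  (-> st
                      (assoc :window w :seen (inc seen))
                      (update :passed conj e))
                  (assoc st :window w :seen seen))))
            {:window nil :seen 0 :passed []}
            events)))

(map :time (throttled 2 10 (map #(hash-map :time %) [1 2 3 11 12 13 14])))
```

With n = 2 and dt = 10, the events at times 1, 2, 11 and 12 pass and the rest are dropped.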


top clj

(top k f top-stream)
(top k f top-stream bottom-stream)
(top k f top-stream bottom-stream demote?)

Bifurcates a stream into a dual pair of streams: one for the top k events, and one for the bottom k events.

f is a function which maps events to comparable values, e.g. numbers. If an incoming event e falls in the top k, the top stream receives e and the bottom stream receives (expire e). If the event is not in the top k, calls (top (expire e)) and (bottom e).

If an inbound event is already expired, it is forwarded directly to both streams. In this way, both top- and bottom-stream have a consistent, dual view of the event space.

Index the top 10 events, by metric:

(top 10 :metric index)

Index everything, but tag the top k events with "top":

(top 10 :metric
  (tag "top" index)
  index)

This implementation of top is lazy, in the sense that it won't proactively expire events which are bumped from the top-k set--you have to wait for another event with the same host and service to arrive before child streams will know it's expired.


under clj

(under x & children)

Passes on events only when their metric is smaller than x.


untag clj

(untag tags & children)

Removes a tag, or set of tags, from events which flow through.

(untag "foo" index)
(untag ["foo" "bar"] index)


where clj macro

(where expr & children)

Passes on events where expr is true. Expr is rewritten using where-rewrite. 'event is bound to the event under consideration. Examples:

; Match any event where metric is either 1, 2, 3, or 4.
(where (metric 1 2 3 4) ...)

; Match an event where the metric is negative AND the state is ok.
(where (and (> 0 metric)
            (state "ok")) ...)

; Match an event where the host begins with web
(where (host #"^web") ...)

; Match an event where the service is in a set of services
(where (service #{"service-foo" "service-bar"}) ...)
; which is equivalent to
(where (service "service-foo" "service-bar") ...)

If a child begins with (else ...), the else's body is executed when expr is false. For instance:

(where (service "www")
  (notify-www-team)
  (else
    (notify-misc-team)))

The streams generated by (where) return the value of expr: truthy if expr matched the given event, and falsey otherwise. This means (where (metric 5)) tests events and returns true if their metric is five.


where* clj macro

(where* f & children)

A simpler, less magical variant of (where). Instead of binding symbols in the context of an expression, where* takes a function which takes an event. When (f event) is truthy, passes event to children--and otherwise, passes event to (else ...) children. For example:

(where* (fn [e] (< 2 (:metric e))) prn)

(where* expired?
  (partial prn "Expired")
  (else
    (partial prn "Not expired!")))
A simpler, less magical variant of (where). Instead of binding symbols in
the context of an expression, where* takes a function which takes an event.
When (f event) is truthy, passes event to children--and otherwise, passes
event to (else ...) children. For example:

```clojure
(where* (fn [e] (< 2 (:metric e))) prn)

(where* expired?
  (partial prn "Expired")
  (else
    (partial prn "Not expired!")))
```

where-partition-clauses clj

(where-partition-clauses exprs)

Given expressions like (a (else b) c (else d)), returns [[a c] [b d]]: the non-else expressions first, then the bodies of the else forms.
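
The partitioning can be sketched in plain Clojure. This is an illustrative re-implementation, not Riemann's source; else-clause? and partition-else-clauses are hypothetical names:

```clojure
; Hypothetical helper: true for forms shaped like (else ...).
(defn else-clause? [expr]
  (and (seq? expr) (= 'else (first expr))))

; Hypothetical sketch: split child expressions into
; [ordinary-children else-bodies].
(defn partition-else-clauses [exprs]
  [(vec (remove else-clause? exprs))
   (vec (mapcat rest (filter else-clause? exprs)))])

(partition-else-clauses '(a (else b) c (else d)))
; => [[a c] [b d]]
```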


where-rewrite clj

(where-rewrite expr)

Rewrites lists recursively. Replaces (metric x y z) with a test matching (:metric event) to any of x, y, or z, either by = or re-find. Replaces any other instance of metric with (:metric event). Does the same for host, service, event, state, time, ttl, tags (which performs an exact match of the tag vector), tagged (which checks to see if the given tag is present at all), metric_f, and description.
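
To make the matching rules concrete, here is a simplified sketch in plain Clojure of the kind of test the rewritten expression performs. It covers only the = and re-find cases named above and is not Riemann's implementation; matches? and field-matches-any? are hypothetical names:

```clojure
; Hypothetical helper: compare one pattern to a field value,
; using re-find for regex patterns and = for everything else.
(defn matches? [pattern value]
  (if (instance? java.util.regex.Pattern pattern)
    (boolean (and (string? value) (re-find pattern value)))
    (= pattern value)))

; Hypothetical helper: roughly what (field x y z) tests for an event.
(defn field-matches-any? [event field & patterns]
  (boolean (some #(matches? % (get event field)) patterns)))

(def event {:host "web01" :metric 2 :tags ["prod" "web"]})

(field-matches-any? event :metric 1 2 3 4) ; like (metric 1 2 3 4)
(field-matches-any? event :host #"^web")   ; like (host #"^web")

; tags compares the whole tag vector; tagged checks for one tag.
(= ["prod"] (:tags event))                 ; false: not an exact match
(boolean (some #{"prod"} (:tags event)))   ; true: "prod" is present
```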


window clj

(window n & children)

Alias for moving-event-window.
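
A small sketch of the alias in use, assuming Riemann is on the classpath and that moving-event-window hands its children a vector of the most recent n events, as its own docstring describes. The names windows and pairs are made up for illustration:

```clojure
(require '[riemann.streams :refer [window]])

(def windows (atom []))

; Record every window that the stream passes to its child.
(def pairs (window 2 (fn [events] (swap! windows conj events))))

(doseq [m [1 2 3]]
  (pairs {:service "demo" :metric m}))
```

If that assumption holds, the last entry in @windows should hold the events with metrics 2 and 3.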


with clj

(with & args)

Constructs a copy of each incoming event with new values for the given keys,
and passes the resulting event on to each child stream. As everywhere in
Riemann, events are immutable; only this stream's children will see this
version of the event.

If you only want to set *default* values, use `default`. If you want to
update values for a key based on the *current value* of that field in each
event, use `adjust`. If you want to update events using arbitrary functions,
use `smap`.

```clojure
; Print each event, but with service "foo"
(with :service "foo" prn)

; Print each event, but with no host and state "broken".
(with {:host nil :state "broken"} prn)
```
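
To compare with against the alternatives mentioned above, the sketch below applies all four streams to the same event. It assumes Riemann is on the classpath; seen, record!, and event are names invented for this example:

```clojure
(require '[riemann.streams :refer [with default adjust smap]])

; Hypothetical child stream that collects whatever it receives.
(def seen (atom []))
(defn record! [e] (swap! seen conj e))

(def event {:service "api" :metric 2})

((with :service "foo" record!) event)          ; always overwrite :service
((default :ttl 60 record!) event)              ; set :ttl only when missing
((adjust [:metric * 10] record!) event)        ; derive from current :metric
((smap #(assoc % :state "ok") record!) event)  ; arbitrary function

; The original event is untouched; each child saw its own copy.
event
```

After these calls, @seen should hold four modified copies, one per stream, while event itself still has service "api" and metric 2.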