Reactor is updated to 1.1.x
.
Reactor is updated to 1.1.0.RELEASE
.
Reactor is updated to 1.1.0.RC1
.
clojurewerkz.meltdown.selectors/set-membership
is a new selector
that matches on element membership in a set:
(require '[clojurewerkz.meltdown.reactor :as mr])
(require '[clojurewerkz.meltdown.selectors :as ms)
(let [r (mr/create)]
(mr/on r (ms/set-membership #{"white" "red" "blue"} (fn [evt])))
Reactor is updated to 1.1.0.M3
.
Reactor 1.1.0.M3
no longer supports default key (selector),
so 2-arity of clojurewerkz.meltdown.reactor/on
was removed.
Meltdown now depends on org.clojure/clojure
version 1.6.0
. It is
still compatible with Clojure 1.4 and if your project.clj
depends on
a different version, it will be used, but 1.6 is the default now.
clojurewerkz.meltdown.selectors/selectors-on
is a new function that
returns a list of selectors registered on a reactor:
(require '[clojurewerkz.meltdown.reactor :as mr])
(require '[clojurewerkz.meltdown.selectors :as ms :refer [$])
(let [r (mr/create)]
(mr/on r ($ "a.key) (fn [evt]))
(ms/selectors-on r))
clojurewerkz.meltdown.consumers/consumer-count
is a new function that
returns a number of consumers registered on a reactor:
(require '[clojurewerkz.meltdown.reactor :as mr])
(require '[clojurewerkz.meltdown.selectors :refer [$])
(require '[clojurewerkz.meltdown.consumers :as mc])
(let [r (mr/create)]
(mr/on r ($ "a.key) (fn [evt]))
(mc/consumer-count r))
Meltdown now includes event key when transforming them into Clojure maps.
Example event map:
{:data {:event delivered}, :reply-to nil, :headers {}, :key events.dummy, :id #uuid "5714bb01-ac7e-11e3-64b3-6b2c231ad83a"}
clojurewerkz.meltdown.selectors/predicate
is a new function
that creates a match-all selector (a predicate selector
that unconditionally returns true
).
clojurewerkz.meltdown.selectors/predicate
is a new function
that creates a predicate selector:
(require '[clojurewerkz.meltdown.reactor :as mr])
(require '[clojurewerkz.meltdown.selectors :as ms])
(let [r (mr/create)
;; will filter out events with keys that are
;; odd numbers
sel (ms/predicate even?)]
)
Reactor is updated to 1.1.0.M2
.
Previously Meltdown instantiated a new Environment
per
clojurewerkz.meltdown.reactor/create
invocation without
a provided environment. This lead to excessive thread creation
which could eventually exhaust system resources.
Meltdown 1.0.0-beta5
will reuse the same environment for
all created reactors unless its asked to use a specific
Environment
instance.
clojurewerkz.meltdown.env/environment
is a function that returns
a shared environment. To create a completely new environment from
scratch, use clojurewerkz.meltdown.env/create
.
clojurewerkz.meltdown.env/shutdown
shuts down environments and
all associated dispatchers.
clojurewerkz.meltdown.fn/->filter
clojurewerkz.meltdown.fn/->filter
is a new function that reifies
Reactor filters from Clojure functions.
clojurewerkz.meltdown.streams/fn->function
and
clojurewerkz.meltdown.streams/fn->predicate
are removed, use
clojurewerkz.meltdown.fn/->function
and
clojurewerkz.meltdown.fn/->predicate
instead.
Stream operations are now lazier in Reactor. To enforce stream
sources to be drained, use clojurewerkz.meltdown.streams/flush
which accepts a stream or deferred.
Reactor is updated to 1.1.0.M1
which has multiple breaking API
changes.
Reactor is updated to 1.0.0.RELEASE
.
Reactor is updated to 1.0.0.RC1
.
You can add listeners for Exceptions
that are occuring inside of your processing pipeline by
subscribing to events based on the class
of exception. For example, in order to subscribe
to all Exceptions, you can:
(mr/on-error r Exception (fn [event]
(println event)))
In order to subscribe to only RuntimeExceptions
:
(mr/on-error r RuntimeException (fn [event]
(println event)))
Reactor is updated to 1.0.0.M3
.
When creating reactor, it's now possible to plug in a custom dispatcher or configure an underlying dispatcher in a way that's most suitable for your application, for example:
(ns my-app.core
(:import [reactor.event.dispatch RingBufferDispatcher]
[com.lmax.disruptor.dsl ProducerType]
[com.lmax.disruptor YieldingWaitStrategy]))
;; Creates a RingBuffer Dispatcher, with a custom queue size of 4096
(def reactor (mr/create :dispatcher (RingBufferDispatcher. "dispatcher-name"
4096
ProducerType/MULTI
(YieldingWaitStrategy.))))
It is only possible to specify type of dispatcher when there's an Environment attached to reactor. Option was previously missing.
New namespace, clojurewerkz.meltdown.stream-graph
, was added for building graphs in a declarative
manner
(ns my-stream-graphs-ns
(:use clojurewerkz.meltdown.stream-graph))
(def res (atom nil))
(def channel (graph (create)
(map* inc
(reduce* #(+ %1 %2) 0
(consume #(reset! res %))))))
(accept channel 1)
(accept channel 2)
(accept channel 3)
@res
;; => 9
In order to attach and detach graph parts, you can use attach
and detach
functions from same
namespace:
(let [even-sum (atom nil)
odd-sum (atom nil)
even-summarizer (detach
(filter* even?
(reduce* #(+ %1 %2) 0
(consume #(reset! even-sum %)))))
odd-summarizer (detach
(filter* odd?
(reduce* #(+ %1 %2) 0
(consume #(reset! odd-sum %)))))
summarizer #(+ %1 %2)
channel (graph (create)
(map* inc
(attach even-summarizer)
(attach odd-summarizer)))]
(accept channel 1)
(accept channel 2)
(accept channel 3)
(accept channel 4)
@even-sum
;; => 6
@odd-sum
;; => 8
)
Added an ability to create custom streams, whenever map*
, reduce*
, filter*
and batch*
are not
enough. For that, you can use clojurewerkz.meltdown.streams/custom-stream
. For example, you'd like
to create a stream that will only dispatch every 5th value further. For state, you can use
let-over-lamda:
(defn every-fifth-stream
"Defines a stream that will receive all events from upstream and dispatch
every fifth event further down"
[upstream]
(let [counter (atom 0)]
(custom-stream
(fn [event downstream]
(swap! counter inc)
(when (= 5 @counter)
(reset! counter 0)
(accept downstream event)))
upstream)))
You can use custom streams same way as you usually use internal ones:
(def channel (create))
(def result (atom nil))
(def incrementer (map* inc channel)
(def inst-every-fifth-stream (every-fifth-stream incrememter))
(consume inst-every-fifth-stream #(reset! res %))
(accept channel 1) ;; @res is still `nil`
(accept channel 2) ;; @res is still `nil`
(accept channel 3) ;; @res is still `nil`
(accept channel 4) ;; @res is still `nil`
(accept channel 5)
@res ;; => 6
Initial release
Supported features:
notify
, on
, send
, receive
dispatcher
and routing-strategy
$
and regexp
onesmap*
, reduce*
, filter*
and batch*
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close