(-add-node this node-id node-config)
(-remove-node this node-id)
(add-node pipeline node-id node-config)
Adds a node identified by `node-id` to the given `pipeline`.

`node-config` is a map of:
- `::xform` transducer that processes events; emitted values are automatically published into the pipeline.
- `::topics` map of {<subscribed topic> #{<emits-topics>}}. Emitted topics are currently not used internally and serve mostly as documentation for the node. In the future they might be used to generate visualizations of the pipeline.
- `::async` node options used on an async pipeline:
  - `::parallelism` integer, number of workers that share the work for this node. Defaults to 1.
  - `::thread?` boolean. If `true`, the xform executes inside a real thread, presumably for side effects or heavy computations. Defaults to false. When false, a go-loop is used and the limitations of go blocks apply to the execution of the xform.
  - `::topic-buf-fn` 1-arity fn that accepts a topic and returns a core.async buffer, an int (fixed buffer) or nil (unbuffered).
  - `::combine-chs-fn` 1-arity fn that accepts a map of {<topic> <ch>} and returns a chan containing the events emitted from each topic's ch, which is then consumed by the xform. Defaults to merging all values using `core.async/merge`.

Throws an exception if a node with the given `node-id` already exists on the pipeline.
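As a rough illustration of the `node-config` shape described above, the following sketch builds a config map using only plain Clojure data. The event shape (maps with `:topic`, `:price`, `:qty`), the topic names and the node logic are hypothetical, and nesting the async options under `::async` is an assumption based on the list above. The keys are written fully qualified so the snippet needs no dependency.

```clojure
;; Sketch only: event shape, topics and node logic are hypothetical.
(def order-totals-xform
  "Transducer: keeps events that carry a price and emits a derived event."
  (comp
   (filter :price)
   (map (fn [{:keys [price qty]}]
          {:topic :order-totals
           :total (* price qty)}))))

(def node-config
  {:bortexz.eventflow/xform  order-totals-xform
   ;; subscribes to :orders and documents that it emits :order-totals
   :bortexz.eventflow/topics {:orders #{:order-totals}}
   ;; only relevant on an async pipeline; nesting under ::async is assumed
   :bortexz.eventflow/async  {:bortexz.eventflow/parallelism 2
                              :bortexz.eventflow/thread?     false}})

;; The xform is an ordinary transducer, so it can be tried in isolation:
(def sample-output
  (into [] order-totals-xform
        [{:topic :orders :price 10 :qty 3}
         {:topic :orders :note "no price"}]))
```

Such a map would then be passed as the third argument, as in `(add-node pipeline :order-totals node-config)`, where `:order-totals` is a made-up node id.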
(async-pipeline topic-fn)
(async-pipeline topic-fn
{:bortexz.eventflow/keys [topic-buf-fn ex-handler]
:or {topic-buf-fn (constantly nil)
ex-handler uc/uncaught-exception}})
Creates an async pipeline, using core.async internally. When publishing a message, the event gets routed to an internal mult for the given topic. Topics are automatically added and removed depending on whether they have any subscribers. Messages to topics that have no subscribers are dropped at the publisher side. There is no internal router chan; routing to topic mults happens at the publisher/node side.

Using `publish` is blocking (uses >!!). `publish-ch` can be used to publish messages to the pipeline from a chan.

Accepts `topic-fn` and an (optional) map of options.

`topic-fn` 1-arity fn that returns the topic from a given event.

Options (namespaced):
- `::topic-buf-fn` 1-arity fn that is called with the topic each time a topic needs to be created, and must return a core.async buffer, an int (fixed buffer) or nil (unbuffered). Defaults to `(constantly nil)`.
- `::ex-handler` 1-arity fn that is called when processing a node throws an exception. The exception is an ExceptionInfo with data {:node-id ... :event ... :exception <original node exception>}. Defaults to the uncaught exception handler of the current thread.

Nodes can be added and removed safely from different threads. Removing a node is a blocking operation (uses <!! to wait until the node has finished processing all events buffered up until the removal).

Adding a node that already exists, or trying to remove a node that doesn't exist, results in an exception. One caveat: if multiple threads try to remove the same existing node, they all block until the node is removed, without throwing an exception.
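The two options above are plain 1-arity functions, so they can be sketched and exercised without running a pipeline. In this minimal example the topic names, the buffer size and the `failures` atom are hypothetical. The handler is fed a hand-built `ex-info` that only imitates the data shape described above.

```clojure
;; Sketch only: topic names and buffer sizes are hypothetical.
(defn topic-buf-fn
  "Returns an int (fixed buffer size) for a busy topic, nil (unbuffered) otherwise."
  [topic]
  (case topic
    :ticks 64
    nil))

(def failures
  "Collects a summary of every node failure, for later inspection."
  (atom []))

(defn ex-handler
  "Receives the ExceptionInfo described above and records its data."
  [ex]
  (let [{:keys [node-id event exception]} (ex-data ex)]
    (swap! failures conj {:node-id node-id
                          :event   event
                          :message (ex-message exception)})))

(def pipeline-opts
  {:bortexz.eventflow/topic-buf-fn topic-buf-fn
   :bortexz.eventflow/ex-handler   ex-handler})

;; Simulating what the handler would receive, without running a pipeline:
(ex-handler (ex-info "node failed"
                     {:node-id   :totals
                      :event     {:topic :orders}
                      :exception (RuntimeException. "boom")}))
```

The options map would then go in as the second argument, for instance `(async-pipeline :topic pipeline-opts)`, assuming events are maps whose topic lives under a `:topic` key.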
(drain incremental-pipeline)
Drains the given `incremental-pipeline`, executing `flush` until the pipeline has no more pending events.
(flush incremental-pipeline)
Flushes the next event in the queue of an `incremental-pipeline`, processing all nodes subscribed to its topic.
(incremental-pipeline topic-fn)
(incremental-pipeline topic-fn
{:bortexz.eventflow/keys [parallel?]
:or {parallel? false}})
Creates an incremental pipeline. Published events are stored inside the pipeline until `flush` is called. `flush` propagates the first event of the queue through the pipeline, executing all nodes subscribed to the event's `::topic`. This step might generate more events from the processing nodes, which are added to the pipeline. Use `drain` to flush the pipeline in a loop until no more events are emitted.

Accepts `topic-fn` and an (optional) map of options.

`topic-fn` 1-arity fn that returns the topic from a given event.

Options (namespaced):
- `::parallel?` if true, nodes are executed in parallel during `flush`.

Not currently thread safe: this pipeline is intended to be run from a single thread, and its behaviour under multithreading is undefined.

Adding a node that already exists, or trying to remove a node that doesn't exist, results in an exception.
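To make the queue semantics described above concrete, here is a toy model in plain Clojure. It is not eventflow's implementation and does not use its API: the `toy-` functions, the immutable map representation and the node shape are all invented for illustration. It only mirrors the described behaviour, in which publishing enqueues an event, a flush processes the first queued event through the subscribed nodes and enqueues their output, and a drain flushes until the queue is empty.

```clojure
;; Toy model of the flush/drain semantics; NOT eventflow's implementation.
;; A "pipeline" here is a plain map: {:topic-fn f, :nodes {id node}, :queue q}.
;; Each hypothetical node is {:topic t, :xform transducer}.

(defn toy-pipeline [topic-fn nodes]
  {:topic-fn topic-fn
   :nodes    nodes
   :queue    clojure.lang.PersistentQueue/EMPTY})

(defn toy-publish [p event]
  (update p :queue conj event))

(defn toy-flush
  "Pops the first queued event, runs it through every node subscribed to its
  topic, and enqueues whatever those nodes emit."
  [{:keys [topic-fn nodes queue] :as p}]
  (if-let [event (peek queue)]
    (let [topic   (topic-fn event)
          emitted (for [[_ {:keys [xform] node-topic :topic}] nodes
                        :when (= node-topic topic)
                        out   (into [] xform [event])]
                    out)]
      (assoc p :queue (into (pop queue) emitted)))
    p))

(defn toy-drain
  "Flushes repeatedly until the queue is empty."
  [p]
  (if (seq (:queue p))
    (recur (toy-flush p))
    p))

;; Usage: one node doubles numbers, another records the doubled values.
(def seen (atom []))

(def drained
  (-> (toy-pipeline
       :topic
       {:double {:topic :numbers
                 :xform (map (fn [{:keys [n]}] {:topic :doubled :n (* 2 n)}))}
        ;; keep + nil: records the value and emits nothing further
        :log    {:topic :doubled
                 :xform (keep (fn [e] (swap! seen conj (:n e)) nil))}})
      (toy-publish {:topic :numbers :n 1})
      (toy-publish {:topic :numbers :n 2})
      toy-drain))
```

Note that a node which emits events on the topic it subscribes to would make a drain loop forever in this model, which is why the `:log` node emits nothing.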
(pending-events incremental-pipeline)
Returns a PersistentQueue of the events currently accumulated in `incremental-pipeline`.
(publish pipeline event)
Publishes a new `event` into `pipeline`.
(publish-ch async-pipeline events-ch)
Publishes the contents of `events-ch` into `async-pipeline`. Returns a ch that will close when the chan has been exhausted.
(remove-node pipeline node-id)
Removes the node with the given `node-id` from `pipeline`.