
bortexz.eventflow


-AsyncPipeline (clj, protocol)

-publish-ch (clj)

(-publish-ch this ch)

-IncrementalPipeline (clj, protocol)

-flush (clj)

(-flush this)

-pending-events (clj)

(-pending-events this)

-Pipeline (clj, protocol)

-add-node (clj)

(-add-node this node-id node-config)

-remove-node (clj)

(-remove-node this node-id)

-Publish (clj, protocol)

-publish (clj)

(-publish this event)

add-node (clj)

(add-node pipeline node-id node-config)

Adds a node identified by node-id to the given pipeline.

node-config is a map of:

  • ::xform transducer that processes events. The values it emits are automatically published into the pipeline.
  • ::topics map of {<subscribed topic> #{<emits-topics>}}. Emitted topics are currently not used internally; they mainly serve as documentation for the node. In the future they might be used to generate visualizations of the pipeline.
  • ::async node options used on an async pipeline:
    • ::parallelism integer, number of workers that share the work for this node. Defaults to 1.
    • ::thread? boolean. If true, the xform is executed inside a real thread, typically for side effects or heavy computations. Defaults to false, in which case a go-loop is used and the limitations of go blocks apply to the execution of the xform.
    • ::topic-buf-fn 1-arity fn that accepts a topic and returns a core.async buffer, an int (fixed buffer), or nil (unbuffered).
    • ::combine-chs-fn 1-arity fn that accepts a map of {<topic> <ch>} and returns a chan containing the events emitted by each topic's ch, which are then consumed by the xform. Defaults to merging all values using core.async/merge.

If a node with the given node-id already exists on the pipeline, an exception is thrown.
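
A minimal sketch of a node-config, assuming events are plain maps that carry their topic under :topic (so the keyword :topic can serve as topic-fn). The event shape, the :doubler node-id and the :numbers / :doubled topics are hypothetical names chosen for illustration, not part of the library.

```clojure
(require '[bortexz.eventflow :as ef])

;; Assumption: events look like {:topic <topic> :value <number>}.
(def pipeline (ef/incremental-pipeline :topic))

;; #:bortexz.eventflow{...} namespaces every key in the map, so :xform below
;; is the ::xform key described above (equivalent to ::ef/xform).
(def doubler-config
  #:bortexz.eventflow{;; Transducer: each emitted value is published back
                      ;; into the pipeline as a new event.
                      :xform  (map (fn [{:keys [value]}]
                                     {:topic :doubled :value (* 2 value)}))
                      ;; Subscribes to :numbers, documents that it emits :doubled.
                      :topics {:numbers #{:doubled}}})

(ef/add-node pipeline :doubler doubler-config)

;; Re-using an existing node-id is documented to throw; capture the exception
;; instead of letting it propagate.
(def duplicate-error
  (try
    (ef/add-node pipeline :doubler doubler-config)
    nil
    (catch Exception e e)))
```

The ::async key is left out here because it only applies to async pipelines.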


async-pipeline (clj)

(async-pipeline topic-fn)
(async-pipeline topic-fn
                {:bortexz.eventflow/keys [topic-buf-fn ex-handler]
                 :or {topic-buf-fn (constantly nil)
                      ex-handler uc/uncaught-exception}})

Creates an async pipeline, using core.async internally. When publishing a message, the event gets routed to an internal mult for the given topic. Topics are automatically added and removed depending on whether they have any subscribers. Messages to topics that have no subscribers are dropped at the publisher side. There is no internal router chan; routing to topic mults happens at the publisher/node side.

publish is blocking (it uses >!!). publish-ch can be used to publish messages to the pipeline from a chan.

Accepts topic-fn and an (optional) map of options.

topic-fn 1-arity fn that returns the topic from a given event.

Options (namespaced):

  • ::topic-buf-fn 1-arity fn that is called with the topic each time a topic needs to be created, and must return a core.async buffer, an int (fixed buffer), or nil (unbuffered). Defaults to (constantly nil).
  • ::ex-handler 1-arity fn that is called when processing a node throws an exception. The exception is an ExceptionInfo with data {:node-id ... :event ... :exception <original node exception>}. Defaults to the uncaught exception handler of the current thread.

Nodes can be added and removed safely from different threads. Removing a node is a blocking operation (it uses <!! to wait until the node has finished processing all events buffered up until the removal).

Adding a node that already exists, or trying to remove a node that doesn't exist, results in an exception. One caveat: if multiple threads try to remove the same existing node, they all block until the node is removed, and none of them throws an exception.
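
The options above could be wired together roughly as follows. This is a sketch under assumptions: events are maps with a :topic key, and the :sink node, the :numbers topic and the two atoms are hypothetical names used only for illustration.

```clojure
(require '[bortexz.eventflow :as ef])

;; Collects the ex-data of exceptions thrown while a node processes an event.
(def handled-errors (atom []))

(def pipeline
  (ef/async-pipeline
   :topic ; topic-fn; assumes events look like {:topic <topic> ...}
   ;; #:bortexz.eventflow{...} namespaces every key, so :topic-buf-fn here
   ;; is the ::topic-buf-fn option described above.
   #:bortexz.eventflow{:topic-buf-fn (constantly 16) ; fixed buffer of 16 per topic
                       :ex-handler   (fn [ex]
                                       (swap! handled-errors conj (ex-data ex)))}))

(def seen (atom []))

;; Hypothetical sink node: records each event and emits nothing
;; (keep drops nil results, so nothing is published back).
(ef/add-node pipeline :sink
             #:bortexz.eventflow{:xform  (keep (fn [event]
                                                 (swap! seen conj event)
                                                 nil))
                                 :topics {:numbers #{}}
                                 :async  #:bortexz.eventflow{:parallelism 2
                                                             :thread?     true}})

;; Blocking publish (the docstring notes it uses >!!).
(ef/publish pipeline {:topic :numbers :value 1})

;; Blocks until the node has finished with the events buffered for it.
(ef/remove-node pipeline :sink)
```

::thread? is set to true here because the xform performs a side effect (swap!), which is the use case the add-node docstring mentions for real threads.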


drain (clj)

(drain incremental-pipeline)

Drains given incremental-pipeline, executing flush until the pipeline doesn't have any more events pending.


flush (clj)

(flush incremental-pipeline)

Flushes the next event in the queue from an incremental-pipeline, processing all nodes subscribed to its topic.


incremental-pipeline (clj)

(incremental-pipeline topic-fn)
(incremental-pipeline topic-fn
                      {:bortexz.eventflow/keys [parallel?]
                       :or {parallel? false}})

Creates an incremental pipeline. When published to, events are stored inside the pipeline until flush is called. flush propagates the first event of the queue through the pipeline, executing all nodes subscribed to the event's topic. This step might generate more events from the processing nodes, which are added to the pipeline. Use drain to flush the pipeline in a loop until no more events are emitted.

Accepts topic-fn and an (optional) map of options.

topic-fn 1-arity fn that returns the topic from a given event.

Options (namespaced):

  • ::parallel? if true, nodes are executed in parallel during flush.

Not currently thread-safe: this pipeline is intended to be run from a single thread, and its behaviour under multithreading is undefined.

Adding a node that already exists, or trying to remove a node that doesn't exist will result in an exception.
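
The publish / flush / drain cycle described above can be sketched as follows, assuming events are maps with a :topic key. The two nodes (:doubler and :collector), their topics and the results atom are hypothetical and exist only to show events being queued and then processed step by step.

```clojure
(require '[bortexz.eventflow :as ef])

(def results (atom []))

;; Assumption: events look like {:topic <topic> :value <number>}.
(def pipeline (ef/incremental-pipeline :topic))

;; Emits a :doubled event for every :numbers event it receives.
(ef/add-node pipeline :doubler
             #:bortexz.eventflow{:xform  (map (fn [{:keys [value]}]
                                                {:topic :doubled :value (* 2 value)}))
                                 :topics {:numbers #{:doubled}}})

;; Records :doubled values and emits nothing (keep drops nil results).
(ef/add-node pipeline :collector
             #:bortexz.eventflow{:xform  (keep (fn [{:keys [value]}]
                                                 (swap! results conj value)
                                                 nil))
                                 :topics {:doubled #{}}})

;; Publishing only queues the events; no node runs yet.
(ef/publish pipeline {:topic :numbers :value 1})
(ef/publish pipeline {:topic :numbers :value 2})

;; Processes only the first queued event; events emitted by :doubler
;; are added to the queue.
(ef/flush pipeline)

;; Keeps flushing until no events are pending.
(ef/drain pipeline)
```

Calling flush one event at a time makes it possible to inspect intermediate state between steps, while drain suits the case where only the final state matters.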


pending-events (clj)

(pending-events incremental-pipeline)

Returns a PersistentQueue of the events currently accumulated in incremental-pipeline.
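
Because the result is an ordinary PersistentQueue, it can be inspected with core functions such as count, peek and vec. A small sketch, assuming events are maps with a :topic key; the :sink node and :numbers topic are hypothetical.

```clojure
(require '[bortexz.eventflow :as ef])

(def pipeline (ef/incremental-pipeline :topic))

;; Hypothetical node subscribed to :numbers that emits nothing.
(ef/add-node pipeline :sink
             #:bortexz.eventflow{:xform  (filter (constantly false))
                                 :topics {:numbers #{}}})

(ef/publish pipeline {:topic :numbers :value 1})
(ef/publish pipeline {:topic :numbers :value 2})

;; The queue is an immutable value, so it can be inspected freely
;; without affecting the pipeline.
(def pending (ef/pending-events pipeline))

(def pending-count (count pending)) ; number of queued events
(def next-event    (peek pending))  ; head of the queue, the next event to flush
(def as-vector     (vec pending))   ; all queued events, in queue order
```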


publish (clj)

(publish pipeline event)

Publishes a new event into pipeline.


publish-ch (clj)

(publish-ch async-pipeline events-ch)

Publishes the contents of events-ch into async-pipeline. Returns a ch that closes once events-ch has been exhausted.
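
A sketch of feeding an async pipeline from a channel and waiting for the returned ch to close. It assumes events are maps with a :topic key and that the available core.async version provides to-chan!; the :sink node and :numbers topic are hypothetical.

```clojure
(require '[bortexz.eventflow :as ef]
         '[clojure.core.async :as a])

(def pipeline (ef/async-pipeline :topic))

(def seen (atom []))

;; Hypothetical sink node: records events and emits nothing.
(ef/add-node pipeline :sink
             #:bortexz.eventflow{:xform  (keep (fn [event]
                                                 (swap! seen conj event)
                                                 nil))
                                 :topics {:numbers #{}}})

;; to-chan! returns a channel that yields the collection's items, then closes.
(def events-ch
  (a/to-chan! [{:topic :numbers :value 1}
               {:topic :numbers :value 2}]))

(def done-ch (ef/publish-ch pipeline events-ch))

;; Blocks until done-ch closes, i.e. until events-ch has been exhausted.
(a/<!! done-ch)
```

Waiting on done-ch only signals that every event was handed to the pipeline; nodes may still be processing them at that point.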


remove-node (clj)

(remove-node pipeline node-id)

Removes the node with the given node-id from pipeline.

