(new-system configuration-overrides
  {:keys [kafka database processor-identifier configuration-prefix
          additional-dependencies processing-enabled
          kafka-consumer-group-configuration kafka-consumer-group
          processor-configuration processor rewind-check
          event-handler]
   :or {kafka :kafka
        database :database
        processor-identifier :main
        configuration-prefix :service
        additional-dependencies {}}})
Creates a new Kafka event processor.
Does nothing if processing is not enabled.

* Processor identifier can be specified (defaults to :main).
* Configuration prefix can be specified (defaults to :service).

All system map keys can be overridden or they default where applicable:

* kafka: kafka
* database: database
* processing-enabled: {processor-identifier}-processing-enabled?
* kafka-consumer-group-configuration: kafka-{processor-identifier}-consumer-group-configuration
* kafka-consumer-group: kafka-{processor-identifier}-consumer-group
* processor-configuration: {processor-identifier}-processor-configuration
* processor: {processor-identifier}-processor

Optionally provide a system map key for rewind-check, idempotent-check and event-handler.

Optionally provide a map of system keys that are used as additional dependencies to the component, e.g.

````
(processors/new-system
  configuration-overrides
  {:processor-identifier    :main
   :kafka                   :kafka
   :database                :database
   :event-handler           :event-handler
   :additional-dependencies {:atom :atom}})
````

Nothing is done with the event if an event-handler is not defined.

````
(deftype AtomEventHandler
  [atom]
  EventHandler
  ;; Unwrap the event resource and parse its :payload property.
  (extract-payload
    [this event]
    (-> event
      (hal-json/json->resource)
      (hal/get-property :payload)
      (hal-json/json->resource)))
  ;; Treat every event as processable.
  (processable?
    [this processor event event-context]
    true)
  ;; Dispatch the event, using the topic as the channel.
  (on-event
    [this processor {:keys [topic payload]} _]
    (vent/react-to all {:channel topic :payload payload} processor))
  ;; Record each completed event in the injected atom.
  (on-complete
    [this processor {:keys [topic partition payload]} {:keys [event-processor]}]
    (swap! atom conj {:processor event-processor
                      :topic     topic
                      :partition partition
                      :event-id  (event-resource->id payload)})))
````
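The default key names listed above follow a simple naming pattern based on the processor identifier. The following is a minimal sketch of that pattern only, not the library's implementation: `default-system-keys` and `resolve-system-keys` are hypothetical helpers introduced for illustration, and it is an assumption that the defaults are keywords built by plain string substitution.

````clojure
;; Hypothetical helper, not part of the library: builds the default
;; system map keys documented above for a given processor identifier.
(defn default-system-keys
  [processor-identifier]
  (let [id (name processor-identifier)
        k  (fn [& parts] (keyword (apply str parts)))]
    {:kafka                              :kafka
     :database                           :database
     :processing-enabled                 (k id "-processing-enabled?")
     :kafka-consumer-group-configuration (k "kafka-" id "-consumer-group-configuration")
     :kafka-consumer-group               (k "kafka-" id "-consumer-group")
     :processor-configuration            (k id "-processor-configuration")
     :processor                          (k id "-processor")}))

;; Hypothetical helper: explicit overrides win, everything else falls
;; back to the derived default. The identifier defaults to :main.
(defn resolve-system-keys
  [overrides]
  (let [defaults (default-system-keys
                   (get overrides :processor-identifier :main))]
    (merge defaults (select-keys overrides (keys defaults)))))

(:processor (resolve-system-keys {}))
;; => :main-processor

(:kafka-consumer-group (resolve-system-keys {:processor-identifier :audit}))
;; => :kafka-audit-consumer-group
````

Under these assumptions, passing `{:processor-identifier :audit}` is enough to give a second processor its own set of system map keys, while `:kafka` and `:database` stay shared unless overridden.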