
kafka-event-processor.processor.system


new-system (clj)

(new-system configuration-overrides
            {:keys [kafka database processor-identifier configuration-prefix
                    additional-dependencies processing-enabled
                    kafka-consumer-group-configuration kafka-consumer-group
                    processor-configuration processor rewind-check
                    idempotent-check event-handler]
             :or {kafka :kafka
                  database :database
                  processor-identifier :main
                  configuration-prefix :service
                  additional-dependencies {}}})

Creates a new Kafka event processor.

Does nothing if processing is not enabled.

  • Processor identifier can be specified (defaults to :main).
  • Configuration prefix can be specified (defaults to :service).

All system map keys can be overridden; where applicable, they default as follows:

  • kafka: kafka
  • database: database
  • processing-enabled: {processor-identifier}-processing-enabled?
  • kafka-consumer-group-configuration: kafka-{processor-identifier}-consumer-group-configuration
  • kafka-consumer-group: kafka-{processor-identifier}-consumer-group
  • processor-configuration: {processor-identifier}-processor-configuration
  • processor: {processor-identifier}-processor
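
The naming pattern in the list above can be sketched as a small function. This is only an illustration: `default-system-keys` is a hypothetical helper, not part of the library, and it assumes the defaults are keywords built from the name of the processor identifier.

```clojure
(defn default-system-keys
  "Hypothetical helper: derives the default system map keys listed above
  from a processor identifier. Assumes the defaults are plain keywords."
  [processor-identifier]
  (let [id (name processor-identifier)]
    {:kafka                              :kafka
     :database                           :database
     :processing-enabled                 (keyword (str id "-processing-enabled?"))
     :kafka-consumer-group-configuration (keyword (str "kafka-" id "-consumer-group-configuration"))
     :kafka-consumer-group               (keyword (str "kafka-" id "-consumer-group"))
     :processor-configuration            (keyword (str id "-processor-configuration"))
     :processor                          (keyword (str id "-processor"))}))

;; With the default identifier :main, the processor key is :main-processor
;; and the consumer group key is :kafka-main-consumer-group.
(default-system-keys :main)
```

Under this assumption, a second processor with the identifier :audit would look up :audit-processor, :kafka-audit-consumer-group and so on, which is why several processors can share one system map without key collisions.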

Optionally provide a system map key for rewind-check, idempotent-check and event-handler.

Optionally provide a map of system keys to use as additional dependencies of the component.

e.g.

(processors/new-system
  configuration-overrides
  {:processor-identifier    :main
   :kafka                   :kafka
   :database                :database
   :event-handler           :event-handle
   :additional-dependencies {:atom :atom}})

If no event-handler is defined, nothing is done with the event.

(deftype AtomEventHandler
  [atom]
  EventHandler
  (on-event
     [this processor {:keys [topic resource]} _]
     (vent/react-to all {:channel topic :payload resource} processor))
  (on-complete
     [this processor {:keys [topic partition]} {:keys [event-processor event-id]}]
     (swap! atom conj {:processor event-processor
                       :topic     topic
                       :partition partition
                       :event-id  event-id})))
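
To see what such a handler records on completion, here is a minimal, self-contained sketch. It is not the library's code: the `EventHandler` protocol below is a stand-in defined only so the example runs on its own, `RecordingEventHandler` is a hypothetical name, and the shapes of the event and context maps are assumptions inferred from the destructuring in the handler above.

```clojure
;; Stand-in protocol for illustration only; the real EventHandler protocol
;; comes from the library and its exact definition is assumed here.
(defprotocol EventHandler
  (on-event [this processor event context])
  (on-complete [this processor event context]))

(deftype RecordingEventHandler [state]
  EventHandler
  (on-event [_ _ _ _]
    ;; No-op in this sketch; a real handler would react to the event here.
    nil)
  (on-complete [_ _ {:keys [topic partition]} {:keys [event-processor event-id]}]
    ;; Append a summary of the completed event to the atom.
    (swap! state conj {:processor event-processor
                       :topic     topic
                       :partition partition
                       :event-id  event-id})))

(def completed (atom []))
(def handler (->RecordingEventHandler completed))

;; Simulate a single completed event (made-up topic and event id).
(on-complete handler nil
             {:topic "orders" :partition 0}
             {:event-processor :main :event-id "event-1"})

@completed
;; => [{:processor :main, :topic "orders", :partition 0, :event-id "event-1"}]
```

Keeping the recorded state in an atom supplied as a dependency makes the handler easy to inspect in tests, since the atom can be dereferenced after events have been processed.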