Liking cljdoc? Tell your friends :D
Clojure only.

kafka-event-processor.processor.system


new-systemclj

(new-system configuration-overrides
            {:keys [kafka database processor-identifier configuration-prefix
                    additional-dependencies processing-enabled
                    kafka-consumer-group-configuration kafka-consumer-group
                    processor-configuration processor rewind-check
                    event-handler]
             :or {kafka :kafka
                  database :database
                  processor-identifier :main
                  configuration-prefix :service
                  additional-dependencies {}}})

Creates a new kafka event processor.

Does nothing if processing is not enabled.

  • Processor identifier can be specified (defaults to :main).
  • Configuration prefix can be specified (defaults to :service).

All system map keys can be overridden or they default where applicable:

  • kafka: kafka
  • database: database
  • processing-enabled: {processor-identifier}-processing-enabled?
  • kafka-consumer-group-configuration: kafka-{processor-identifier}-consumer-group-configuration
  • kafka-consumer-group: kafka-{processor-identifier}-consumer-group
  • processor-configuration: {processor-identifier}-processor-configuration
  • processor: {processor-identifier}-processor

Optionally provide a system map key for rewind-check idempotent-check and event-handler

Optional provide a map of system keys that are used as additional dependencies to the component

e.g.

(processors/new-system
  configuration-overrides
  {:processor-identifier    :main
   :kafka                   :kafka
   :database                :database
   :event-handler           :event-handler
   :additional-dependencies {:atom :atom}})

Nothing is done with the event if an event-handler is not defined.

(deftype AtomEventHandler
 [atom]
 EventHandler
 (extract-payload
   [this event]
   (-> event
     (hal-json/json->resource)
     (hal/get-property :payload)
     (hal-json/json->resource)))
 (processable?
   [this processor event event-context]
   true)
 (on-event
   [this processor {:keys [topic payload]} _]
   (vent/react-to all {:channel topic :payload payload} processor))
 (on-complete
   [this processor {:keys [topic partition payload]} {:keys [event-processor]}]
   (swap! atom conj {:processor event-processor
                     :topic     topic
                     :partition partition
                     :event-id  (event-resource->id payload)})))
Creates a new kafka event processor.

 Does nothing if processing is not enabled.

 * Processor identifier can be specified (defaults to :main).
 * Configuration prefix can be specified (defaults to :service).

 All system map keys can be overridden or they default where applicable:

 * kafka: kafka
 * database: database
 * processing-enabled: {processor-identifier}-processing-enabled?
 * kafka-consumer-group-configuration: kafka-{processor-identifier}-consumer-group-configuration
 * kafka-consumer-group: kafka-{processor-identifier}-consumer-group
 * processor-configuration: {processor-identifier}-processor-configuration
 * processor: {processor-identifier}-processor

 Optionally provide a system map key for rewind-check idempotent-check and event-handler
 
 Optional provide a map of system keys that are used as additional dependencies to the component

 e.g.

 ````
 (processors/new-system
   configuration-overrides
   {:processor-identifier    :main
    :kafka                   :kafka
    :database                :database
    :event-handler           :event-handler
    :additional-dependencies {:atom :atom}})
 ````

 Nothing is done with the event if an event-handler is not defined.

 ````
(deftype AtomEventHandler
  [atom]
  EventHandler
  (extract-payload
    [this event]
    (-> event
      (hal-json/json->resource)
      (hal/get-property :payload)
      (hal-json/json->resource)))
  (processable?
    [this processor event event-context]
    true)
  (on-event
    [this processor {:keys [topic payload]} _]
    (vent/react-to all {:channel topic :payload payload} processor))
  (on-complete
    [this processor {:keys [topic partition payload]} {:keys [event-processor]}]
    (swap! atom conj {:processor event-processor
                      :topic     topic
                      :partition partition
                      :event-id  (event-resource->id payload)})))
````
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close