Liking cljdoc? Tell your friends :D

gregor.consumer


close-tapclj

(close-tap ch-mult ch)
source

createclj

(create config)

Create a consumer, returning a map that contains 2 channels:

:out-ch - Output channel used to receive all of the data from kafka as well as controll and error events that occur. The publishing of these events will depend on your output-policy

:ctl-ch - Input channel used to issue control commands to the consumer.

Configuration options:

:output-policy: A set containing the output sources that should be returned via the out-ch. Valid options include:

              `:control`: Output from control operations
              `:error`: Any exceptions or errors that occur
              `:data`: Data retrieved from the kafka consumer.  The `data` setting will always be
                       included in the `output-policy`, else none of the data will be accessible
              
              The Kafka Java interface returns each of these as a Java object. Gregor converts each
              to pure data (i.e. a map).

:topics: Either a vector of topics, represented as strings or keywords or a single topic represented as a string or keyword or a regex to match a set of topics.

:timeout: time to wait, in milliseconds, for queued message retrieval when closing the consumer.

:key-deserializer: Serializer to use for key serialization, default is :edn. Valid serializers are :edn, :string, :json and :keyword

:value-deserializer: Serializer to use for value serialization, default is :edn. Valid serializers are :edn, :string and :json

:output-buffer: Buffer size of :out-ch, default is 100

:kafka-configuration: Map containing kafka producer configuration settings. This map will be converted into key/value properties and passed directly to the KafkaProducer object.

:transducer: Transformation function to be applied to all data placed on out-ch channel.

Example Configuration:

{:kafka-configuration {:bootstrap.servers "localhost:9092" :max.poll.recordss 1000 :group.id "gregor.test"} :output-policy #{:control :error} :key-deserializer :string :value-deserializer :json :output-buffer 50 :timeout 100}

All key value pairs in :kafka-configuration will be converted and inserted into a property map and passed into the Kafka Java client as configuration options.

Create a consumer, returning a map that contains 2 channels:

`:out-ch` - Output channel used to receive all of the data from kafka as well as controll and error
            events that occur.  The publishing of these events will depend on your `output-policy`

`:ctl-ch` - Input channel used to issue control commands to the consumer.

Configuration options:

`:output-policy`: A set containing the output sources that should be returned via the `out-ch`.
                  Valid options include:

                  `:control`: Output from control operations
                  `:error`: Any exceptions or errors that occur
                  `:data`: Data retrieved from the kafka consumer.  The `data` setting will always be
                           included in the `output-policy`, else none of the data will be accessible
                  
                  The Kafka Java interface returns each of these as a Java object. Gregor converts each
                  to pure data (i.e. a map).
`:topics`: Either a vector of topics, represented as strings or keywords or a single topic represented as a
           string or keyword or a regex to match a set of topics.
 
`:timeout`: time to wait, in milliseconds, for queued message retrieval when closing the consumer.

`:key-deserializer`: Serializer to use for key serialization, default is `:edn`.
                     Valid serializers are `:edn`, `:string`, `:json` and `:keyword`

`:value-deserializer`: Serializer to use for value serialization, default is `:edn`.
                       Valid serializers are `:edn`, `:string` and `:json`

`:output-buffer`: Buffer size of `:out-ch`, default is 100

`:kafka-configuration`: Map containing kafka producer configuration settings.  This map will be converted
                        into key/value properties and passed directly to the KafkaProducer object.

`:transducer`: Transformation function to be applied to all data placed on `out-ch` channel.

Example Configuration:

`{:kafka-configuration {:bootstrap.servers "localhost:9092"
                        :max.poll.recordss 1000
                        :group.id "gregor.test"}
  :output-policy #{:control :error}
  :key-deserializer :string
  :value-deserializer :json
  :output-buffer 50
  :timeout 100}`

All key value pairs in `:kafka-configuration` will be converted and inserted into a property map and
passed into the Kafka Java client as configuration options.
sourceraw docstring

create-contextclj

(create-context {:keys [output-buffer timeout output-policy transducer]
                 :or {output-buffer default-output-buffer
                      timeout default-timeout
                      output-policy #{}}
                 :as config})

Builds context map containing all the necessary channels and channel connections

Builds context map containing all the necessary channels and channel connections
sourceraw docstring

handle-control-eventclj

(handle-control-event {:keys [op topic-offsets topic topics] :as command}
                      {:keys [driver timeout ctl-ch out-ch output-policy]})

Handle a control event after it has been read from the channel

Handle a control event after it has been read from the channel
sourceraw docstring

processing-loopclj

(processing-loop {:keys [out-ch ctl-ch ctl-mult ctl-handler-ch ctl-ready-ch
                         driver timeout output-policy]
                  :as context})

Creates loop for processing messages from Kafka driverv

Creates loop for processing messages from Kafka `driver`v
sourceraw docstring

safe-pollclj

(safe-poll {:keys [driver timeout output-policy]})

Polls for data arriving from Kafka, catching appropriate exceptions to dispatch control events

Polls for data arriving from Kafka, catching appropriate exceptions to dispatch control events
sourceraw docstring

subscribed?clj

(subscribed? driver)

Returns true if driver is subscribed to the specified topic

Returns `true` if `driver` is subscribed to the specified `topic`
sourceraw docstring

wakeup-loopclj

(wakeup-loop {:keys [driver ctl-mult ctl-wakeup-ch ctl-ready-ch]})

Listens on tapped channel for control commands, waking up the Kafka client when a message is received

Listens on tapped channel for control commands, waking up the Kafka client when a message is received
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close