(create config)
Create a consumer, returning a map that contains two channels:
`:out-ch`
- Output channel used to receive all data from Kafka, as well as the control and error events that occur. Which of these events are published depends on your `:output-policy`.
`:ctl-ch`
- Input channel used to issue control commands to the consumer.
Configuration options:
`:output-policy`
: A set containing the output sources that should be returned via `:out-ch`. Valid options include:
`:control`: Output from control operations
`:error`: Any exceptions or errors that occur
`:data`: Data retrieved from the Kafka consumer. `:data` is always included in the `:output-policy`, since otherwise none of the data would be accessible.
The Kafka Java interface returns each of these as a Java object. Gregor converts each to pure data (i.e. a map).
`:topics`
: Either a vector of topics (strings or keywords), a single topic (string or keyword), or a regex that matches a set of topics.
`:timeout`
: Time to wait, in milliseconds, for queued message retrieval when closing the consumer.
`:key-deserializer`
: Deserializer to use for keys, default is `:edn`. Valid deserializers are `:edn`, `:string`, `:json` and `:keyword`.
`:value-deserializer`
: Deserializer to use for values, default is `:edn`. Valid deserializers are `:edn`, `:string` and `:json`.
`:output-buffer`
: Buffer size of `:out-ch`, default is 100.
`:kafka-configuration`
: Map containing Kafka consumer configuration settings. This map will be converted into key/value properties and passed directly to the KafkaConsumer object.
`:transducer`
: Transformation function to be applied to all data placed on the `:out-ch` channel.
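To illustrate how `:output-buffer` and `:transducer` could relate to `:out-ch`, here is a minimal core.async sketch. It does not use Gregor itself: the message shape (a map with a `:value` key) and the names `xform` and `out-ch` are assumptions made for illustration, and Gregor's actual channel construction may differ.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical message shape: a map with a :value key.
;; The transducer keeps even values and unwraps them.
(def xform (comp (filter #(even? (:value %)))
                 (map :value)))

;; A channel with a fixed buffer of 50 and the transducer attached,
;; analogous to {:output-buffer 50 :transducer xform}.
(def out-ch (a/chan 50 xform))

(a/>!! out-ch {:value 1}) ; removed by the filter step
(a/>!! out-ch {:value 2}) ; passes through as 2
(a/close! out-ch)

;; Collect everything that reached the channel.
(def received (a/<!! (a/into [] out-ch)))
received ; => [2]
```

Because the transducer runs as values are put on the channel, consumers of the channel only ever see the transformed data.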
Example Configuration:
{:kafka-configuration {:bootstrap.servers "localhost:9092"
                       :max.poll.records 1000
                       :group.id "gregor.test"}
 :output-policy #{:control :error}
 :key-deserializer :string
 :value-deserializer :json
 :output-buffer 50
 :timeout 100}
All key/value pairs in `:kafka-configuration` are converted into a property map and passed to the Kafka Java client as configuration options.
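The conversion of `:kafka-configuration` into properties can be pictured with the sketch below. `map->properties` is a hypothetical helper written for this example, not a Gregor function; it assumes keys are keywords or strings and that values can be stringified with `str`.

```clojure
(import '(java.util Properties))

(defn map->properties
  "Hypothetical helper: copies a Clojure map into a java.util.Properties
  object, using the key's name and the value's string form."
  [m]
  (let [props (Properties.)]
    (doseq [[k v] m]
      (.setProperty props (name k) (str v)))
    props))

(def props
  (map->properties {:bootstrap.servers "localhost:9092"
                    :max.poll.records  1000
                    :group.id          "gregor.test"}))

(.getProperty props "max.poll.records") ; => "1000"
```

Because keys are passed through by name, a misspelled key is not corrected along the way, so it is worth checking each key against the Kafka consumer configuration reference.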
(create-context {:keys [output-buffer timeout output-policy transducer]
:or {output-buffer default-output-buffer
timeout default-timeout
output-policy #{}}
:as config})
Builds context map containing all the necessary channels and channel connections
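The argument list of `create-context` relies on map destructuring with `:or` defaults. The sketch below shows that mechanism in isolation. `resolve-options` is a hypothetical function; the value `100` for the buffer comes from the `create` docstring, while `1000` for the timeout is a placeholder, and adding `:data` to the policy at this point is an assumption about where that step happens.

```clojure
;; The buffer default of 100 is documented; the timeout default is not,
;; so 1000 here is only a placeholder.
(def default-output-buffer 100)
(def default-timeout 1000)

(defn resolve-options
  "Hypothetical helper: :or fills in missing keys, :as keeps the original map."
  [{:keys [output-buffer timeout output-policy transducer]
    :or   {output-buffer default-output-buffer
           timeout       default-timeout
           output-policy #{}}
    :as   config}]
  {:output-buffer output-buffer
   :timeout       timeout
   ;; Assumption: :data is added here so data is always delivered.
   :output-policy (conj output-policy :data)
   :transducer    transducer
   :config        config})

(def resolved (resolve-options {:output-policy #{:error} :timeout 100}))

(:output-buffer resolved) ; => 100
(:output-policy resolved) ; => #{:error :data}
```

Note that `:or` only affects the local bindings. The map bound by `:as` is the caller's original map, without the defaults merged in.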
(handle-control-event {:keys [op topic-offsets topic topics] :as command}
{:keys [driver timeout ctl-ch out-ch output-policy]})
Handle a control event after it has been read from the channel
(processing-loop {:keys [out-ch ctl-ch ctl-mult ctl-handler-ch ctl-ready-ch
output-policy]
:as context})
Creates loop for processing messages from the Kafka `driver`
(safe-poll {:keys [driver timeout output-policy]})
Polls for data arriving from Kafka, catching appropriate exceptions to dispatch control events
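The pattern described here, wrapping a poll so that exceptions become events instead of propagating, can be sketched without a Kafka dependency. In the Kafka Java client, calling `wakeup()` on a consumer causes a blocked `poll` to throw a `WakeupException`; in this sketch an `ex-info` tagged `::wakeup` stands in for it. `safe-poll-sketch`, `poll-fn`, and the returned event maps are all hypothetical and do not reflect Gregor's actual event format.

```clojure
(defn safe-poll-sketch
  "Hypothetical wrapper: calls poll-fn and converts exceptions into event maps.
  An ex-info tagged ::wakeup stands in for Kafka's WakeupException."
  [poll-fn]
  (try
    {:event :data :records (poll-fn)}
    (catch clojure.lang.ExceptionInfo e
      (if (= ::wakeup (:type (ex-data e)))
        ;; A wakeup is an expected interruption, not a failure.
        {:event :control :op :wakeup}
        {:event :error :message (.getMessage e)}))
    (catch Exception e
      {:event :error :message (.getMessage e)})))

(safe-poll-sketch (fn [] [{:topic "t" :value 1}]))
;; => {:event :data, :records [{:topic "t", :value 1}]}

(safe-poll-sketch (fn [] (throw (ex-info "woken" {:type ::wakeup}))))
;; => {:event :control, :op :wakeup}

(safe-poll-sketch (fn [] (throw (RuntimeException. "boom"))))
;; => {:event :error, :message "boom"}
```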
(subscribed? driver)
Returns `true` if `driver` is subscribed to the specified `topic`
(wakeup-loop {:keys [driver ctl-mult ctl-wakeup-ch ctl-ready-ch]})
Listens on tapped channel for control commands, waking up the Kafka client when a message is received
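A rough sketch of listening on a tapped channel, using core.async `mult` and `tap`, is shown below. It is not Gregor's implementation: `wakeup-loop-sketch` is hypothetical, `wakeup-fn` stands in for the call that wakes the Kafka client, the `{:op :close}` command is an invented example, and the role of `ctl-ready-ch` is left out because the docstring does not describe it.

```clojure
(require '[clojure.core.async :as a])

(defn wakeup-loop-sketch
  "Hypothetical loop: taps ctl-mult and calls wakeup-fn once per command.
  Returns a channel that closes when the tapped channel closes."
  [ctl-mult wakeup-fn]
  (let [ctl-wakeup-ch (a/tap ctl-mult (a/chan 1))]
    (a/go-loop []
      (when-some [command (a/<! ctl-wakeup-ch)]
        (wakeup-fn command)
        (recur)))))

(def ctl-ch (a/chan))
(def ctl-mult (a/mult ctl-ch))
(def woken (atom []))

;; Record each command instead of waking a real Kafka client.
(def done (wakeup-loop-sketch ctl-mult #(swap! woken conj %)))

(a/>!! ctl-ch {:op :close}) ; hypothetical control command
(a/close! ctl-ch)           ; closing the source closes the tap
(a/<!! done)                ; wait for the loop to finish

@woken ; => [{:op :close}]
```

Using a `mult` lets several listeners, for example a wakeup loop and a command handler, each receive every control command from the same source channel.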