To publish data to Kafka, Ziggurat provides producer support through the ziggurat.producer namespace. This namespace defines functions for publishing data to Kafka topics. These functions are essentially wrappers around the variants of the send method defined in org.apache.kafka.clients.producer.KafkaProducer.
At the time of initialization, an instance of org.apache.kafka.clients.producer.KafkaProducer is constructed using config values provided in resources/config.edn. A producer can be configured for each of the stream-routes in config.edn. Please see the example below.
At present, only a few configurations are supported for constructing KafkaProducer. These have been explained here. Please see Producer configs for a complete list of all producer configs available in Kafka.
The ziggurat.producer namespace defines a multi-arity send function, which is a thin wrapper around KafkaProducer#send. This function publishes data to a Kafka topic through the Kafka producer defined in the stream router configuration. See the configuration section below.
For example, to publish data using the producer defined for the stream router config with key :default, use send like this (the second form also passes a partition number, here 1):
(send :default "test-topic" "key" "value")
(send :default "test-topic" 1 "key" "value")
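The two call shapes above can be understood as two arities of one function. The following is only a minimal sketch of that dispatch pattern, not Ziggurat's implementation: `build-record` is a hypothetical helper that returns a plain map standing in for a Kafka producer record, and it assumes the extra argument in the longer form is a partition number, following the (topic, partition, key, value) ordering of Kafka's ProducerRecord constructor.

```clojure
;; Hypothetical helper, NOT part of Ziggurat: illustrates multi-arity dispatch only.
(defn build-record
  "Returns a map standing in for a producer record.
  The 3-argument arity delegates to the 4-argument one with no partition set."
  ([topic key value]
   (build-record topic nil key value))
  ([topic partition key value]
   {:topic     topic
    :partition partition   ; nil means no explicit partition was requested
    :key       key
    :value     value}))

;; Mirrors the two call shapes shown above (minus the stream config key).
(build-record "test-topic" "key" "value")
(build-record "test-topic" 1 "key" "value")
```

Delegating the shorter arity to the longer one keeps the record-building logic in a single place, which is a common way to structure thin wrappers of this kind.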
With Ziggurat version 3.5.1, both the Kafka Streams API and the Kafka Consumer API can be used to consume messages in real time. The Kafka Consumer API is an efficient way to consume messages from high-throughput Kafka topics.
With the Kafka Streams API, messages are processed one at a time. With the Kafka Consumer API integration in Ziggurat, a user can consume messages in bulk and control how many messages are consumed at a time. This batch size is configured using the max-poll-records config: https://docs.confluent.io/current/installation/configuration/consumer-configs.html#max.poll.records.
As with stream routes, Ziggurat lets you specify multiple batch routes.
:batch-routes {:restaurants-updates-to-non-personalized-es
               {:consumer-group-id "restaurants-updates-consumer"
                :bootstrap-servers "g-gojek-id-mainstream.golabs.io:6668"
                :origin-topic      "restaurant-updates-stream"}}
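As a sketch of how the batch size might be capped on a route, the fragment below adds :max-poll-records and :thread-count to a batch route. The route name :example-batch-route, the topic and group names, the localhost address, and the chosen values are all placeholders for illustration. Only the key names come from this document.

```clojure
;; Illustrative config.edn fragment; names and values are placeholders.
:batch-routes {:example-batch-route
               {:consumer-group-id "example-consumer-group"
                :bootstrap-servers "localhost:9092"
                :origin-topic      "example-topic"
                ;; hand the batch handler at most 100 messages per poll
                :max-poll-records  100
                ;; number of Kafka Consumer threads for this route
                :thread-count      2}}
```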
A full list of supported configs is given below. These configs can be added to config.edn as per the requirements.
(defn -main [& args]
  (init/main {:start-fn      start
              :stop-fn       stop
              :stream-routes {:booking {:handler-fn (stream-deserializer/protobuf->hash
                                                     stream-handler
                                                     BookingLogMessage
                                                     :booking)}}
              :batch-routes  {:batch-consumer-1 {:handler-fn (batch-deserializer/deserialize-batch-of-proto-messages
                                                              batch-handler
                                                              BookingLogKey
                                                              BookingLogMessage
                                                              :batch-consumer-1)}}
              :actor-routes  [["v1/hello" {:get get-hello}]]}))
(defn- single-message-handler
  [message]
  (log/info "Batch Message: " message))

(defn batch-handler
  [batch]
  (log/infof "Received a batch of %d messages" (count batch))
  (doseq [single-message batch]
    (single-message-handler single-message))
  ;; retry? stands for application-specific logic that decides whether to retry
  (if (retry?)
    (do (log/info "Retrying the batch..")
        {:retry batch :skip []})
    {:retry [] :skip []}))
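The handler above retries either the whole batch or nothing. As a variation, the following sketch returns only the failed messages under :retry. It is an illustration under stated assumptions rather than a recommended implementation: `process-message!` is a hypothetical function that throws for messages flagged :bad, and :skip is simply left empty because its use is not described here.

```clojure
;; Hypothetical per-message processing: throws for messages flagged :bad.
(defn process-message!
  [message]
  (when (:bad message)
    (throw (ex-info "processing failed" {:message message})))
  message)

(defn selective-batch-handler
  "Processes every message in the batch and returns the
  {:retry [...] :skip [...]} map, collecting only the messages
  whose processing threw an exception under :retry."
  [batch]
  (let [failed (reduce (fn [acc message]
                         (try
                           (process-message! message)
                           acc                       ; success: nothing to retry
                           (catch Exception _
                             (conj acc message))))   ; failure: mark for retry
                       []
                       batch)]
    {:retry failed :skip []}))

;; Example call with one failing message in a batch of three.
(selective-batch-handler [{:id 1} {:id 2 :bad true} {:id 3}])
```

Catching exceptions per message keeps one bad record from forcing the rest of the batch through another attempt, at the cost of hiding the exception unless it is logged separately.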
| Ziggurat Config | Default Value | Description | Mandatory? |
|---|---|---|---|
| :bootstrap-servers | NA | https://kafka.apache.org/documentation/#bootstrap.servers | Yes |
| :consumer-group-id | NA | https://kafka.apache.org/documentation/#group.id | Yes |
| :origin-topic | NA | Kafka Topic to read data from | Yes |
| :max-poll-records | 500 | https://kafka.apache.org/documentation/#max.poll.records | No |
| :session-timeout-ms-config | 60000 | https://kafka.apache.org/documentation/#session.timeout.ms | No |
| :key-deserializer-class-config | "org.apache.kafka.common.serialization.ByteArrayDeserializer" | https://kafka.apache.org/documentation/#key.deserializer | No |
| :value-deserializer-class-config | "org.apache.kafka.common.serialization.ByteArrayDeserializer" | https://kafka.apache.org/documentation/#value.deserializer | No |
| :poll-timeout-ms-config | 1000 | Timeout value used for polling with a Kafka Consumer | No |
| :thread-count | 2 | Number of Kafka Consumer threads for each batch-route | No |
| :default-api-timeout-ms | 60000 | https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior | No |
| :manual-commit-enabled | false | When true, Kafka's background auto-commit (enable.auto.commit) is disabled and offsets are committed (commitSync) only after a batch has been processed and any failures enqueued for retry. This guarantees at-least-once delivery and prevents message loss when a consumer dies mid-batch. When false (default), the existing auto-commit behaviour is preserved. | No |
By default (:manual-commit-enabled false) batch consumers rely on Kafka's auto-commit
(enable.auto.commit=true), which commits offsets on a timer independently of batch
processing. If a consumer dies after offsets are auto-committed but before a batch finishes
processing, those messages can be lost.
Setting :manual-commit-enabled true on a batch route turns off auto-commit and makes
Ziggurat commit offsets with commitSync only after the batch handler has run and any
messages needing retry have been enqueued to RabbitMQ. A failed commit is logged and reported
via the ziggurat.batch.consumption.offset.commit metric but does not halt the poll loop.
The offsets are committed on the next successful commit, preserving at-least-once delivery.
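A batch route that opts into this behaviour might look like the fragment below. The route name, topic, group id, and address are placeholders chosen for illustration. Only the :manual-commit-enabled key and the other key names are taken from the table above.

```clojure
;; Illustrative config.edn fragment; names and addresses are placeholders.
:batch-routes {:example-batch-route
               {:consumer-group-id      "example-consumer-group"
                :bootstrap-servers      "localhost:9092"
                :origin-topic           "example-topic"
                ;; commit offsets only after the batch handler has run
                ;; and retries have been enqueued
                :manual-commit-enabled  true}}
```

Because offsets are committed only after processing, a consumer that dies mid-batch will see those messages again on restart, so handlers on such a route should tolerate duplicate deliveries.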