To enable publishing data to kafka, Ziggurat provides producing support through ziggurat.producer namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrapper around variants of send
methods defined in org.apache.kafka.clients.producer.KafkaProducer
.
At the time of initialization, an instance of org.apache.kafka.clients.producer.KafkaProducer
is constructed using config values provided in resources/config.edn
. A producer can be configured for each of the stream-routes in config.edn. Please see the example below.
At present, only a few configurations are supported for constructing KafkaProducer. These have been explained here. Please see Producer configs for a complete list of all producer configs available in Kafka.
Ziggurat.producer namespace defines a multi-arity send
function which is a thin wrapper around KafkaProducer#send. This method publishes data to a Kafka topic through a Kafka producer
defined in the stream router configuration. See configuration section below.
E.g.
For publishing data using a producer which is defined for the stream router config with key :default
, use send like this:
(send :default "test-topic" "key" "value")
(send :default "test-topic" 1 "key" "value")
With Ziggurat version 3.5.1, both Kafka Streams API and Kafka Consumer API can be used to consume the messages in real time. Kafka Consumer API is an efficient way to consume messages from high throughput Kafka topics.
With Kafka Streams API, one message is processed at a time. But, with Kafka Consumer API integration in Ziggurat, a user can consume messages in bulk and can control how many messages it wants to consume at a time. This batch size can be configured using max-poll-records config https://docs.confluent.io/current/installation/configuration/consumer-configs.html#max.poll.records.
Like Streams, Ziggurat also provides the facility to specify multiple batch routes.
:batch-routes {:restaurants-updates-to-non-personalized-es
{:consumer-group-id "restaurants-updates-consumer"
:bootstrap-servers "g-gojek-id-mainstream.golabs.io:6668"
:origin-topic "restaurant-updates-stream"}}
A full list of supported configs is given below. These configs can be added to config.edn
as per the requirements.
(defn -main [& args]
(init/main {:start-fn start
:stop-fn stop
:stream-routes {:booking {:handler-fn (stream-deserializer/protobuf->hash
stream-handler
BookingLogMessage
:booking)}}
:batch-routes {:batch-consumer-1 {:handler-fn (batch-deserialzer/deserialize-batch-of-proto-messages
batch-handler
BookingLogKey
BookingLogMessage
:batch-consumer-1)}}
:actor-routes [["v1/hello" {:get get-hello}]]}))
(defn- single-message-handler
[message]
(log/info "Batch Message: " message))
(defn batch-handler
[batch]
(log/infof "Received a batch of %d messages" (count batch))
(doseq [single-message batch]
(single-message-handler single-message))
(if (retry?)
(do (log/info "Retrying the batch..")
{:retry batch :skip []})
{:retry [] :skip []}))
Ziggurat Config | Default Value | Description | Mandatory? |
---|---|---|---|
:bootstrap-servers | NA | https://kafka.apache.org/documentation/#bootstrap.servers | Yes |
:consumer-group-id | NA | https://kafka.apache.org/documentation/#group.id | Yes |
:origin-topic | NA | Kafka Topic to read data from | Yes |
:max-poll-records | 500 | https://kafka.apache.org/documentation/#max.poll.records | No |
:session-timeout-ms-config | 60000 | https://kafka.apache.org/documentation/#session.timeout.ms | No |
:key-deserializer-class-config | "org.apache.kafka.common.serialization.ByteArrayDeserializer" | https://kafka.apache.org/documentation/#key.deserializer | No |
:value-deserializer-class-config | "org.apache.kafka.common.serialization.ByteArrayDeserializer" | https://kafka.apache.org/documentation/#value.deserializer | No |
:poll-timeout-ms-config | 1000 | Timeout value used for polling with a Kafka Consumer | No |
:thread-count | 2 | Number of Kafka Consumer threads for each batch-route | No |
:default-api-timeout-ms | 60000 | https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior | No |
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close