Configuration

As of Ziggurat version 3.13.0, all the official Kafka configurations for the Streams API, Consumer API and Producer API are supported.

All Ziggurat configs should be in your clonfig config.edn under the :ziggurat key.

Table of Contents

  1. General Configurations
  2. Stream Router Configurations
  3. Batch Routes
  4. SSL
  5. StatsD
  6. Sentry
  7. RabbitMQ Connection
  8. RabbitMQ
  9. Retry
  10. Jobs
  11. HTTP Server
  12. New Relic
  13. Prometheus

```clojure
{:ziggurat {:app-name             "application_name"
            :nrepl-server         {:port [7011 :int]}
            :stream-router        {:stream-id            {:application-id                 "kafka_consumer_id"
                                                          :bootstrap-servers              "kafka-broker-1:6667,kafka-broker-2:6667"
                                                          :stream-threads-count           [1 :int]
                                                          :origin-topic                   "kafka-topic-*"
                                                          :oldest-processed-message-in-s  [604800 :int]
                                                          :changelog-topic-replication-factor [3 :int]
                                                          :stream-thread-exception-response [:shutdown-client :keyword]
                                                          ;; Channels let you run more parallel processors than the number of partitions of your topic.
                                                          ;; See the Channels section for more information.
                                                          :channels                           {:channel-1 {:worker-count [10 :int]
                                                                                                         :retry        {:type    [:linear :keyword]
                                                                                                                        :count   [5 :int]
                                                                                                                        :enabled [true :bool]}}}
                                                          :producer   {:bootstrap-servers                     "localhost:9092"
                                                                       :acks                                  "all"
                                                                       :retries-config                        5
                                                                       :max-in-flight-requests-per-connection 5
                                                                       :enable-idempotence                    false
                                                                       :value-serializer                      "org.apache.kafka.common.serialization.StringSerializer"
                                                                       :key-serializer                        "org.apache.kafka.common.serialization.StringSerializer"}}}
            :batch-routes         {:restaurants-updates-to-non-personalized-es
                                    {:consumer-group-id          "restaurants-updates-consumer"
                                     :bootstrap-servers          "g-gojek-id-mainstream.golabs.io:6668"
                                     :origin-topic               "restaurant-updates-stream"}}
            :ssl                  {:enabled true
                                   :ssl-keystore-location "/location/to/keystore"
                                   :ssl-keystore-password "some-password"
                                   :jaas {:username  "username"
                                          :password  "password"
                                          :mechanism "SCRAM-SHA-512"}}
            :default-api-timeout-ms-config [600000 :int]
            :statsd               {:host    "localhost"
                                   :port    [8125 :int]
                                   :enabled [false :bool]}
            :sentry               {:enabled                   [false :bool]
                                   :dsn                       "dummy"
                                   :worker-count              [5 :int]
                                   :queue-size                [5 :int]
                                   :thread-termination-wait-s [1 :int]}
            :rabbit-mq-connection {:host            "localhost"
                                   :port            [5672 :int]
                                   :prefetch-count  [3 :int]
                                   :username        "guest"
                                   :password        "guest"
                                   :channel-timeout [2000 :int]}
            :rabbit-mq            {:delay       {:queue-name           "application_name_delay_queue"
                                                 :exchange-name        "application_name_delay_exchange"
                                                 :dead-letter-exchange "application_name_instant_exchange"
                                                 :queue-timeout-ms     [5000 :int]}
                                   :instant     {:queue-name    "application_name_instant_queue"
                                                 :exchange-name "application_name_instant_exchange"}
                                   :dead-letter {:queue-name    "application_name_dead_letter_queue"
                                                 :exchange-name "application_name_dead_letter_exchange"}}
            ;; If retry is enabled, messages are retried in RabbitMQ. If retry is disabled and the mapper function returns :retry, messages will be lost.
            :retry                {:count   [5 :int]
                                   :enabled [false :bool]}
            :jobs                 {:instant {:worker-count   [4 :int]
                                             :prefetch-count [4 :int]}}
            :http-server          {:port                         [8010 :int]
                                   :graceful-shutdown-timeout-ms [30000 :int]}
            :new-relic            {:report-errors [false :bool]}
            :prometheus           {:port    8002
                                   :enabled [true :bool]}}}
```

General Configurations

| Configuration | Data Type | Mandatory | Description |
|---|---|---|---|
| app-name | String | Yes | Refers to the name of the application. Used to namespace queues and metrics. |
| nrepl-server | Integer | Yes | Port on which the REPL server will be hosted. |
| default-api-timeout-ms-config | Integer | No | Specifies the timeout (in milliseconds) for client APIs. Recommended value is 600000 ms. |

Stream Router Configurations

| Configuration | Data Type | Mandatory | Description |
|---|---|---|---|
| stream-router | Object | Yes | Configs related to all the Kafka streams the application is reading from. |

Stream Router Properties

| Property | Data Type | Mandatory | Description |
|---|---|---|---|
| stream-id | String | Yes | The identifier of a stream mentioned in main.clj. Each stream can read from different Kafka brokers and have a different number of threads. |
| application-id | String | Yes | The Kafka consumer group id. |
| bootstrap-servers | String | Yes | The Kafka brokers that the application will read from. Accepts a comma-separated value. |
| stream-threads-count | Integer | Yes | Number of parallel threads to read messages from Kafka. Can scale up to the number of partitions. |
| stream-thread-exception-response | String | No | Action triggered on an uncaught exception. Possible values: :shutdown-client (default), :shutdown-application, :replace-thread. |
| origin-topic | String | Yes | The topic that the stream should read from. Can be a regex. Messages from all matching topics are passed to the same mapper-function. |
| oldest-processed-message-in-s | Integer | No | Maximum age, in seconds, of the oldest message the stream will process. Default value is 604800 (1 week). |
| changelog-topic-replication-factor | Integer | No | Internal changelog topic replication factor. Default value is 3. |
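
Because each stream can point at different brokers and use its own thread count, a router with two streams might look like the sketch below. The stream ids `:orders` and `:payments`, the broker hosts and the topic names are hypothetical placeholders, not values taken from Ziggurat; each stream id is assumed to match a stream declared in your application's main.clj.

```clojure
;; Hypothetical two-stream setup; ids, hosts and topics are placeholders.
{:ziggurat {:stream-router {:orders   {:application-id       "orders_consumer"
                                       :bootstrap-servers    "kafka-a-1:6667,kafka-a-2:6667"
                                       :stream-threads-count [2 :int]
                                       :origin-topic         "orders-log"}
                            :payments {:application-id       "payments_consumer"
                                       :bootstrap-servers    "kafka-b-1:6667"
                                       :stream-threads-count [1 :int]
                                       :origin-topic         "payments-log"}}}}
```

Only the mandatory properties are shown; the optional ones fall back to the defaults listed in the table.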

Channels

| Property | Data Type | Mandatory | Description |
|---|---|---|---|
| worker-count | Integer | Yes | Number of messages to process in parallel per channel. |
| retry | Object | No | Defines channel retries. |

Retry Properties

| Property | Data Type | Mandatory | Description |
|---|---|---|---|
| type | String | Yes | Type of retry (linear, exponential). |
| count | Integer | Yes | Number of retries before the message is sent to the channel DLQ. |
| enabled | Boolean | Yes | Whether channel retries are enabled. |
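
As an illustration of the channel and retry properties together, the fragment below sketches a channel that uses the exponential retry type instead of the linear one shown in the sample configuration. The channel name `:slow-processing` is a hypothetical placeholder, and the fragment is assumed to sit under a stream id in `:stream-router`.

```clojure
;; Hypothetical channel :slow-processing, placed under a stream id.
{:channels {:slow-processing {:worker-count [10 :int]
                              :retry        {:type    [:exponential :keyword]
                                             :count   [5 :int]
                                             :enabled [true :bool]}}}}
```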

Producer

| Property | Data Type | Mandatory | Description |
|---|---|---|---|
| bootstrap.servers | String | Yes | List of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
| acks | String | Yes | Number of acknowledgments the producer requires before considering a request complete. Valid values: [all, -1, 0, 1]. |
| retries | Integer | No | Number of retries for any record whose send fails with a potentially transient error. |
| key.serializer | String | Yes | Serializer class for key implementing the org.apache.kafka.common.serialization.Serializer interface. |
| value.serializer | String | Yes | Serializer class for value implementing the org.apache.kafka.common.serialization.Serializer interface. |
| max.in.flight.requests.per.connection | Integer | No | Maximum number of unacknowledged requests the client will send on a single connection before blocking. |
| enable.idempotence | Boolean | No | Ensures that exactly one copy of each message is written in the stream if set to true. |
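
Kafka only accepts an idempotent producer when acks is "all", retries is greater than 0 and max.in.flight.requests.per.connection is at most 5. The fragment below sketches a producer with idempotence turned on. The key names follow the sample configuration at the top of this page (kebab-case keywords, including `:retries-config`); that they map onto the Kafka properties in the table is an assumption, so check it against the Ziggurat version you use.

```clojure
;; Producer fragment with idempotence on; placed under a stream id.
{:producer {:bootstrap-servers                     "localhost:9092"
            :acks                                  "all" ; required when idempotence is on
            :retries-config                        5     ; must be greater than 0
            :max-in-flight-requests-per-connection 5     ; must be 5 or lower
            :enable-idempotence                    true
            :key-serializer                        "org.apache.kafka.common.serialization.StringSerializer"
            :value-serializer                      "org.apache.kafka.common.serialization.StringSerializer"}}
```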

Batch Routes

| Key | Data Type | Mandatory | Description |
|---|---|---|---|
| :batch-routes | Object | Yes | Configures batch routes for the Kafka Consumer API. |
| :restaurants-updates-to-non-personalized-es | Object | Yes | Batch route name; customize it for your application. |
| :consumer-group-id | String | Yes | Consumer group ID for the batch route. |
| :bootstrap-servers | String | Yes | Kafka bootstrap servers for the batch route. |
| :origin-topic | String | Yes | Origin topic for the batch route. |

SSL

| Key | Data Type | Mandatory | Description |
|---|---|---|---|
| :ssl | Object | Yes | SSL configuration for Kafka. |
| :enabled | Boolean | Yes | Flag to enable SSL. |
| :ssl-keystore-location | String | Yes | Location of the SSL keystore. |
| :ssl-keystore-password | String | Yes | Password for the SSL keystore. |
| :jaas | Object | Yes | JAAS configuration for SASL. |
| :username | String | Yes | Username for SASL authentication. |
| :password | String | Yes | Password for SASL authentication. |
| :mechanism | String | Yes | SASL mechanism (e.g., SCRAM-SHA-512). |

StatsD

| Key | Data Type | Mandatory | Description |
|---|---|---|---|
| :statsd | Object | Yes | Configuration for StatsD. |
| :host | String | Yes | Host for StatsD. |
| :port | Integer | Yes | Port for StatsD. |
| :enabled | Boolean | Yes | Flag to enable StatsD. |

Sentry

| Key | Data Type | Mandatory | Description |
|---|---|---|---|
| :sentry | Object | Yes | Configuration for Sentry. |
| :enabled | Boolean | Yes | Flag to enable Sentry. |
| :dsn | String | Yes | Data Source Name for Sentry. |
| :worker-count | Integer | Yes | Number of Sentry workers. |
| :queue-size | Integer | Yes | Size of the Sentry queue. |
| :thread-termination-wait-s | Integer | Yes | Wait time for thread termination in seconds. |

RabbitMQ Connection

| Key | Data Type | Mandatory | Description |
|---|---|---|---|
| :rabbit-mq-connection | Object | Yes | RabbitMQ connection configuration. |
| :host | String | Yes | Host for RabbitMQ. |
| :port | Integer | Yes | Port for RabbitMQ. |
| :prefetch-count | Integer | No | Number of messages to prefetch. |
| :username | String | Yes | Username for RabbitMQ. |
| :password | String | Yes | Password for RabbitMQ. |
| :channel-timeout | Integer | No | Channel timeout in milliseconds. Default is 2000. |

RabbitMQ

| Key | Data Type | Mandatory | Description |
|---|---|---|---|
| :rabbit-mq | Object | Yes | Configuration for RabbitMQ queues. |
| :delay | Object | Yes | Delay queue configuration. |
| :queue-name | String | Yes | Name of the delay queue. |
| :exchange-name | String | Yes | Name of the delay exchange. |
| :dead-letter-exchange | String | Yes | Dead letter exchange for the delay queue. |
| :queue-timeout-ms | Integer | Yes | Queue timeout in milliseconds. |
| :instant | Object | Yes | Instant queue configuration. |
| :queue-name | String | Yes | Name of the instant queue. |
| :exchange-name | String | Yes | Name of the instant exchange. |
| :dead-letter | Object | Yes | Dead letter queue configuration. |
| :queue-name | String | Yes | Name of the dead letter queue. |
| :exchange-name | String | Yes | Name of the dead letter exchange. |

Retry

| Configuration | Data Type | Mandatory | Description |
|---|---|---|---|
| retry | Object | Yes | Number of times a message should be retried and whether the retry flow is enabled. If retry is disabled and the mapper function returns :retry, messages will be lost. |
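
Because a disabled retry flow drops any message for which the mapper function returns :retry, an application that relies on :retry would typically turn the flag on. A minimal fragment, assuming the RabbitMQ connection and queue settings from the sample configuration are already in place:

```clojure
;; Retry flow switched on; the count of 5 is the value from the sample config.
{:ziggurat {:retry {:count   [5 :int]
                    :enabled [true :bool]}}}
```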

Jobs

| Configuration | Data Type | Mandatory | Description |
|---|---|---|---|
| jobs | Object | Yes | Number of consumers that should be reading from the retry queues and the prefetch count of each consumer. |

HTTP Server

| Configuration | Data Type | Mandatory | Description |
|---|---|---|---|
| http-server | Object | Yes | Defines the port and number of threads for the HTTP server. Also controls the graceful shutdown timeout, which defaults to 30000 ms. |

New Relic

| Configuration | Data Type | Mandatory | Description |
|---|---|---|---|
| new-relic | Object | No | If report-errors is true, reports an error to New Relic whenever a :failure keyword is returned from the mapper-function or an exception is raised. Can be disabled. |

Prometheus

| Configuration | Data Type | Mandatory | Description |
|---|---|---|---|
| prometheus | Object | No | Prometheus configuration: the port the Prometheus server runs on and the enabled flag. Enabled by default. |
| enabled | Boolean | Yes | Enables startup of the Prometheus server. Enabled by default. StatsD is not used when this is enabled. |
| port | Integer | Yes | Port that the Prometheus server runs on. Default is 8002. |
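
Since StatsD is not used while Prometheus is enabled, an application that wants StatsD metrics would presumably need to switch Prometheus off explicitly. The fragment below sketches that combination using the keys from the sample configuration; the host and ports are placeholders.

```clojure
;; Prometheus off, StatsD on; host and ports are placeholder values.
{:ziggurat {:prometheus {:port    8002
                         :enabled [false :bool]}
            :statsd     {:host    "localhost"
                         :port    [8125 :int]
                         :enabled [true :bool]}}}
```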
