As of Ziggurat version 3.13.0, all the official Kafka configs Kafka configurations for Streams API, Consumer API and Producer API are supported.
All Ziggurat configs should be in your clonfig
config.edn
under the :ziggurat
key.
- General Configurations
- Stream Router Configurations
- Batch Routes
- SSL
- StatsD
- Sentry
- RabbitMQ Connection
- RabbitMQ
- Retry
- Jobs
- HTTP Server
- New Relic
- Prometheus
{:ziggurat {:app-name "application_name"
:nrepl-server {:port [7011 :int]}
:stream-router {:stream-id {:application-id "kafka_consumer_id"
:bootstrap-servers "kafka-broker-1:6667,Kafka-broker-2:6667"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:oldest-processed-message-in-s [604800 :int]
:changelog-topic-replication-factor [3 :int]
:stream-thread-exception-response [:shutdown-client :keyword]
;;channels help you increase the number of parallel processors more than the number of partitions of your topic.
;; please see the channels section for more information.
:channels {:channel-1 {:worker-count [10 :int]
:retry {:type [:linear :keyword]
:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}
:batch-routes {:restaurants-updates-to-non-personalized-es
{:consumer-group-id "restaurants-updates-consumer"
:bootstrap-servers "g-gojek-id-mainstream.golabs.io:6668"
:origin-topic "restaurant-updates-stream"}}
:ssl {:enabled true
:ssl-keystore-location "/location/to/keystore"
:ssl-keystore-password "some-password"
{:jaas {:username "username"
:password "password"
:mechanism "SCRAM_SHA-512"}}}
:default-api-timeout-ms-config [600000 :int]
:statsd {:host "localhost"
:port [8125 :int]
:enabled [false :bool]}
:statsd {:host "localhost"
:port [8125 :int]
:enabled [false :bool]}
:sentry {:enabled [false :bool]
:dsn "dummy"
:worker-count [5 :int]
:queue-size [5 :int]
:thread-termination-wait-s [1 :int]}
:rabbit-mq-connection {:host "localhost"
:port [5672 :int]
:prefetch-count [3 :int]
:username "guest"
:password "guest"
:channel-timeout [2000 :int]}
:rabbit-mq {:delay {:queue-name "application_name_delay_queue"
:exchange-name "application_name_delay_exchange"
:dead-letter-exchange "application_name_instant_exchange"
:queue-timeout-ms [5000 :int]}
:instant {:queue-name "application_name_instant_queue"
:exchange-name "application_name_instant_exchange"}
:dead-letter {:queue-name "application_name_dead_letter_queue"
:exchange-name "application_name_dead_letter_exchange"}}
;; if retry is enabled, messages are retried in RMQ. If retry is disabled, and :retry is returned from mapper function, messages will be lost.
:retry {:count [5 :int]
:enabled [false :bool]}
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
:http-server {:port [8010 :int]
:graceful-shutdown-timeout-ms [30000 :int]
:new-relic {:report-errors [false :bool]}}
:prometheus {:port 8002
:enabled [true :bool]}}}
Configuration | Data Type | Mandatory | Description |
---|
app-name | String | Yes | Refers to the name of the application. Used to namespace queues and metrics. |
nrepl-server | Integer | Yes | Port on which the REPL server will be hosted. |
default-api-timeout-ms-config | Integer | No | Specifies the timeout (in milliseconds) for client APIs. Recommended value is 600000 ms. |
Configuration | Data Type | Mandatory | Description |
---|
stream-router | Object | Yes | Configs related to all the Kafka streams the application is reading from. |
Property | Data Type | Mandatory | Description |
---|
stream-id | String | Yes | The identifier of a stream mentioned in main.clj . Each stream can read from different Kafka brokers and have different threads. |
application-id | String | Yes | The Kafka consumer group id. Documentation |
bootstrap-servers | String | Yes | The Kafka brokers that the application will read from. Accepts a comma-separated value. |
stream-threads-count | Integer | Yes | Number of parallel threads to read messages from Kafka. Can scale up to the number of partitions. |
stream-thread-exception-response | String | No | Action triggered on an uncaught exception. Possible values: :shutdown-client (default), :shutdown-application , :replace-thread . More info |
origin-topic | String | Yes | The topic that the stream should read from. Can be a regex. Messages from different streams will be passed to the same mapper-function. |
oldest-processed-messages-in-s | Integer | No | Oldest message processed by the stream in seconds. Default value is 604800 (1 week). |
changelog-topic-replication-factor | Integer | No | Internal changelog topic replication factor. Default value is 3. |
Property | Data Type | Mandatory | Description |
---|
worker-count | Integer | Yes | Number of messages to process in parallel per channel. |
retry | Object | No | Defines channel retries. |
Property | Data Type | Mandatory | Description |
---|
type | String | Yes | Type of retry (linear, exponential). |
count | Integer | Yes | Number of retries before message is sent to channel DLQ. |
enabled | Boolean | Yes | If channel retries are enabled or not. |
Property | Data Type | Mandatory | Description |
---|
bootstrap.servers | String | Yes | List of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
acks | String | Yes | Number of acknowledgments the producer requires before considering a request complete. Valid values: [all, -1, 0, 1]. |
retries | Integer | No | Number of retries for any record whose send fails with a potentially transient error. |
key.serializer | String | Yes | Serializer class for key implementing the org.apache.kafka.common.serialization.Serializer interface. |
value.serializer | String | Yes | Serializer class for value implementing the org.apache.kafka.common.serialization.Serializer interface. |
max.in.flight.requests.per.connection | Integer | No | Maximum number of unacknowledged requests the client will send on a single connection before blocking. |
enable.idempotence | Boolean | No | Ensures that exactly one copy of each message is written in the stream if set to true . |
Key | Data Type | Mandatory | Description |
---|
:batch-routes | Object | Yes | Configures batch routes for Kafka consumer API. |
:restaurants-updates-to-non-personalized-es | Object | Yes | Batch route name, customize as per application |
:consumer-group-id | String | Yes | Consumer group ID for the batch route. |
:bootstrap-servers | String | Yes | Kafka bootstrap servers for the batch route. |
:origin-topic | String | Yes | Origin topic for the batch route. |
Key | Data Type | Mandatory | Description |
---|
:ssl | Object | Yes | SSL configuration for Kafka. |
:enabled | Boolean | Yes | Flag to enable SSL. |
:ssl-keystore-location | String | Yes | Location of the SSL keystore. |
:ssl-keystore-password | String | Yes | Password for the SSL keystore. |
:jaas | Object | Yes | JAAS configuration for SASL. |
:username | String | Yes | Username for SASL authentication. |
:password | String | Yes | Password for SASL authentication. |
:mechanism | String | Yes | SASL mechanism (e.g., SCRAM-SHA-512). |
Key | Data Type | Mandatory | Description |
---|
:statsd | Object | Yes | Configuration for StatsD. |
:host | String | Yes | Host for StatsD. |
:port | Integer | Yes | Port for StatsD. |
:enabled | Boolean | Yes | Flag to enable StatsD. |
Key | Data Type | Mandatory | Description |
---|
:sentry | Object | Yes | Configuration for Sentry. |
:enabled | Boolean | Yes | Flag to enable Sentry. |
:dsn | String | Yes | Data Source Name for Sentry. |
:worker-count | Integer | Yes | Number of Sentry workers. |
:queue-size | Integer | Yes | Size of the Sentry queue. |
:thread-termination-wait-s | Integer | Yes | Wait time for thread termination in seconds. |
Key | Data Type | Mandatory | Description |
---|
:rabbit-mq-connection | Object | Yes | RabbitMQ connection configuration. |
:host | String | Yes | Host for RabbitMQ. |
:port | Integer | Yes | Port for RabbitMQ. |
:prefetch-count | Integer | No | Number of messages to prefetch. |
:username | String | Yes | Username for RabbitMQ. |
:password | String | Yes | Password for RabbitMQ. |
:channel-timeout | Integer | NO | Channel timeout in milliseconds. Default 2000 |
Key | Data Type | Mandatory | Description |
---|
:rabbit-mq | Object | Yes | Configuration for RabbitMQ queues. |
:delay | Object | Yes | Delay queue configuration. |
:queue-name | String | Yes | Name of the delay queue. |
:exchange-name | String | Yes | Name of the delay exchange. |
:dead-letter-exchange | String | Yes | Dead letter exchange for the delay queue. |
:queue-timeout-ms | Integer | Yes | Queue timeout in milliseconds. |
:instant | Object | Yes | Instant queue configuration. |
:queue-name | String | Yes | Name of the instant queue. |
:exchange-name | String | Yes | Name of the instant exchange. |
:dead-letter | Object | Yes | Dead letter queue configuration. |
:queue-name | String | Yes | Name of the dead letter queue. |
:exchange-name | String | Yes | Name of the dead letter exchange. |
Configuration | Data Type | Mandatory | Description |
---|
retry | Object | Yes | Number of times the message should be retried and if retry flow should be enabled. If retry is disabled, and :retry is returned from mapper function, messages will be lost. |
Configuration | Data Type | Mandatory | Description |
---|
jobs | Object | Yes | Number of consumers that should be reading from the retry queues and the prefetch count of each consumer. |
Configuration | Data Type | Mandatory | Description |
---|
http-server | Object | Yes | Defines the port and number of threads for the HTTP server. Also controls the graceful shutdown timeout. Default is 30000ms . |
Configuration | Data Type | Mandatory | Description |
---|
new-relic | Object | No | If report-errors is true, reports an error to New Relic whenever a :failure keyword is returned from the mapper-function or an exception is raised. Can be disabled. |
Configuration | Data Type | Mandatory | Description |
---|
prometheus | Object | No | Prometheus configuration. By default set to ON. Set the port that prometheus server runs on and enabled flag. |
enabled | bool | yes | Prometheus configuration. By default set to ON. Enables the startup of prometheus server (statsD is not used if this is enabled) |
port | int | yes | Prometheus configuration. Default 8002. Specifies the port that prometheus server runs on. |