(catch-all-exception-record-handler handler)
(catch-all-exception-record-handler handler error-consumer)
Create a CatchAllExceptionConsumerRecordHandler instance, a wrapper around another ConsumerRecordHandler instance that catches and swallows every exception the wrapped handler throws when it fails to handle a consumed record.
This handler may look like a good way to improve the availability of the consumer, because it swallows every exception raised while handling a record and carries on with the next record. In practice it can undermine the consumer's ability to prevent a livelock, in which the application has not crashed but fails to make progress for some reason.
Please use it judiciously. Failing fast, that is, letting the polling thread exit on an exception, is usually your best choice.
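The wrapping behaviour described above can be sketched with plain Clojure functions. This is only an illustration of the "catch and swallow" idea, not the library's implementation: `catch-all` and `on-error` are hypothetical names, and it assumes the error consumer receives the failed record and the exception.

```clojure
;; Hypothetical stand-in for the wrapper, built from plain functions.
(defn catch-all
  "Wrap handler-fn so that any exception it throws is passed to on-error
  and then swallowed, letting the caller move on to the next record."
  ([handler-fn] (catch-all handler-fn (fn [_record _ex] nil)))
  ([handler-fn on-error]
   (fn [record]
     (try
       (handler-fn record)
       (catch Exception ex
         (on-error record ex)
         nil)))))

(def handled (atom []))
(def failures (atom []))

;; A handler that fails on every odd number.
(def safe-handler
  (catch-all
    (fn [n]
      (when (odd? n)
        (throw (ex-info "cannot handle" {:record n})))
      (swap! handled conj n))
    (fn [record _ex] (swap! failures conj record))))

;; All four records are attempted; the failures do not stop the loop.
(run! safe-handler [1 2 3 4])
;; @handled  is now [2 4]
;; @failures is now [1 3]
```

Note that the loop keeps going even though half of the records were never handled, which is exactly the silent loss of progress the warning above is about.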
(create-async-commit-consumer kafka-configs msg-handler & opts)
Build a consumer in which the polling thread always does an async commit after all the polled records have been handled. Because it commits only after all the polled records have been handled, the longer record handling takes, the longer the interval between commits, and the more likely it is that the same record is consumed again after the consumer crashes.
If any async commit fails or the number of pending async commits exceeds the limit set by LcKafkaConsumerBuilder#maxPendingAsyncCommits(int), this consumer will do a sync commit to commit all the records which have been handled.
This kind of consumer also does a sync commit for all the finished records when the consumer is shut down or any partition is revoked. The following Kafka configs must be set, otherwise an IllegalArgumentException will be thrown:
* max.poll.records
* auto.offset.reset
Although all of these configs have default values in Kafka, every user is still required to set them explicitly, because these configs are vital for using this consumer safely.
If you set "enable.auto.commit" to true, this consumer will set it to false by itself.
Please refer to the function "create-builder" for the allowed opts.
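Since a missing required config surfaces as an IllegalArgumentException, it can help to check the config map before building the consumer. The sketch below is a hypothetical pre-flight helper, not part of the library. It assumes the Kafka configs are passed as a map with string keys, and the broker address and group id are placeholders.

```clojure
(require '[clojure.set :as set])

;; The two configs this consumer insists on, as listed above.
(def required-configs #{"max.poll.records" "auto.offset.reset"})

(defn missing-configs
  "Return the set of required config names absent from kafka-configs."
  [kafka-configs]
  (set/difference required-configs (set (keys kafka-configs))))

;; Placeholder values; adjust them for your own cluster.
(def kafka-configs
  {"bootstrap.servers" "localhost:9092"
   "group.id"          "example-group"
   "max.poll.records"  "100"
   "auto.offset.reset" "earliest"})

(missing-configs kafka-configs)
;; => #{}

(missing-configs (dissoc kafka-configs "auto.offset.reset"))
;; => #{"auto.offset.reset"}
```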
(create-auto-commit-consumer kafka-configs msg-handler & opts)
Build a consumer which commits offsets automatically at a fixed interval. It works both with and without a worker thread pool. Without a worker pool, please tune `max.poll.interval.ms` in the Kafka configs as described in LcKafkaConsumerBuilder#workerPool(ExecutorService, boolean).
The following Kafka configs must be set, otherwise an IllegalArgumentException will be thrown:
* max.poll.interval.ms
* max.poll.records
* auto.offset.reset
* auto.commit.interval.ms
Although all of these configs have default values in Kafka, every user is still required to set them explicitly, because these configs are vital for using this consumer safely.
If you set "enable.auto.commit" to true, this consumer will set it to false by itself.
Please refer to the function "create-builder" for the allowed opts.
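As a rough illustration, a config map that sets all four required entries might look like the one below. The string-keyed map shape, the broker address, the group id and every value are assumptions chosen for the example, not recommendations.

```clojure
;; Illustrative values only; tune each of them for your workload.
(def auto-commit-configs
  {"bootstrap.servers"       "localhost:9092"
   "group.id"                "example-group"
   "max.poll.interval.ms"    "300000"
   "max.poll.records"        "500"
   "auto.offset.reset"       "latest"
   "auto.commit.interval.ms" "5000"})
```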
(create-partial-async-commit-consumer kafka-configs msg-handler & opts)
Build a consumer in which the polling thread does an async commit whenever there are any handled consumer records. It commits often, so after a consumer crash comparatively few records may be handled more than once. It uses async commits to mitigate the overhead caused by committing so frequently.
If any async commit fails or the number of pending async commits exceeds the limit set by LcKafkaConsumerBuilder#maxPendingAsyncCommits(int), this consumer will do a sync commit to commit all the records which have been handled.
This kind of consumer also does a sync commit for all the finished records when the consumer is shut down or any partition is revoked. The following Kafka configs must be set, otherwise an IllegalArgumentException will be thrown:
* max.poll.records
* auto.offset.reset
Although all of these configs have default values in Kafka, every user is still required to set them explicitly, because these configs are vital for using this consumer safely.
If you set "enable.auto.commit" to true, this consumer will set it to false by itself.
Please refer to the function "create-builder" for the allowed opts.
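The fallback rule for async commits can be modelled as a small pure function. This is a sketch of the rule as the text states it, not the library's code: the function name and the map keys are hypothetical, and "beyond the limit" is read as strictly greater than the limit.

```clojure
(defn next-commit-mode
  "Decide how the next commit should be done, given the state of earlier
  async commits. Returns :sync or :async."
  [{:keys [async-commit-failed? pending-async-commits max-pending-async-commits]}]
  (if (or async-commit-failed?
          (> pending-async-commits max-pending-async-commits))
    :sync
    :async))

(next-commit-mode {:async-commit-failed?      false
                   :pending-async-commits     3
                   :max-pending-async-commits 10})
;; => :async

(next-commit-mode {:async-commit-failed?      true
                   :pending-async-commits     3
                   :max-pending-async-commits 10})
;; => :sync

(next-commit-mode {:async-commit-failed?      false
                   :pending-async-commits     11
                   :max-pending-async-commits 10})
;; => :sync
```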
(create-partial-sync-commit-consumer kafka-configs msg-handler & opts)
Build a consumer in which the polling thread does a sync commit whenever there are any handled consumer records. It commits often, so after a consumer crash comparatively few records may be handled more than once. Because it commits often, however, the overhead caused by committing is relatively high.
This kind of consumer also does a sync commit for all the finished records when the consumer is shut down or any partition is revoked. The following Kafka configs must be set, otherwise an IllegalArgumentException will be thrown:
* max.poll.records
* auto.offset.reset
Although all of these configs have default values in Kafka, every user is still required to set them explicitly, because these configs are vital for using this consumer safely.
If you set "enable.auto.commit" to true, this consumer will set it to false by itself.
Please refer to the function "create-builder" for the allowed opts.
(create-sync-commit-consumer kafka-configs msg-handler & opts)
Build a consumer in which the polling thread always does a sync commit after all the polled records have been handled.
Because it commits only after all the polled records have been handled, the longer record handling takes, the longer the interval between commits, and the more likely it is that the same record is consumed again after the consumer crashes.
This kind of consumer also does a sync commit for all the finished records when the consumer is shut down or any partition is revoked. The following Kafka configs must be set, otherwise an IllegalArgumentException will be thrown:
* max.poll.records
* auto.offset.reset
Although all of these configs have default values in Kafka, every user is still required to set them explicitly, because these configs are vital for using this consumer safely.
If you set "enable.auto.commit" to true, this consumer will set it to false by itself.
Please refer to the function "create-builder" for the allowed opts.
(retriable-record-handler handler max-retry-times)
(retriable-record-handler handler max-retry-times retry-interval-ms)
Create a RetriableConsumerRecordHandler instance, a wrapper around another ConsumerRecordHandler instance that lets the wrapped handler retry handling a record a limited number of times when handling fails.
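The retry behaviour can be sketched with plain Clojure functions. This is not the library's implementation, and `retriable` is a hypothetical name. It assumes that max-retry-times counts the retries after the first attempt, that retry-interval-ms is a pause between attempts, and that the last exception is rethrown once the retries are used up. None of these details are confirmed by the docstring.

```clojure
(defn retriable
  "Wrap handler-fn so that a failing call is retried up to max-retry-times
  more times, sleeping retry-interval-ms between attempts."
  ([handler-fn max-retry-times] (retriable handler-fn max-retry-times 0))
  ([handler-fn max-retry-times retry-interval-ms]
   (fn [record]
     (loop [retries-left max-retry-times]
       ;; recur is not allowed inside try, so the outcome is captured first.
       (let [outcome (try
                       {:ok (handler-fn record)}
                       (catch Exception ex
                         (if (pos? retries-left)
                           {:failed ex}
                           (throw ex))))]
         (if (contains? outcome :ok)
           (:ok outcome)
           (do
             (when (pos? retry-interval-ms)
               (Thread/sleep (long retry-interval-ms)))
             (recur (dec retries-left)))))))))

;; A handler that fails twice and then succeeds.
(def attempts (atom 0))

(def flaky-handler
  (retriable
    (fn [record]
      (when (< (swap! attempts inc) 3)
        (throw (ex-info "transient failure" {:record record})))
      [:handled record])
    5))

(def result (flaky-handler "r1"))
;; result    is [:handled "r1"]
;; @attempts is 3
```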
(subscribe-to-pattern consumer pattern)
(subscribe-to-pattern consumer pattern on-unsubscribe-callback)
Subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
The pattern matching will be done periodically against all topics existing at the time of check. This can be controlled through the `metadata.max.age.ms` configuration: by lowering the max metadata age, the consumer will refresh metadata more often and check for matching topics.
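To preview which topics a pattern would select, the matching can be tried out locally. The sketch assumes that `pattern` is a java.util.regex.Pattern (which a Clojure regex literal produces) and that a topic is selected only when its whole name matches. The topic names and the `matching-topics` helper are made up for the example.

```clojure
;; A Clojure regex literal is a java.util.regex.Pattern.
(def topic-pattern #"orders\..*")

;; Hypothetical topic names standing in for the cluster's topic list.
(def existing-topics
  ["orders.created" "orders.cancelled" "payments.settled" "audit.orders.created"])

(defn matching-topics
  "Return the topics whose full name matches pattern."
  [pattern topics]
  (filterv #(re-matches pattern %) topics))

(matching-topics topic-pattern existing-topics)
;; => ["orders.created" "orders.cancelled"]
```

A topic created later, for example "orders.refunded", would only be picked up at the next metadata refresh, which is why the refresh interval matters.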
(subscribe-to-topics consumer topics)
(subscribe-to-topics consumer topics on-unsubscribe-callback)
Subscribe to some Kafka topics to consume records from them.
(to-record-handler handler-fn)
Convert a single-argument Clojure function that handles a consumed Kafka record into a ConsumerRecordHandler instance.
(to-value-only-record-handler handler-fn)
Convert a single-argument Clojure function that handles only the value of a consumed Kafka record into a ConsumerRecordHandler instance.