
kafka-clojure-client.consumer


catch-all-exception-record-handler

(catch-all-exception-record-handler handler)
(catch-all-exception-record-handler handler error-consumer)

Create a CatchAllExceptionConsumerRecordHandler instance: a wrapper over another ConsumerRecordHandler instance that catches and swallows every exception the wrapped handler throws when it fails to handle a consumed record.

This handler may seem to improve the availability of the consumer, because it swallows all exceptions thrown while handling a record and carries on to the next one. But it can actually drive the consumer into a livelock, where the application does not crash yet fails to make progress.

Please use it judiciously. Usually failing fast, letting the polling thread exit on an exception, is your best choice.
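For illustration, a minimal sketch of wrapping a handler (the wrapped handler and its body are hypothetical; the two-argument arity additionally takes an error-consumer, which presumably observes each swallowed exception):

(require '[kafka-clojure-client.consumer :as kc])

;; A hypothetical handler whose body may throw on bad input.
(def fragile-handler
  (kc/to-value-only-record-handler
    (fn [value]
      (println "handling" value))))

;; Wrap it so any exception is swallowed and the next record is handled.
(def tolerant-handler
  (kc/catch-all-exception-record-handler fragile-handler))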


close

(close consumer)

Close a LcKafkaConsumer.


create-async-commit-consumer

(create-async-commit-consumer kafka-configs msg-handler & opts)

Build a consumer in which the polling thread always does an async commit after all the polled records have been handled. Because it commits only after a whole batch of polled records has been handled, the longer the handling takes, the longer the interval between commits, and the higher the chance of repeatedly consuming the same record after a consumer crash.

If any async commit fails, or the number of pending async commits exceeds the limit set by {@link LcKafkaConsumerBuilder#maxPendingAsyncCommits(int)}, this consumer will fall back to a sync commit of all the records which have been handled.

This kind of consumer ensures a sync commit of all the finished records when the consumer is shut down or any partition is revoked. It requires the following Kafka configs to be set, otherwise an IllegalArgumentException will be thrown:

  • max.poll.records
  • auto.offset.reset

Though all of these configs have default values in Kafka, we still require every user to set them explicitly, because these configs are vital for using this consumer safely.

If you set "enable.auto.commit" to true, this consumer will set it to false by itself.

Please refer to function "create-builder" to check allowed opts.
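A minimal creation sketch, assuming kafka-configs is a plain map of Kafka config strings; the broker address, group id, and handler are placeholders:

(require '[kafka-clojure-client.consumer :as kc])

(def consumer
  (kc/create-async-commit-consumer
    {"bootstrap.servers" "localhost:9092" ;; placeholder broker
     "group.id"          "demo-group"     ;; placeholder group id
     ;; the two configs this consumer requires:
     "max.poll.records"  "500"
     "auto.offset.reset" "earliest"}
    (kc/to-value-only-record-handler println)))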


create-auto-commit-consumer

(create-auto-commit-consumer kafka-configs msg-handler & opts)

Build a consumer which commits offsets automatically at a fixed interval. It works both with and without a worker thread pool; but without a worker pool, please tune {@code max.poll.interval.ms} in the Kafka configs as mentioned in {@link LcKafkaConsumerBuilder#workerPool(ExecutorService, boolean)}. This kind of consumer requires the following Kafka configs to be set, otherwise an IllegalArgumentException will be thrown:

  • max.poll.interval.ms
  • max.poll.records
  • auto.offset.reset
  • auto.commit.interval.ms

Though all of these configs have default values in Kafka, we still require every user to set them explicitly, because these configs are vital for using this consumer safely.

If you set "enable.auto.commit" to true, this consumer will set it to false by itself.

Please refer to function "create-builder" to check allowed opts.
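A sketch with the four required configs set explicitly, reusing the kc alias from the sketches above (the values are illustrative, not recommendations):

(def consumer
  (kc/create-auto-commit-consumer
    {"bootstrap.servers"       "localhost:9092"
     "group.id"                "demo-group"
     ;; the four configs this consumer requires:
     "max.poll.interval.ms"    "300000"
     "max.poll.records"        "500"
     "auto.offset.reset"       "earliest"
     "auto.commit.interval.ms" "5000"}
    (kc/to-value-only-record-handler println)))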


create-partial-async-commit-consumer

(create-partial-async-commit-consumer kafka-configs msg-handler & opts)

Build a consumer in which the polling thread does an async commit whenever any consumed records have been handled. It commits often, so after a consumer crash comparatively few records may be handled more than once. It uses async commits to mitigate the overhead caused by committing so frequently.

If any async commit fails, or the number of pending async commits exceeds the limit set by {@link LcKafkaConsumerBuilder#maxPendingAsyncCommits(int)}, this consumer will fall back to a sync commit of all the records which have been handled.

This kind of consumer ensures a sync commit of all the finished records when the consumer is shut down or any partition is revoked. It requires the following Kafka configs to be set, otherwise an IllegalArgumentException will be thrown:

  • max.poll.records
  • auto.offset.reset

Though all of these configs have default values in Kafka, we still require every user to set them explicitly, because these configs are vital for using this consumer safely.

If you set "enable.auto.commit" to true, this consumer will set it to false by itself.

Please refer to function "create-builder" to check allowed opts.
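A sketch reusing the kc alias from above. The pending-commits limit maps to LcKafkaConsumerBuilder#maxPendingAsyncCommits on the Java side; the opt keyword shown here is only a guess, so check "create-builder" for the real opt names:

(def consumer
  (kc/create-partial-async-commit-consumer
    {"bootstrap.servers" "localhost:9092"
     "group.id"          "demo-group"
     "max.poll.records"  "500"
     "auto.offset.reset" "earliest"}
    (kc/to-value-only-record-handler println)
    ;; hypothetical opt key, see create-builder for the actual names
    :max-pending-async-commits 50))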


create-partial-sync-commit-consumer

(create-partial-sync-commit-consumer kafka-configs msg-handler & opts)

Build a consumer in which the polling thread does a sync commit whenever any consumed records have been handled. It commits often, so after a consumer crash comparatively few records may be handled more than once. But, also because it commits so often, the committing overhead is relatively high.

This kind of consumer ensures a sync commit of all the finished records when the consumer is shut down or any partition is revoked. It requires the following Kafka configs to be set, otherwise an IllegalArgumentException will be thrown:

  • max.poll.records
  • auto.offset.reset

Though all of these configs have default values in Kafka, we still require every user to set them explicitly, because these configs are vital for using this consumer safely.

If you set "enable.auto.commit" to true, this consumer will set it to false by itself.

Please refer to function "create-builder" to check allowed opts.
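Frequent sync commits pair naturally with a handler that retries transient failures. A sketch reusing the kc alias from above (the retry count is arbitrary):

(def consumer
  (kc/create-partial-sync-commit-consumer
    {"bootstrap.servers" "localhost:9092"
     "group.id"          "demo-group"
     "max.poll.records"  "100"
     "auto.offset.reset" "earliest"}
    (kc/retriable-record-handler
      (kc/to-value-only-record-handler println)
      3))) ;; retry a failed record up to 3 times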


create-sync-commit-consumer

(create-sync-commit-consumer kafka-configs msg-handler & opts)

Build a consumer in which the polling thread always does a sync commit after all the polled records have been handled.

Because it commits only after a whole batch of polled records has been handled, the longer the handling takes, the longer the interval between commits, and the higher the chance of repeatedly consuming the same record after a consumer crash.

This kind of consumer ensures a sync commit of all the finished records when the consumer is shut down or any partition is revoked. It requires the following Kafka configs to be set, otherwise an IllegalArgumentException will be thrown:

  • max.poll.records
  • auto.offset.reset

Though all of these configs have default values in Kafka, we still require every user to set them explicitly, because these configs are vital for using this consumer safely.

If you set "enable.auto.commit" to true, this consumer will set it to false by itself.

Please refer to function "create-builder" to check allowed opts.
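A full-lifecycle sketch reusing the kc alias from above: create, subscribe, and later close, which triggers the final sync commit of finished records (topic name is a placeholder):

(def consumer
  (kc/create-sync-commit-consumer
    {"bootstrap.servers" "localhost:9092"
     "group.id"          "demo-group"
     "max.poll.records"  "500"
     "auto.offset.reset" "earliest"}
    (kc/to-value-only-record-handler println)))

(kc/subscribe-to-topics consumer ["demo-topic"])

;; ... on shutdown:
(kc/close consumer)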


retriable-record-handler

(retriable-record-handler handler max-retry-times)
(retriable-record-handler handler max-retry-times retry-interval-ms)

Create a RetriableConsumerRecordHandler instance: a wrapper over another ConsumerRecordHandler instance that lets the wrapped handler retry a record a limited number of times when handling it fails.
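A sketch reusing the kc alias from above; the interval semantics (a pause between attempts) are assumed from the parameter name:

(def handler
  (kc/retriable-record-handler
    (kc/to-value-only-record-handler println)
    3       ;; max-retry-times
    1000))  ;; retry-interval-ms, assumed to be the pause between attempts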


subscribe-to-pattern

(subscribe-to-pattern consumer pattern)
(subscribe-to-pattern consumer pattern on-unsubscribe-callback)

Subscribe to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against all topics existing at the time of the check. This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering the max metadata age, the consumer will refresh metadata more often and check for matching topics.
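A sketch, assuming the pattern argument accepts a java.util.regex.Pattern (which re-pattern produces), and reusing a consumer from the sketches above:

(kc/subscribe-to-pattern consumer (re-pattern "metrics-.*"))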


subscribe-to-topics

(subscribe-to-topics consumer topics)
(subscribe-to-topics consumer topics on-unsubscribe-callback)

Subscribe to some Kafka topics to consume records from them.
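A sketch reusing a consumer from above; whether topics must be a vector or any collection is not specified here, and the three-argument arity additionally registers on-unsubscribe-callback:

(kc/subscribe-to-topics consumer ["orders" "payments"]) ;; placeholder topics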


to-record-handler

(to-record-handler handler-fn)

Convert a single-argument Clojure function, used to handle the consumed Kafka record, into a ConsumerRecordHandler instance.
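A sketch reusing the kc alias from above; the handler function receives the whole org.apache.kafka.clients.consumer.ConsumerRecord, so key, offset, and headers are all available:

(import 'org.apache.kafka.clients.consumer.ConsumerRecord)

(def handler
  (kc/to-record-handler
    (fn [^ConsumerRecord record]
      ;; the whole record is passed in, not just its value
      (println (.key record) "->" (.value record)))))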


to-value-only-record-handler

(to-value-only-record-handler handler-fn)

Convert a single-argument Clojure function, used to handle only the value of the consumed Kafka record, into a ConsumerRecordHandler instance.
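A sketch reusing the kc alias from above; the function sees only the record's deserialized value:

(def handler
  (kc/to-value-only-record-handler
    (fn [value]
      ;; only the record's value is passed in
      (println "value:" value))))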

