franzy.clients.consumer.client

Clojure only.


make-consumer

(make-consumer config)
(make-consumer config options)
(make-consumer config key-deserializer value-deserializer)
(make-consumer config key-deserializer value-deserializer options)

Inputs: ([config :- cs/ConsumerConfig] [config :- cs/ConsumerConfig options :- (s/maybe cs/ConsumerOptions)] [config :- cs/ConsumerConfig key-deserializer :- Deserializer value-deserializer :- Deserializer] [config :- cs/ConsumerConfig key-deserializer :- Deserializer value-deserializer :- Deserializer options :- (s/maybe cs/ConsumerOptions)])
Returns: FranzConsumer

Create a Kafka Consumer from a configuration, with optional deserializers and optional consumer options. If a callback is given, call it when stopping the consumer. If deserializers are provided, use them; otherwise, expect deserializers via class name in the config map.
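For example, a minimal sketch of constructing a consumer with explicit deserializers (the keyword and EDN deserializer constructors from franzy.serialization.deserializers are assumed from the wider franzy library, not documented on this page):

  (ns my.app
    (:require [franzy.clients.consumer.client :as consumer]
              ;; assumed helper namespace from the wider franzy library
              [franzy.serialization.deserializers :as deserializers]))

  ;; The consumer configuration is an ordinary Clojure map of Kafka consumer properties.
  (def config
    {:bootstrap.servers ["127.0.0.1:9092"]
     :group.id          "example-group"
     :auto.offset.reset :earliest})

  ;; Explicit deserializers take precedence; without them, deserializer class
  ;; names are expected in the config map itself.
  (def c
    (consumer/make-consumer config
                            (deserializers/keyword-deserializer)
                            (deserializers/edn-deserializer)))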

This consumer is a wrapper of the Kafka Java Consumer API. It provides a Clojure(-ish) wrapper, converting Clojure data structures to/from Kafka, and implements various protocols to allow more specialized consumers to follow this implementation. If you prefer a lower-level implementation or wish to test your consumer, you may wish to browse this implementation and implement one or all of the protocols provided.

This consumer provides implementations for both manual and automatic consumption. You must not mix and match automatic and manual consumption; doing so will throw an exception. Generally, this means you either subscribe to topics to receive automatic partition assignments, or manually assign partitions yourself.
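As a sketch of the distinction (subscribe-to-partitions! and assign-partitions! are assumed names from franzy.clients.consumer.protocols), pick one style per consumer:

  (require '[franzy.clients.consumer.protocols :as cp])

  ;; Automatic: subscribe and let the group coordinator assign partitions.
  (cp/subscribe-to-partitions! c ["my-topic"])

  ;; Manual (instead, never in addition): assign specific topic partitions yourself.
  ;; (cp/assign-partitions! c [{:topic "my-topic" :partition 0}])
  ;; Mixing the two styles on the same consumer throws an exception.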

Moreover, it is important to note that the offset position is determined by your consumer configuration and by whether you save offsets in Kafka itself or in an external location. If you need to manually reset or position the consumer offset in a particular partition, you can seek to it directly. Seeking will only work after partition assignment; for a subscription-based consumer, it is an error to seek before being assigned a partition. If you want to seek on assignment for a subscription-based consumer, do so using a callback to guarantee you have been assigned a valid partition.
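A sketch of seeking after a manual assignment (seek-to-offset! and the topic-partition map shape are assumptions about the protocol layer); for a subscription-based consumer, the same seek would instead live inside a rebalance callback supplied via the consumer options:

  (require '[franzy.clients.consumer.protocols :as cp])

  (let [topic-partition {:topic "my-topic" :partition 0}]
    ;; Seeking is only valid once this partition is assigned to the consumer.
    (cp/assign-partitions! c [topic-partition])
    (cp/seek-to-offset! c topic-partition 1234))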

For per-function documentation, please see the source for extensive comments, usage examples, etc.

Note: This implementation strikes a reasonable compromise between raw performance, extensibility, and usability, given that it is:

  1. A wrapper
  2. Clojure

Consumer options serve the following purposes:

  • Avoid repeated/inconvenient passing of defaults to various methods requiring options such as timeouts. Many consumers do not need per-call options.
  • Long-term extensibility as more features are added to this client, mitigating signature changes and excessive arities.
  • Cheaper lookups and smaller memory footprint, as the options are created in final form as records.
  • Dynamic construction of consumer options via stream processors, back-off logic, etc.
  • Reduction in garbage collection for consumers that do not need per-call options. Overall, fewer intermediate maps and reified objects.
  • Avoid slow memory allocations for the aforementioned cases.
  • Mitigate Kafka Java API changes. The API has often been in flux, and sometimes extra options are necessary to handle weirdness from Java API bugs.

Note: Consumer options are distinct from the Kafka Consumer Configuration.
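For illustration, a sketch of passing options alongside the configuration (the franzy.clients.consumer.defaults/make-default-consumer-options constructor and the :poll-timeout-ms key are assumptions; check the options schema in your version):

  (require '[franzy.clients.consumer.client :as consumer]
           '[franzy.clients.consumer.defaults :as defaults])

  ;; Options are realized once as a record, so per-call defaults such as the
  ;; poll timeout need not be rebuilt or passed on every call.
  (def opts (defaults/make-default-consumer-options {:poll-timeout-ms 3000}))

  (def c (consumer/make-consumer {:bootstrap.servers ["127.0.0.1:9092"]
                                  :group.id          "example-group"}
                                 opts))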

