A consumer suitable for batch reading operations from Kafka.
A consumer suitable for batch reading operations from Kafka.
(batch-consumer config process-fn)
Constructor, takes a config and a 2 arg process-fn.
config - ::group-id (req) - The group-id of the conumser group, used when committing offsets. ::bootstrap-servers (req) - comma separated string of kafka nodes (<host>:<port>) ::topics (req) - collection of topics the consumer will poll. ::timeout (opt) - amount of time to wait for a response from the consumer poll. ::threads (opt) - number of threads/consumers per topic. ::deserializer (opt) - string/value deserializers, defaults to nippy ::max-poll-records (opt) - max records returned on each call to poll, default 500 ::max-poll-interval-ms (opt) - max time to allow between calls to poll before rebalancing, default 300000 (5 min)
process-fn - 2 arg fn that takes a map of dependencies and a collection of record to process. Called for each batch of records returned by the consumer poll
Constructor, takes a config and a 2 arg process-fn. config - ::group-id (req) - The group-id of the conumser group, used when committing offsets. ::bootstrap-servers (req) - comma separated string of kafka nodes (<host>:<port>) ::topics (req) - collection of topics the consumer will poll. ::timeout (opt) - amount of time to wait for a response from the consumer poll. ::threads (opt) - number of threads/consumers per topic. ::deserializer (opt) - string/value deserializers, defaults to nippy ::max-poll-records (opt) - max records returned on each call to poll, default 500 ::max-poll-interval-ms (opt) - max time to allow between calls to poll before rebalancing, default 300000 (5 min) process-fn - 2 arg fn that takes a map of dependencies and a collection of record to process. Called for each batch of records returned by the consumer poll
(create-consumers {:keys [:org.purefn.river.batch/topics
:org.purefn.river.batch/group-id
:org.purefn.river.batch/threads]
:as config})
(default-config)
(default-config config)
The default configuration for a batch consumer.
::topics
The topics to consumer from.
::bootstrap-servers
hostname:port of a broker in the Kafka cluster to sink from.
::threads
The number of threads (consumers) to create for each topic.
(default 4)
::group-id
The group-id of the conumser group, used when committing offsets.
The default configuration for a batch consumer. - `::topics` The topics to consumer from. - `::bootstrap-servers` hostname:port of a broker in the Kafka cluster to sink from. - `::threads` The number of threads (consumers) to create for each topic. (default 4) - `::group-id` The group-id of the conumser group, used when committing offsets.
(kafka-consumer {:keys [:org.purefn.river.batch/group-id
:org.purefn.river.batch/bootstrap-servers
:org.purefn.river.batch/deserializer
:org.purefn.river.batch/max-poll-records
:org.purefn.river.batch/max-poll-interval-ms]})
Given the supplied config, return a KafkaConsumer object with the appropriate settings. If unspecified, the nippy deserializer will be used by default.
Given the supplied config, return a KafkaConsumer object with the appropriate settings. If unspecified, the nippy deserializer will be used by default.
(process consumer
closing
{:keys [:org.purefn.river.batch/timeout] :as config}
dependencies
process-fn)
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close