
org.purefn.river.batch

A consumer suitable for batch reading operations from Kafka.


batch-consumer

(batch-consumer config process-fn)

Constructor, takes a config and a 2 arg process-fn.

config:

  • ::group-id (req) - The group-id of the consumer group, used when committing offsets.

  • ::bootstrap-servers (req) - Comma-separated string of Kafka nodes (<host>:<port>).

  • ::topics (req) - Collection of topics the consumer will poll.

  • ::timeout (opt) - Amount of time to wait for a response from the consumer poll.

  • ::threads (opt) - Number of threads/consumers per topic.

  • ::deserializer (opt) - String/value deserializers; defaults to nippy.

  • ::max-poll-records (opt) - Max records returned on each call to poll; default 500.

  • ::max-poll-interval-ms (opt) - Max time to allow between calls to poll before rebalancing; default 300000 (5 min).

process-fn: a 2-arg fn that takes a map of dependencies and a collection of records to process. It is called once for each batch of records returned by the consumer poll.
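
As a rough illustration of the arguments described above, here is a minimal sketch of a config map and a process-fn. Only the keys come from the docstring; every value (group name, broker address, topic name) and the name `process-batch` are hypothetical, and passing the topics as a vector of strings is an assumption.

```clojure
;; Hypothetical values throughout; only the keys are taken from the docstring.
(def config
  {:org.purefn.river.batch/group-id          "example-group"
   :org.purefn.river.batch/bootstrap-servers "localhost:9092"
   :org.purefn.river.batch/topics            ["example-topic"] ; assumed: vector of strings
   :org.purefn.river.batch/max-poll-records  100})

(defn process-batch
  "Hypothetical 2-arg process-fn: receives the dependency map and one
  batch of records, and here only reports the batch size."
  [deps records]
  (println "received" (count records) "records"))

;; With the library on the classpath, construction would presumably look like:
;; (require '[org.purefn.river.batch :as batch])
;; (batch/batch-consumer config process-batch)
```

The `::timeout` key is left out because the docstring does not state its unit.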


BatchConsumer


create-consumers

(create-consumers {:keys [:org.purefn.river.batch/topics
                          :org.purefn.river.batch/group-id
                          :org.purefn.river.batch/threads]
                   :as config})

default-config

(default-config)
(default-config config)

The default configuration for a batch consumer.

  • ::topics The topics to consume from.

  • ::bootstrap-servers hostname:port of a broker in the Kafka cluster to read from.

  • ::threads The number of threads (consumers) to create for each topic. (default 4)

  • ::group-id The group-id of the consumer group, used when committing offsets.
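
The docstring does not say how the one-argument arity combines the supplied config with the defaults. One plausible reading is that supplied keys override defaults, which can be sketched as below. The names `hypothetical-defaults` and `with-defaults` are illustrative and not part of the library; the only default taken from the docstring is `::threads` at 4.

```clojure
;; Assumption: supplied keys override defaults. This is NOT the library's code.
(def hypothetical-defaults
  {:org.purefn.river.batch/threads 4}) ; the one default the docstring states

(defn with-defaults
  "Hypothetical stand-in for (default-config config): merges the supplied
  config over the defaults so that supplied keys win."
  [config]
  (merge hypothetical-defaults config))

;; A config that omits ::threads picks up the default;
;; one that sets it keeps its own value.
(with-defaults {:org.purefn.river.batch/topics ["example-topic"]})
(with-defaults {:org.purefn.river.batch/threads 8})
```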


kafka-consumer

(kafka-consumer {:keys [:org.purefn.river.batch/group-id
                        :org.purefn.river.batch/bootstrap-servers
                        :org.purefn.river.batch/deserializer
                        :org.purefn.river.batch/max-poll-records
                        :org.purefn.river.batch/max-poll-interval-ms]})

Given the supplied config, return a KafkaConsumer object with the appropriate settings. If unspecified, the nippy deserializer will be used by default.
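
To make the "appropriate settings" more concrete, the sketch below shows how the config keys could map onto standard Kafka consumer property names (`group.id`, `bootstrap.servers`, `max.poll.records`, `max.poll.interval.ms`). The function `consumer-props` is hypothetical and the mapping is an assumption about what the library does internally. The fallback values 500 and 300000 are the defaults stated in the batch-consumer docstring.

```clojure
(defn consumer-props
  "Hypothetical helper, not part of the library: translates the namespaced
  config keys into a map keyed by Kafka consumer property names."
  [{:org.purefn.river.batch/keys [group-id bootstrap-servers
                                  max-poll-records max-poll-interval-ms]
    :or {max-poll-records     500      ; default per the docstring
         max-poll-interval-ms 300000}}] ; default per the docstring (5 min)
  {"group.id"             group-id
   "bootstrap.servers"    bootstrap-servers
   "max.poll.records"     (str max-poll-records)
   "max.poll.interval.ms" (str max-poll-interval-ms)})

;; Example call with hypothetical values:
(consumer-props {:org.purefn.river.batch/group-id          "example-group"
                 :org.purefn.river.batch/bootstrap-servers "localhost:9092"})
```

The deserializer settings are omitted because the docstring does not name the class used for the nippy default.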


max-arg-count

(max-arg-count f)

process

(process consumer
         closing
         {:keys [:org.purefn.river.batch/timeout] :as config}
         dependencies
         process-fn)
