
kafka-clj.consumer.work-organiser

Kafka divides topics into partitions, and each partition's offsets run from 0 up to some N. Standard clients assign one partition to one client, but this creates coordination and scalability issues. E.g. if a client dies without removing its lock on a partition, a timeout has to expire before another client can take over; but if the client was merely slow, it may recover still believing it holds the lock, even though the lock has timed out and another client has started reading from the partition.

Kafka-clj takes a different approach: it divides the partitions into work units, each of length N, and adds each work unit to a redis list. Each client polls this redis list and takes a work unit, then reads the data for that work unit from kafka. If a work unit fails and is put on the complete queue with a fail status, the work organiser will see this work unit and try to recalculate the metadata for it.
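The queue flow described above can be sketched with in-memory atoms standing in for the redis lists. This is an illustrative model only, not kafka-clj's implementation: the real organiser keeps these queues in redis and recalculates work-unit metadata on failure (elided here).

```clojure
;; Illustrative sketch only: the work, working and complete queues
;; modelled with in-memory atoms instead of redis lists.
(def work-queue (atom [{:wu 1} {:wu 2}]))
(def working-queue (atom []))
(def complete-queue (atom []))

(defn take-work-unit! []
  ;; A client takes a work unit from the work queue and records it as
  ;; in-flight on the working queue.
  (when-let [wu (first @work-queue)]
    (swap! work-queue (comp vec rest))
    (swap! working-queue conj wu)
    wu))

(defn complete-work-unit! [wu status]
  ;; After processing, the work unit moves from the working queue to
  ;; the complete queue, tagged with :ok or :fail.
  (swap! working-queue (fn [q] (vec (remove #{wu} q))))
  (swap! complete-queue conj (assoc wu :status status)))

(defn recover-failed! []
  ;; What the work organiser does with failed units: requeue them on
  ;; the work queue (metadata recalculation is elided in this sketch).
  (let [{failed true done false} (group-by #(= :fail (:status %)) @complete-queue)]
    (reset! complete-queue (vec done))
    (swap! work-queue into (map #(dissoc % :status) failed))))
```
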

This namespace requires a running redis and kafka cluster

USAGE

(use 'kafka-clj.consumer.work-organiser :reload)
(def org (create-organiser!
          {:bootstrap-brokers [{:host "localhost" :port 9092}]
           :redis-conf {:host "localhost" :max-active 5 :timeout 1000}
           :working-queue "working" :complete-queue "complete" :work-queue "work"
           :conf {}}))
(calculate-new-work org ["ping"])

(use 'kafka-clj.consumer.consumer :reload)
(def consumer (consumer-start {:redis-conf {:host "localhost" :max-active 5 :timeout 1000}
                               :working-queue "working" :complete-queue "complete" :work-queue "work"
                               :conf {}}))

(def res (do-work-unit! consumer (fn [state status resp-data] state)))
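The callback passed to do-work-unit! receives the consumer state, a status and the response data; the example above is the identity fn on the state. As a sketch of a slightly more useful callback, the one below counts outcomes in the state map. The :fail status value is an assumption for illustration; check the consumer source for the exact status values it passes.

```clojure
;; Sketch of a do-work-unit! callback: count consumed vs failed units
;; in the state map. The :fail status keyword is an assumption.
(defn counting-callback [state status resp-data]
  (update state
          (if (= :fail status) :failed :consumed)
          (fnil inc 0)))
```

It would then be used as `(do-work-unit! consumer counting-callback)`.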

The basic data type is the work-unit, whose structure is {:topic topic :partition partition :offset start-offset :len l :max-offset (+ start-offset l) :producer {:host host :port port}}
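Given that structure, splitting a partition's offset range into work units of at most N messages can be sketched as follows. work-units is a hypothetical helper written for illustration, not part of kafka-clj's public API:

```clojure
;; Hypothetical helper: split the offset range [start-offset, max) of
;; one partition into work units of at most len messages each, in the
;; work-unit structure shown above.
(defn work-units [topic partition start-offset max len producer]
  (for [offset (range start-offset max len)
        :let [l (min len (- max offset))]]
    {:topic topic :partition partition :offset offset :len l
     :max-offset (+ offset l) :producer producer}))

;; (work-units "ping" 0 0 250 100 {:host "localhost" :port 9092})
;; => three units covering offsets 0-99, 100-199 and 200-249
```
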


kafka-clj.consumer.workunits

Internal consumer helper API for receiving and publishing work units to redis. The public functions are get-work-unit!, publish-consumed-wu!, publish-error-wu! and publisher-zero-consumed-wu!

