Kafka divides topics into partitions, and each partition's offsets run from 0 to N. Standard clients assign one partition to one client, but this creates coordination and scalability issues. For example, if a client dies without releasing its lock on a partition, a timeout must elapse before the lock is freed; but the client may merely be slow and later recover, still believing it holds the lock, even though it has timed out and another client has started reading from the partition.

Kafka-clj takes a different approach: it divides the partitions into work units, each of length N, and adds each work unit to a redis list. Each client polls this redis list, takes a work unit, and then reads the data for that work unit from kafka. If a work unit fails and is put on the complete queue with a fail status, the work organiser will see this work unit and try to recalculate the metadata for it.
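The lifecycle above can be sketched in plain Clojure, using atoms to stand in for the Redis lists (the library itself uses Redis; the names and shapes here are illustrative only, not the library's internals):

```clojure
;; Atoms standing in for the three Redis lists described above.
(def work-queue (atom [{:topic "ping" :partition 0 :offset 0 :len 10}]))
(def working-queue (atom []))
(def complete-queue (atom []))

(defn take-work-unit!
  "Take a work unit from the work queue and record it as in-progress."
  []
  (when-let [w-unit (peek @work-queue)]
    (swap! work-queue pop)
    (swap! working-queue conj w-unit)
    w-unit))

(defn complete-work-unit!
  "Move the unit to the complete queue with a status; the work organiser
   re-queues units whose status is :fail."
  [w-unit status]
  (swap! working-queue #(vec (remove #{w-unit} %)))
  (swap! complete-queue conj (assoc w-unit :status status)))

(let [w (take-work-unit!)]
  (complete-work-unit! w :ok))
;; @complete-queue => [{:topic "ping" :partition 0 :offset 0 :len 10 :status :ok}]
```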
This namespace requires a running redis and kafka cluster.

USAGE

    (use 'kafka-clj.consumer.work-organiser :reload)

    (def org (create-organiser!
               {:bootstrap-brokers [{:host "localhost" :port 9092}]
                :redis-conf {:host "localhost" :max-active 5 :timeout 1000}
                :working-queue "working"
                :complete-queue "complete"
                :work-queue "work"
                :conf {}}))

    (calculate-new-work org ["ping"])

    (use 'kafka-clj.consumer.consumer :reload)

    (def consumer (consumer-start
                    {:redis-conf {:host "localhost" :max-active 5 :timeout 1000}
                     :working-queue "working"
                     :complete-queue "complete"
                     :work-queue "work"
                     :conf {}}))

    (def res (do-work-unit! consumer (fn [state status resp-data] state)))
The basic data type is the work-unit structure:

    {:topic topic :partition partition :offset start-offset :len l
     :max-offset (+ start-offset l) :producer {:host host :port port}}
(_max-offset [saved-offset max-offset] [saved-offset2 max-offset2 :as v])
Takes two pairs, [saved-offset max-offset] and [saved-offset2 max-offset2], and returns [saved-offsetN max-offsetN], where saved-offsetN is the max of the saved offsets and max-offsetN is the max of the max offsets.
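A plausible implementation matching this docstring (the library's actual implementation may differ):

```clojure
(defn _max-offset
  "Given two [saved-offset max-offset] pairs, return the pair of element-wise maxima."
  [[saved-offset max-offset] [saved-offset2 max-offset2]]
  [(max saved-offset saved-offset2) (max max-offset max-offset2)])

(_max-offset [5 10] [7 8]) ;; => [7 10]
```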
(add-offsets! state topic min-max-kafka-offsets offset-datum)
(calculate-new-work {:keys [metadata-connector conf error-handler stats-atom]
:as state}
topics)
Accepts the state and returns it as is. For the given topics, new work is calculated based on the metadata returned from the producers. stats-atom is an atom containing a map of ad-hoc stats that may be of interest to the user.
(calculate-work-units producer topic partition max-offset start-offset step)
Returns a sequence of work-unit maps '({:topic :partition :offset :len :max-offset}). :len is exclusive, i.e. :max-offset (= :offset + :len) is the first offset not included in the unit.
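A sketch consistent with this docstring, splitting the range [start-offset, max-offset) into units of at most `step` messages (the actual implementation may differ):

```clojure
(defn calculate-work-units
  "Split [start-offset, max-offset) into work-unit maps of at most step messages each."
  [producer topic partition max-offset start-offset step]
  (for [offset (range start-offset max-offset step)
        :let [len (min step (- max-offset offset))]]
    {:topic topic :partition partition :offset offset :len len
     :max-offset (+ offset len) :producer producer}))

(calculate-work-units {:host "localhost" :port 9092} "ping" 0 25 0 10)
;; => three units covering offsets 0-9, 10-19 and 20-24
```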
(check-invalid-offsets! {:keys [group-name redis-conn stats-atom] :as state}
read-ahead-offsets
offsets
&
{:keys [redis-f saved-offset-f]
:or {redis-f redis-offset-save
saved-offset-f get-saved-offset}})
Check for https://github.com/gerritjvv/kafka-fast/issues/10: if a saved-offset > max-offset the redis offset is reset.

Optional args:
  redis-f (fn [redis-conn group-name topic partition offset]) used to save the offset; the default implementation saves to redis.
  saved-offset-f (fn [state max-kafka-offset topic partition]) gets the last saved offset; the default implementation is get-saved-offset.

stats-atom is an atom containing a map of useful stats.
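The invariant being enforced can be sketched as a pure function: a saved offset must never be beyond the partition's max offset. What the offset is reset to is an assumption here (clamping to the max kafka offset); the real code performs the reset through redis-f:

```clojure
(defn reset-invalid-offset
  "If saved-offset has run past max-offset (issue #10), reset it;
   clamping to max-offset is an illustrative assumption."
  [saved-offset max-offset]
  (if (> saved-offset max-offset)
    max-offset
    saved-offset))

(reset-invalid-offset 120 100) ;; => 100 (invalid, reset)
(reset-invalid-offset 80 100)  ;; => 80  (valid, unchanged)
```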
(check-offset-in-range state min-kafka-offset topic partition saved-offset)
Check that the saved-offset still exists on the brokers; if not, get the earliest offset and start from there.
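The check described above amounts to clamping against the earliest offset still retained on the broker. A minimal sketch (the real function also consults the broker metadata in state):

```clojure
(defn check-offset-in-range
  "If saved-offset fell below the earliest offset retained on the broker
   (the data was deleted by retention), start from the earliest offset instead."
  [min-kafka-offset saved-offset]
  (if (< saved-offset min-kafka-offset)
    min-kafka-offset
    saved-offset))

(check-offset-in-range 50 10) ;; => 50 (offset 10 no longer exists on the broker)
(check-offset-in-range 50 70) ;; => 70 (still in range)
```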
(close-organiser! {:keys [metadata-connector work-complete-processor-future
redis-conn shutdown-flag shutdown-confirm]}
&
{:keys [close-redis] :or {close-redis true}})
Closes the organiser passed in
(create-organiser! {:keys [bootstrap-brokers work-queue working-queue
complete-queue redis-conf redis-factory conf]
:or {redis-factory redis/create}
:as state})
Create an organiser state that should be passed to all the functions where state is required in this namespace.
(get-saved-offset {:keys [group-name redis-conn] :as state}
[min-kafka-offset max-kafka-offset]
topic
partition)
Returns the last saved offset for a topic/partition combination. If the partition is not found for whatever reason, the latest offset is taken from the kafka metadata; this value is saved to redis and then returned.
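The fallback logic reads naturally as "or": use the stored offset, else persist and return the latest kafka offset. A sketch with a plain map standing in for redis (the helper name and the save-fn parameter are hypothetical):

```clojure
(defn get-saved-offset*
  "Return the saved offset for [topic partition], or fall back to the latest
   kafka offset, persisting it via save-fn first. saved-offsets is a map
   standing in for the redis-backed store."
  [saved-offsets save-fn topic partition max-kafka-offset]
  (or (get saved-offsets [topic partition])
      (do (save-fn topic partition max-kafka-offset)
          max-kafka-offset)))

(get-saved-offset* {["ping" 0] 42} (fn [& _]) "ping" 0 100) ;; => 42
(get-saved-offset* {} (fn [& _]) "ping" 1 100)              ;; => 100
```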
(redis-offset-save redis-conn group-name topic partition offset)
(send-offsets-if-any! {:keys [group-name redis-conn work-queue consume-step
work-assigned-flag conf]
:as state}
broker
topic
offset-data)
Calculates the work-units for saved-offset -> offset and persists them to redis. Side effects: lpush work-units to work-queue; set offsets/$topic/$partition = max-offset of the work-units.
(start-work-complete-processor! state)
Creates an ExecutorService and starts the work-complete-loop running in a background thread. Returns the ExecutorService.
(wait-on-work-assigned-flag {:keys [work-assigned-flag]} timeout-ms)
(work-complete-handler! state {:keys [status] :as w-unit})
If the status of the w-unit is :ok the work-unit is checked for remaining work, otherwise it is completed; if :fail the work-unit is sent back to the work-queue. Must be run inside a redis connection, e.g. (car/wcar redis-conn ...).
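The dispatch described above, sketched as a pure function. The real handler performs the corresponding redis operations inside car/wcar; the :processed-offset key is an assumption used here only to model "remaining work":

```clojure
(defn work-complete-action
  "Decide what happens to a completed work-unit based on its status."
  [{:keys [status max-offset processed-offset]}]
  (cond
    ;; failed units go back onto the work queue
    (= status :fail) :resend-to-work-queue
    ;; :ok units with unconsumed offsets get the remainder re-queued
    (and processed-offset (< processed-offset max-offset)) :requeue-remaining
    ;; otherwise the unit is done
    :else :done))

(work-complete-action {:status :fail})
;; => :resend-to-work-queue
(work-complete-action {:status :ok :max-offset 100 :processed-offset 90})
;; => :requeue-remaining
(work-complete-action {:status :ok :max-offset 100 :processed-offset 100})
;; => :done
```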