kafka-clj.consumer.work-organiser

Kafka divides topics into partitions, and each partition runs from offset 0 to N. Standard clients assign one partition to one client, but this creates coordination and scalability issues. For example, if a client dies without removing its lock on a partition, a timeout has to elapse first. But suppose the client is merely slow and eventually recovers, still believing it holds the lock when the lock may already have timed out, and starts reading from the partition again.

Kafka-clj takes a different approach: it divides each partition into work units, each of length N, and adds each work unit to a redis list. Each client polls this redis list, takes a work unit, and then reads the data for that work unit from kafka. If a work unit fails and is put on the complete queue with a fail status, the work organiser will see this work unit and try to recalculate the metadata for it.

This namespace requires a running redis and kafka cluster.

USAGE
(use 'kafka-clj.consumer.work-organiser :reload)
(def org (create-organiser!
           {:bootstrap-brokers [{:host "localhost" :port 9092}]
            :redis-conf {:host "localhost" :max-active 5 :timeout 1000}
            :working-queue "working"
            :complete-queue "complete"
            :work-queue "work"
            :conf {}}))
(calculate-new-work org ["ping"])

(use 'kafka-clj.consumer.consumer :reload)
(def consumer (consumer-start
                {:redis-conf {:host "localhost" :max-active 5 :timeout 1000}
                 :working-queue "working"
                 :complete-queue "complete"
                 :work-queue "work"
                 :conf {}}))

(def res (do-work-unit! consumer (fn [state status resp-data] state)))

The basic data type is the work-unit. Its structure is:
{:topic topic :partition partition :offset start-offset :len l :max-offset (+ start-offset l) :producer {:host host :port port}}
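
The queue flow described above can be illustrated without redis or kafka. The following is only an in-memory sketch: it assumes atoms holding persistent queues in place of the redis lists, and take-work-unit!, process-work-unit! and read-f are hypothetical names that are not part of kafka-clj.

```clojure
;; In-memory stand-ins for the redis "work" and "complete" lists.
;; Assumption: the real library keeps these lists in redis, not in atoms.
(def work-queue (atom clojure.lang.PersistentQueue/EMPTY))
(def complete-queue (atom clojure.lang.PersistentQueue/EMPTY))

(defn take-work-unit!
  "Atomically removes and returns the next work unit, or nil if the queue is empty."
  []
  (let [[before _after] (swap-vals! work-queue pop)]
    (peek before)))

(defn process-work-unit!
  "Runs read-f (a hypothetical function that reads the unit's offsets) and
   records the unit on the complete queue with an :ok or :fail status."
  [read-f w-unit]
  (let [status (try
                 (read-f w-unit)
                 :ok
                 (catch Exception _ :fail))]
    (swap! complete-queue conj (assoc w-unit :status status))
    status))

;; Two work units for topic "ping", partition 0.
(swap! work-queue conj
       {:topic "ping" :partition 0 :offset 0 :len 10 :max-offset 10}
       {:topic "ping" :partition 0 :offset 10 :len 10 :max-offset 20})

;; The first unit is read successfully, the second read throws.
(process-work-unit! (fn [_] nil) (take-work-unit!))
(process-work-unit! (fn [_] (throw (ex-info "read failed" {}))) (take-work-unit!))
```

Because no client owns a partition, a slow or dead client only delays the single work unit it took; other clients keep taking units from the same list.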

_max-offset clj

(_max-offset [saved-offset max-offset] [saved-offset2 max-offset2 :as v])

Given two pairs, [saved-offset max-offset] and [saved-offset2 max-offset2], returns the maximum of the saved offsets and the maximum of the max offsets.

Returns [saved-offsetN max-offsetN]
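
As a rough illustration of the pairwise maximum described here, the sketch below uses a hypothetical max-offset-pair function. It assumes both pairs contain non-nil numbers; how the library's own function treats nil values is not stated in the docstring.

```clojure
(defn max-offset-pair
  "Hypothetical stand-in: element-wise max of two [saved-offset max-offset] pairs."
  [[saved-offset max-offset] [saved-offset2 max-offset2]]
  [(max saved-offset saved-offset2)
   (max max-offset max-offset2)])

;; The larger saved offset comes from the second pair,
;; the larger max offset from the first.
(max-offset-pair [10 100] [15 90])
```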

add-offsets! clj

(add-offsets! state topic min-max-kafka-offsets offset-datum)

calculate-new-work clj

(calculate-new-work {:keys [metadata-connector conf error-handler stats-atom]
                     :as state}
                    topics)

Accepts the state and returns it unchanged. For the given topics, new work is calculated from the metadata returned by the producers.

stats-atom is an atom containing a map of ad hoc stats that may be of interest to the user.

calculate-work-units clj

(calculate-work-units producer topic partition max-offset start-offset step)

Returns a sequence of work units of the form '({:topic :partition :offset :len :max-offset}). Len is exclusive.
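
One way to picture the splitting is the sketch below. It is not the library's implementation: work-units-sketch is a hypothetical name, the offsets are assumed to form a half-open range from start-offset up to (but not including) max-offset, and the library's exact boundary handling may differ.

```clojure
(defn work-units-sketch
  "Hypothetical sketch: splits [start-offset, max-offset) into units of at
   most `step` offsets. `step` must be a positive number."
  [producer topic partition max-offset start-offset step]
  (for [offset (range start-offset max-offset step)
        ;; the last unit may be shorter than step
        :let [len (min step (- max-offset offset))]]
    {:topic topic
     :partition partition
     :offset offset
     :len len
     :max-offset (+ offset len)
     :producer producer}))

;; 25 offsets in steps of 10 give units of length 10, 10 and 5.
(work-units-sketch {:host "localhost" :port 9092} "ping" 0 25 0 10)
```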

check-invalid-offsets! clj

(check-invalid-offsets! {:keys [group-name redis-conn stats-atom] :as state}
                        read-ahead-offsets
                        offsets
                        &
                        {:keys [redis-f saved-offset-f]
                         :or {redis-f redis-offset-save
                              saved-offset-f get-saved-offset}})

Checks for https://github.com/gerritjvv/kafka-fast/issues/10: if a saved-offset > max-offset, the redis offset is reset.

Optional args:
   redis-f (fn [redis-conn group-name topic partition offset]) used to save the offset; the default implementation saves to redis
   saved-offset-f (fn [state max-kafka-offset topic partition]) gets the last saved offset; the default implementation is get-saved-offset

stats-atom is an atom holding a map of useful stats.
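
The check can be pictured with the sketch below. The docstring does not say which value the offset is reset to, so resetting to max-offset is purely an assumption here, and reset-if-invalid! with its save-f callback is a hypothetical stand-in for the redis-f argument.

```clojure
(defn reset-if-invalid!
  "Hypothetical sketch. When saved-offset is beyond max-offset, calls
   (save-f topic partition max-offset) and returns max-offset (assumed reset
   target); otherwise returns saved-offset untouched."
  [save-f topic partition saved-offset max-offset]
  (if (> saved-offset max-offset)
    (do (save-f topic partition max-offset)
        max-offset)
    saved-offset))

;; Record the saves in an atom instead of writing to redis.
(def saved (atom []))
(defn record-save! [topic partition offset]
  (swap! saved conj [topic partition offset]))

(reset-if-invalid! record-save! "ping" 0 500 120) ; saved offset is ahead of the broker
(reset-if-invalid! record-save! "ping" 1 100 120) ; saved offset is valid, nothing saved
```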

check-offset-in-range clj

(check-offset-in-range state min-kafka-offset topic partition saved-offset)

Checks that the saved-offset still exists on the brokers; if not, gets the earliest offset and starts from there.
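
In its simplest form this is a lower-bound check. The sketch below assumes that "still exists" means the saved offset is not below the earliest offset held by the brokers; offset-in-range is a hypothetical name, and the real function also takes state, topic and partition.

```clojure
(defn offset-in-range
  "Hypothetical sketch: returns saved-offset if the brokers still hold it,
   otherwise the earliest available offset."
  [min-kafka-offset saved-offset]
  (if (< saved-offset min-kafka-offset)
    min-kafka-offset
    saved-offset))

(offset-in-range 1000 250)  ; offset 250 was already deleted, restart at 1000
(offset-in-range 1000 1500) ; offset 1500 still exists, keep it
```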

close-organiser! clj

(close-organiser! {:keys [metadata-connector work-complete-processor-future
                          redis-conn shutdown-flag shutdown-confirm]}
                  &
                  {:keys [close-redis] :or {close-redis true}})

Closes the organiser passed in

create-organiser! clj

(create-organiser! {:keys [bootstrap-brokers work-queue working-queue
                           complete-queue redis-conf redis-factory conf]
                    :or {redis-factory redis/create}
                    :as state})

Creates an organiser state that should be passed to every function in this namespace where state is required.

get-offset-from-meta clj


get-saved-offset clj

(get-saved-offset {:keys [group-name redis-conn] :as state}
                  [min-kafka-offset max-kafka-offset]
                  topic
                  partition)

Returns the last saved offset for a topic/partition combination. If the partition is not found for whatever reason, the latest offset is taken from the kafka metadata; this value is saved to redis and then returned.
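
The read-or-initialise behaviour can be sketched with an atom standing in for redis. The function name saved-offset-or-latest! and the [topic partition] map key are assumptions made for illustration, and the sketch ignores races between concurrent callers.

```clojure
(defn saved-offset-or-latest!
  "Hypothetical sketch: returns the stored offset for topic/partition, or
   stores and returns max-kafka-offset when nothing has been saved yet."
  [store topic partition max-kafka-offset]
  (let [k [topic partition]]
    (if-some [offset (get @store k)]
      offset
      (do (swap! store assoc k max-kafka-offset)
          max-kafka-offset))))

;; Partition 0 has a saved offset, partition 1 does not.
(def store (atom {["ping" 0] 42}))

(saved-offset-or-latest! store "ping" 0 900) ; returns the saved 42
(saved-offset-or-latest! store "ping" 1 900) ; saves and returns 900
```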

max-value clj

(max-value & vals)

Nil-safe max function.
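
Clojure's built-in max throws when given nil, which is what a nil-safe variant avoids. A minimal sketch follows, with the hypothetical name nil-safe-max; returning nil when every argument is nil is an assumption, since the docstring does not say what the library returns in that case.

```clojure
(defn nil-safe-max
  "Hypothetical sketch: max of the non-nil arguments, or nil if there are none."
  [& vals]
  (when-let [xs (seq (remove nil? vals))]
    (apply max xs)))

(nil-safe-max 1 nil 3) ; nil is skipped
(nil-safe-max nil nil) ; nothing left to compare
```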

redis-offset-save clj

(redis-offset-save redis-conn group-name topic partition offset)

send-offsets-if-any! clj

(send-offsets-if-any! {:keys [group-name redis-conn work-queue consume-step
                              work-assigned-flag conf]
                       :as state}
                      broker
                      topic
                      offset-data)

Calculates the work-units for saved-offset -> offset and persists them to redis.
Side effects: lpush work-units to work-queue
              set offsets/$topic/$partition = max-offset of work-units
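
The two side effects can be modelled in memory as follows. This is a sketch only: persist-work-units! is a hypothetical name, and the atom with its :work-queue and :offsets keys is an assumed stand-in for redis. Conjoining onto a list prepends each unit, which mirrors how lpush adds to the head of a redis list.

```clojure
(defn persist-work-units!
  "Hypothetical in-memory model of the side effects. `store` is an atom
   holding {:work-queue list, :offsets map}. Does nothing for empty input."
  [store topic partition work-units]
  (when (seq work-units)
    (swap! store
           (fn [s]
             (-> s
                 ;; prepend every unit, like lpush
                 (update :work-queue #(into (or % '()) work-units))
                 ;; remember how far work has been handed out
                 (assoc-in [:offsets (str "offsets/" topic "/" partition)]
                           (apply max (map :max-offset work-units))))))))

(def store (atom {}))

(persist-work-units! store "ping" 0
                     [{:topic "ping" :partition 0 :offset 0 :len 10 :max-offset 10}
                      {:topic "ping" :partition 0 :offset 10 :len 10 :max-offset 20}])
```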

start-work-complete-processor! clj

(start-work-complete-processor! state)

Creates an ExecutorService and starts the work-complete-loop running in a background thread. Returns the ExecutorService.
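
The general pattern of running a loop on an ExecutorService and handing the service back to the caller looks roughly like this. The sketch makes no claim about the library's actual loop: start-background-loop! and step-f are hypothetical, and the 10 ms pause between iterations is an arbitrary choice.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

(defn start-background-loop!
  "Hypothetical sketch: calls (step-f) repeatedly on a single background
   thread until the returned ExecutorService is shut down with shutdownNow."
  [step-f]
  (let [^ExecutorService exec (Executors/newSingleThreadExecutor)]
    (.execute exec
              (fn []
                (try
                  (while (not (.isInterrupted (Thread/currentThread)))
                    (step-f)
                    (Thread/sleep 10))
                  ;; shutdownNow interrupts the sleep; treat that as a normal stop
                  (catch InterruptedException _ nil))))
    exec))

;; Count loop iterations, then stop the loop.
(def counter (atom 0))
(def exec (start-background-loop! #(swap! counter inc)))
(Thread/sleep 200)
(.shutdownNow ^ExecutorService exec)
```

Returning the ExecutorService lets the caller decide when to stop the loop, which is presumably why close-organiser! receives work-complete-processor-future among its keys.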

wait-on-work-assigned-flag clj

(wait-on-work-assigned-flag {:keys [work-assigned-flag]} timeout-ms)

work-complete-handler! clj

(work-complete-handler! state {:keys [status] :as w-unit})

If the status of the w-unit is :ok, the work-unit is checked for remaining work; if none remains, it is completed. If the status is :fail, the work-unit is sent to the work-queue. Must be run inside a redis connection, e.g. car/wcar redis-conn.
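
The status dispatch can be sketched as below. Several details are assumptions: the docstring does not say how remaining work is detected or what happens to it, so the sketch takes a hypothetical remaining-f that returns a unit covering unread offsets (or nil) and re-queues that unit. handle-complete and requeue! are likewise illustrative names.

```clojure
(defn handle-complete
  "Hypothetical sketch. remaining-f returns a work unit for any unread
   offsets, or nil; requeue! pushes a unit back onto the work queue."
  [remaining-f requeue! {:keys [status] :as w-unit}]
  (case status
    :ok   (if-let [remaining (remaining-f w-unit)]
            (do (requeue! remaining) :requeued-remaining)
            :completed)
    :fail (do (requeue! (dissoc w-unit :status)) :requeued-failed)
    ;; any other status is left alone in this sketch
    :ignored))

;; Collect re-queued units in an atom instead of pushing to redis.
(def requeued (atom []))
(defn requeue! [w-unit] (swap! requeued conj w-unit))

(handle-complete (constantly nil) requeue! {:status :ok :offset 0 :len 10})
(handle-complete (constantly nil) requeue! {:status :fail :offset 10 :len 10})
```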
