

Internal consumer code. For the public consumer API, see kafka-clj.node



(auto-tune-fetch max-bytes-at state metadata-connector delegate-f wu)

Wraps the process-wu! function and uses the returned state to calculate what max-bytes should be in upcoming fetch requests.



(close-consumer! {:keys [publish-exec-service exec-service metadata-connector


(consume! {:keys [conf msg-ch work-unit-event-ch metadata-connector] :as state})

Starts the consumption process by launching redis-fetch-threads (default 1) + consumer-threads threads. One thread waits for work-units from redis, and the other threads process the work-units. The resp data from each work-unit's processing result is sent to msg-ch. Note that the send to msg-ch is a blocking send, so the whole process will block if msg-ch is full. The consume! function itself returns immediately.

reporting: if (get conf :consumer-reporting) is true, messages-consumed metrics are written to stdout every 10 seconds

:pool-limit 20 ; number of tcp pool connections to create

:jaas if set, jaas authentication is used with each tcp connection; this value should point to the jaas config file. for more information see
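
The blocking behaviour of the send to msg-ch can be illustrated without kafka-clj itself. The sketch below is not the library's code: it uses a bounded java.util.concurrent.ArrayBlockingQueue as a stand-in for msg-ch (an assumption made only to keep the example dependency-free), and the names msg-queue, accepted-when-full? and accepted-after-drain? are hypothetical.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue TimeUnit))

;; Hypothetical stand-in for msg-ch: a bounded queue holding at most 2 messages.
(def msg-queue (ArrayBlockingQueue. 2))

;; Two sends fit into the buffer without blocking.
(.put msg-queue :m1)
(.put msg-queue :m2)

;; The buffer is now full. A plain .put would block the sending thread until a
;; reader takes a message; a timed .offer is used here so the example terminates.
(def accepted-when-full?
  (.offer msg-queue :m3 50 TimeUnit/MILLISECONDS))

;; Once a reader drains one message, the send goes through again.
(.take msg-queue)
(def accepted-after-drain?
  (.offer msg-queue :m3 50 TimeUnit/MILLISECONDS))
```

In practice this means the reader of msg-ch sets the pace: if it stops taking messages, the consumer threads stall on their next send.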



(consumer-pool-stats {:keys [exec-service] :as conn})

Return a stats map for instances returned from the consume! function




(fetch-state delegate-f {:keys [topic partition max-offset] :as m})

Creates a mutable initial state



Simplifies the logic of processing a FetchError and normal Message instance from a broker fetch response



(-msg-event msg state)

Must return a FetchState; its status can be :ok or :error



(increase-dec v update-ts)

Increases the decrementing value by a quarter of its size, up to two megs. If the value is already TWO-MEGS, it is reset to 1KB.



(increase-inc v update-ts)

Increases the incrementing value by half its size, up to two megs. If the value is already TWO-MEGS, it is reset to 1KB.



(increase-val v decayer MAX RESET-VAL updates-ts)

Increases the value v by (/ v decayer), up to a max of two megs. If v is already MAX, or the value was last updated more than 60 seconds ago, RESET-VAL is returned to reset the value.
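
The growth-and-reset rule shared by increase-val, increase-inc and increase-dec can be sketched as a pure function. This is a minimal sketch based only on the docstrings, not the library's implementation: all names ending in -sketch, the stale? helper, and the explicit now-ms argument (used in place of reading the system clock, so the example is deterministic) are assumptions.

```clojure
(def TWO-MEGS (* 2 1024 1024))
(def ONE-KB 1024)

(defn stale?
  "Hypothetical helper: true if updated-ts is more than `seconds` before now-ms."
  [now-ms updated-ts seconds]
  (> (- now-ms updated-ts) (* seconds 1000)))

(defn increase-val-sketch
  "Grows v by v/decayer, capped at max-val. Returns reset-val when v has
   already reached max-val or the last update is older than 60 seconds."
  [v decayer max-val reset-val updated-ts now-ms]
  (if (or (>= v max-val) (stale? now-ms updated-ts 60))
    reset-val
    (min max-val (+ v (quot v decayer)))))

;; Incrementor: grows by half its size (decayer 2).
(defn increase-inc-sketch [v updated-ts now-ms]
  (increase-val-sketch v 2 TWO-MEGS ONE-KB updated-ts now-ms))

;; Decrementor: grows by a quarter of its size (decayer 4).
(defn increase-dec-sketch [v updated-ts now-ms]
  (increase-val-sketch v 4 TWO-MEGS ONE-KB updated-ts now-ms))
```

For instance, (increase-inc-sketch 1024 0 1000) yields 1536 and (increase-dec-sketch 1024 0 1000) yields 1280, while a value that is already TWO-MEGS, or one last updated over 60 seconds ago, falls back to 1024.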



(mark! meter)

Marks the messages-read meter



(mark-max-bytes! topic maxbts)


(mark-min-bytes! topic minbts)






(over-seconds-ago? time-ms seconds)


(process-wu! state metadata-connector delegate-f wu)

Borrow a connection
Write a fetch request
Process the fetch request sending all messages to delegate-f
Publish the wu as consumed to redis, or on error as error


(read-process-resp! delegate-f wu bts)

Reads the response from the TCP conn, then calls fetch/read-fetch and processes it with handle-msg
Returns [status offset discarded min max]
Throws: Exception, may block


(show-work-unit-thread-stats {:keys [work-unit-thread-stats]})

Public function.
Returns the work-unit-thread-stats, which map key=thread to value={:ts <the time the wu was seen> :duration <time it took for fetch> :wu <work-unit>}



Start the metrics reporting, writing to STDOUT every 10 seconds





(update-dec-rem [max-bts rem-inc rem-dec updated-ts])

Update the memory set containing [max-bts incrementor decrementor updated-ts], increasing the decrementor



(update-inc-rem [max-bts rem-inc rem-dec updated-ts])

Update the memory set containing [max-bts incrementor decrementor updated-ts] increasing the incrementor
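
How update-inc-rem and update-dec-rem might treat the four-element memory can be sketched with plain vector destructuring. This is an illustration under stated assumptions, not the library's code: the -sketch names and the grow helper are hypothetical, the reset-to-1KB rule is omitted for brevity, and updated-ts is passed through unchanged because the docstrings do not say whether it is refreshed.

```clojure
(def TWO-MEGS (* 2 1024 1024))

(defn grow
  "Hypothetical helper: grows v by v/divisor, capped at TWO-MEGS."
  [v divisor]
  (min TWO-MEGS (+ v (quot v divisor))))

(defn update-inc-rem-sketch
  "Returns the memory with only the incrementor grown (by half its size)."
  [[max-bts rem-inc rem-dec updated-ts]]
  [max-bts (grow rem-inc 2) rem-dec updated-ts])

(defn update-dec-rem-sketch
  "Returns the memory with only the decrementor grown (by a quarter of its size)."
  [[max-bts rem-inc rem-dec updated-ts]]
  [max-bts rem-inc (grow rem-dec 4) updated-ts])
```

Each function touches exactly one slot, so the incrementor and decrementor evolve independently of max-bts.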



(update-state-max-bytes state max-bytes)


(update-work-unit-thread-stats! work-unit-thread-stats start-ts end-ts wu)

Updates the map work-unit-thread-stats with key=<thread-name> value={:ts <timestamp> :wu <work-unit> :duration <ts-ms>}
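
The shape of the stats map described above can be sketched as follows. The real function may use a different mutable container; this sketch assumes an atom holding a plain map, assumes :ts is the start timestamp and :duration is end-ts minus start-ts, and uses the hypothetical names update-stats-sketch! and stats.

```clojure
(defn update-stats-sketch!
  "Records the latest work-unit handled by the current thread, keyed by thread name."
  [stats start-ts end-ts wu]
  (swap! stats assoc
         (.getName (Thread/currentThread))
         {:ts start-ts
          :duration (- end-ts start-ts)
          :wu wu}))

;; Hypothetical usage: one work-unit that took 250 ms to fetch.
(def stats (atom {}))
(update-stats-sketch! stats 1000 1250 {:topic "events" :partition 0})
```

Because the key is the thread name, each thread overwrites its own previous entry, so the map always shows the most recent work-unit per thread.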



(with-default f default-val)

Apply the function to its argument only if the argument is not nil, otherwise default-val is returned
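
A nil-guarding wrapper of this kind can be sketched in a few lines. The sketch assumes that with-default returns a one-argument function, which the signature (f default-val) suggests but the docstring does not state; with-default-sketch and safe-inc are hypothetical names.

```clojure
(defn with-default-sketch
  "Returns a function that applies f to its argument, or returns default-val
   when the argument is nil."
  [f default-val]
  (fn [x]
    (if (nil? x)
      default-val
      (f x))))

;; Hypothetical usage: increment a number, falling back to 0 for nil.
(def safe-inc (with-default-sketch inc 0))
```

With this definition, (safe-inc 41) applies inc, while (safe-inc nil) skips inc entirely and returns the default.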



(write-rcv-fetch-req! state
                      {:keys [partition topic offset] :as wu}
