kafka-clj.consumer.consumer

Internal consumer code. For the public consumer API, see kafka-clj.node

auto-tune-fetch clj

(auto-tune-fetch max-bytes-at state metadata-connector delegate-f wu)

Wraps the process-wu! function and uses the returned state to calculate what the max-bytes in upcoming fetch requests should be.

close-consumer! clj

(close-consumer! {:keys [publish-exec-service exec-service metadata-connector
                         shutdown-flag]})

consume! clj

(consume! {:keys [conf msg-ch work-unit-event-ch metadata-connector] :as state})

Starts the consumer consumption process by starting redis-fetch-threads (default 1) + consumer-threads threads. One thread is used to wait for work-units from redis, and the other threads are used to process the work-units. The response data from each work-unit's processing result is sent to msg-ch. Note that the send to msg-ch is a blocking send, meaning that the whole process will block if msg-ch is full. The consume! function itself returns immediately.

reporting: if (get conf :consumer-reporting) is true, then messages-consumed metrics are written to stdout every 10 seconds

:pool-limit 20 ; number of tcp pool connections to create

:jaas if set, JAAS authentication is used with each tcp connection. This value should point to the JAAS config file. For more information see http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/AcnOnly.html
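
As a rough illustration of the options listed above, a conf map might look like the following. This is only a sketch: it assumes all three keys live in the conf map passed to consume!, and the JAAS file path is a made-up placeholder.

```clojure
;; Hypothetical conf map using only the keys named in the docstring above.
(def conf
  {:consumer-reporting true                    ; write messages-consumed metrics to stdout every 10 seconds
   :pool-limit         20                      ; number of tcp pool connections to create
   :jaas               "/etc/kafka/jaas.conf"}) ; placeholder path to a JAAS config file

;; The reporting check described in the docstring.
(def reporting? (true? (get conf :consumer-reporting)))
```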

consumer-pool-stats clj

(consumer-pool-stats {:keys [exec-service] :as conn})

Return a stats map for instances returned from the consume! function

DEFAULT-MAX-BTS-REM clj


fetch-state clj

(fetch-state delegate-f {:keys [topic partition max-offset] :as m})

Creates a mutable initial state

IMsgEvent clj protocol

Simplifies the logic of processing a FetchError or a normal Message instance from a broker fetch response

-msg-event clj

(-msg-event msg state)

Must return a FetchState; its status can be :ok or :error
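
The dispatch idea behind the protocol can be sketched as follows. Everything here is hypothetical and only mirrors the shape described above: the record names, their fields, and the use of an immutable record for the state are assumptions, not the library's actual types.

```clojure
;; Hypothetical stand-ins for the library's own types.
(defrecord FetchStateSketch [status offset])
(defrecord MessageSketch [offset bts])
(defrecord FetchErrorSketch [error-code])

(defprotocol IMsgEventSketch
  (-msg-event-sketch [msg state]
    "Returns the updated state; its :status is :ok or :error."))

(extend-protocol IMsgEventSketch
  MessageSketch
  (-msg-event-sketch [msg state]
    ;; A normal message advances the offset and keeps the status :ok.
    (assoc state :status :ok :offset (:offset msg)))
  FetchErrorSketch
  (-msg-event-sketch [_ state]
    ;; A fetch error only flips the status; the offset is left untouched.
    (assoc state :status :error)))
```

With this shape, the code that walks a fetch response can call the same function on every item without checking its type first.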

increase-dec clj

(increase-dec v update-ts)

Increases the decrementing value by a quarter of its size, up to two megs. If the value is already TWO-MEGS, it is reset to 1KB

increase-inc clj

(increase-inc v update-ts)

Increases the incrementing value by half its size, up to two megs. If the value is already TWO-MEGS, it is reset to 1KB

increase-val clj

(increase-val v decayer MAX RESET-VAL updates-ts)

Increases the value v by (/ v decayer), up to a max of two megs. If v is already MAX, or the value was last updated more than 60 seconds ago, RESET-VAL is returned to reset the value
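
The growth-and-reset rule described for increase-val, increase-inc and increase-dec can be sketched roughly as below. This is not the library's source: the function names carry a -sketch suffix, the constant values (1024 and 2 * 1024 * 1024) are assumed from their names, integer division is used for the increment, and an explicit now-ms argument is added so the example is deterministic.

```clojure
;; Assumed values for the constants named in this namespace.
(def ONE-KB 1024)
(def TWO-MEGS (* 2 1024 1024))

(defn over-seconds-ago-sketch?
  "True when time-ms lies more than `seconds` seconds before now-ms."
  [now-ms time-ms seconds]
  (> (- now-ms time-ms) (* seconds 1000)))

(defn increase-val-sketch
  "Grows v by v/decayer, capped at max-val. Returns reset-val when v has
   already reached max-val or the last update is over 60 seconds old."
  [v decayer max-val reset-val updated-ts now-ms]
  (if (or (>= v max-val)
          (over-seconds-ago-sketch? now-ms updated-ts 60))
    reset-val
    (min max-val (+ v (quot v decayer)))))

;; "by half its size" corresponds to a decayer of 2.
(defn increase-inc-sketch [v updated-ts now-ms]
  (increase-val-sketch v 2 TWO-MEGS ONE-KB updated-ts now-ms))

;; "by a quarter of its size" corresponds to a decayer of 4.
(defn increase-dec-sketch [v updated-ts now-ms]
  (increase-val-sketch v 4 TWO-MEGS ONE-KB updated-ts now-ms))

(increase-inc-sketch 1024 0 1000) ; => 1536
(increase-dec-sketch 1024 0 1000) ; => 1280
```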

mark! clj

(mark! meter)

Marks the messages-read meter

mark-max-bytes! clj

(mark-max-bytes! topic maxbts)

mark-min-bytes! clj

(mark-min-bytes! topic minbts)

messages-read clj


metrics-registry clj


METRICS-REPORTING-STARTED clj


ONE-KB clj


over-seconds-ago? clj

(over-seconds-ago? time-ms seconds)

process-wu! clj

(process-wu! state metadata-connector delegate-f wu)

Borrow a connection
Write a fetch request
Process the fetch request, sending all messages to delegate-f
Publish the wu as consumed to redis, or on error as error
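
The four steps above can be sketched as a small function. This is an illustration only: the step functions (:borrow!, :return!, :fetch!, :publish!) are hypothetical and injected as arguments, the :status key on the work-unit is invented for the example, and returning the connection in a finally block is an assumption rather than documented behaviour.

```clojure
(defn process-wu-sketch
  "Runs borrow -> fetch -> delegate -> publish with injected step functions."
  [{:keys [borrow! return! fetch! publish!]} delegate-f wu]
  (let [conn (borrow!)]                        ; 1. borrow a connection
    (try
      (doseq [msg (fetch! conn wu)]            ; 2. write the fetch request and read the response
        (delegate-f msg))                      ; 3. send every message to delegate-f
      (publish! (assoc wu :status :ok))        ; 4. publish the wu as consumed
      (catch Exception _
        (publish! (assoc wu :status :error)))  ; 4'. on error, publish the wu as error
      (finally
        (return! conn)))))                     ; assumed: hand the connection back
```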

read-process-resp! clj

(read-process-resp! delegate-f wu bts)

Reads the response from the TCP conn, then calls fetch/read-fetch and processes it with handle-msg
Returns [status offset discarded min max]
Throws: Exception, may block

show-work-unit-thread-stats clj

(show-work-unit-thread-stats {:keys [work-unit-thread-stats]})

Public function.
Returns the work-unit-thread-stats, which show key=thread value={:ts <the time the wu was seen> :duration <time it took for fetch> :wu <work-unit>}

start-metrics-reporting! clj

(start-metrics-reporting!)

Start the metrics reporting, writing to STDOUT every 10 seconds

TWENTY-MEGS clj


TWO-MEGS clj


update-dec-rem clj

(update-dec-rem [max-bts rem-inc rem-dec updated-ts])

Update the memory set containing [max-bts incrementor decrementor updated-ts], increasing the decrementor

update-inc-rem clj

(update-inc-rem [max-bts rem-inc rem-dec updated-ts])

Update the memory set containing [max-bts incrementor decrementor updated-ts] increasing the incrementor

update-state-max-bytes clj

(update-state-max-bytes state max-bytes)

update-work-unit-thread-stats! clj

(update-work-unit-thread-stats! work-unit-thread-stats start-ts end-ts wu)

Updates the map work-unit-thread-stats with key=<thread-name> value={:ts <timestamp> :wu <work-unit> :duration <ts-ms>}
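
A minimal sketch of the per-thread stats update follows. It assumes the stats map is held in an atom, that :ts is the start timestamp, and that :duration is end-ts minus start-ts; the real function may use a different mutable map type or different semantics.

```clojure
(defn record-wu-stats-sketch!
  "Stores the latest work-unit timing under the current thread's name."
  [stats-atom start-ts end-ts wu]
  (swap! stats-atom assoc
         (.getName (Thread/currentThread))
         {:ts start-ts
          :duration (- end-ts start-ts)
          :wu wu}))

;; Usage: each consumer thread overwrites its own entry after every fetch.
(def stats (atom {}))
(record-wu-stats-sketch! stats 1000 1250 {:topic "t" :partition 0})
```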

with-default clj

(with-default f default-val)

Apply the function to its argument only if the argument is not nil, otherwise default-val is returned
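
One way to read this description is as a wrapper that returns a new one-argument function. The sketch below assumes exactly that shape; the name with-default-sketch is hypothetical and the real implementation may differ.

```clojure
(defn with-default-sketch
  "Returns a function that applies f to a non-nil argument,
   and returns default-val when the argument is nil."
  [f default-val]
  (fn [x]
    (if (nil? x)
      default-val
      (f x))))

((with-default-sketch inc 0) 1)   ; => 2
((with-default-sketch inc 0) nil) ; => 0
```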

write-rcv-fetch-req! clj

(write-rcv-fetch-req! state
                      delegate-f
                      {:keys [partition topic offset] :as wu}
                      conn)
