Internal consumer code, for consumer public api see kafka-clj.node
Internal consumer code, for consumer public api see kafka-clj.node
(auto-tune-fetch max-bytes-at state metadata-connector delegate-f wu)
Wraps around the process-wu! function and use the return state to calculate what the max-bytes in up comming fetch requests should be.
Wraps around the process-wu! function and use the return state to calculate what the max-bytes in up comming fetch requests should be.
(close-consumer! {:keys [publish-exec-service exec-service metadata-connector
shutdown-flag]})
(consume! {:keys [conf msg-ch work-unit-event-ch metadata-connector] :as state})
Starts the consumer consumption process, by initiating redis-fetch-threads(default 1)+consumer-threads threads, one thread is used to wait for work-units from redis, and the other threads are used to process the work-unit, the resp data from each work-unit's processing result is sent to the msg-ch, note that the send to msg-ch is a blocking send, meaning that the whole process will block if msg-ch is full The actual consume! function returns inmediately
reporting: if (get :consumer-reporting conf) is true then messages consumed metrics will be written every 10 seconds to stdout
:pool-limit 20 ; number of tcp pool connections to create
:jaas if set the jaas authentication will be used with each tcp connection this value should point to the jaas config file. for more information see http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/AcnOnly.html
Starts the consumer consumption process, by initiating redis-fetch-threads(default 1)+consumer-threads threads, one thread is used to wait for work-units from redis, and the other threads are used to process the work-unit, the resp data from each work-unit's processing result is sent to the msg-ch, note that the send to msg-ch is a blocking send, meaning that the whole process will block if msg-ch is full The actual consume! function returns inmediately reporting: if (get :consumer-reporting conf) is true then messages consumed metrics will be written every 10 seconds to stdout :pool-limit 20 ; number of tcp pool connections to create :jaas if set the jaas authentication will be used with each tcp connection this value should point to the jaas config file. for more information see http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/AcnOnly.html
(consumer-pool-stats {:keys [exec-service] :as conn})
Return a stats map for instances returned from the consume! function
Return a stats map for instances returned from the consume! function
(fetch-state delegate-f {:keys [topic partition max-offset] :as m})
Creates a mutable initial state
Creates a mutable initial state
Simplifies the logic of processing a FetchError and normal Message instance from a broker fetch response
Simplifies the logic of processing a FetchError and normal Message instance from a broker fetch response
(-msg-event msg state)
Must return FetchState status can be :ok or :error
Must return FetchState status can be :ok or :error
(increase-dec v update-ts)
Increase the decrementing value by a quarter of its size up to two megs, if the value is already TWO-MEGS long its reset to 1KB
Increase the decrementing value by a quarter of its size up to two megs, if the value is already TWO-MEGS long its reset to 1KB
(increase-inc v update-ts)
Increase the incrementing value by half its size up to two megs, if the value is already TWO-MEGS long its reset to 1KB
Increase the incrementing value by half its size up to two megs, if the value is already TWO-MEGS long its reset to 1KB
(increase-val v decayer MAX RESET-VAL updates-ts)
increase the value v by (/ v decayer) up to a max of two megs, if v is already MAX or the values has been updated more than 60 seconds ago RESET-VAL is returned to reset the value
increase the value v by (/ v decayer) up to a max of two megs, if v is already MAX or the values has been updated more than 60 seconds ago RESET-VAL is returned to reset the value
(mark-max-bytes! topic maxbts)
(mark-min-bytes! topic minbts)
(over-seconds-ago? time-ms seconds)
(process-wu! state metadata-connector delegate-f wu)
Borrow a connection Write a fetch request Process the fetch request sending all messages to delegate-f Publish the wu as consumed to redis, or on error as error
Borrow a connection Write a fetch request Process the fetch request sending all messages to delegate-f Publish the wu as consumed to redis, or on error as error
(read-process-resp! delegate-f wu bts)
Reads the response from the TCP conn, then calls fetch/read-fetch and process with handle-msg Returns [status offset discarded min max] Throws: Exception, may block
Reads the response from the TCP conn, then calls fetch/read-fetch and process with handle-msg Returns [status offset discarded min max] Throws: Exception, may block
(show-work-unit-thread-stats {:keys [work-unit-thread-stats]})
public function Return the work-unit-thread-stats that show key=thread value={:ts <the time the wu was seen> :duration <time it took for fetch> :wu <work-unit>}
public function Return the work-unit-thread-stats that show key=thread value={:ts <the time the wu was seen> :duration <time it took for fetch> :wu <work-unit>}
(start-metrics-reporting!)
Start the metrics reporting, writing to STDOUT every 10 seconds
Start the metrics reporting, writing to STDOUT every 10 seconds
(update-dec-rem [max-bts rem-inc rem-dec updated-ts])
Update the memory set containing [max-bts incrementor decrementor updated-ts], increasing the decrementor
Update the memory set containing [max-bts incrementor decrementor updated-ts], increasing the decrementor
(update-inc-rem [max-bts rem-inc rem-dec updated-ts])
Update the memory set containing [max-bts incrementor decrementor updated-ts] increasing the incrementor
Update the memory set containing [max-bts incrementor decrementor updated-ts] increasing the incrementor
(update-state-max-bytes state max-bytes)
(update-work-unit-thread-stats! work-unit-thread-stats start-ts end-ts wu)
Update the map work-unit-thread-stats with key=<thread-name> value={:ts <timestamp> wu: <work-unit> :duration <ts-ms>}
Update the map work-unit-thread-stats with key=<thread-name> value={:ts <timestamp> wu: <work-unit> :duration <ts-ms>}
(with-default f default-val)
Apply the function to its argument only if the argument is not nil, otherwise default-val is returned
Apply the function to its argument only if the argument is not nil, otherwise default-val is returned
(write-rcv-fetch-req! state
delegate-f
{:keys [partition topic offset] :as wu}
conn)
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close