Liking cljdoc? Tell your friends :D

kafka-clj.consumer.node


add-topics!clj

(add-topics! node topics)

Add topics to the node's topics-ref set, this will cause the orgnaniser run by the node to check for workunits for the topics topics must be a vector seq or set

Add topics to the node's topics-ref set, this will cause the orgnaniser run by the node to check for workunits for the topics
topics must be a vector seq or set
raw docstring

buffered-msgsclj

(buffered-msgs {:keys [msg-ch]} n timeout-ms)

Creates a channel that on each read returns n messages, or as much as could be buffered befire timeout-ms. The buffering happens in the background

Creates a channel that on each read returns n messages, or as much as could be buffered befire timeout-ms.
The buffering happens in the background
raw docstring

conn-pool-activeclj

(conn-pool-active {:keys [redis-conn]})

Return the number of active redis connections used by the consumer node

Return the number of active redis connections used by the consumer node
raw docstring

conn-pool-byte-sizeclj

(conn-pool-byte-size {:keys [redis-conn]})

Return the total bytes referenced by the redis connection pool

Return the total bytes referenced by the redis connection pool
raw docstring

conn-pool-idleclj

(conn-pool-idle {:keys [redis-conn]})

Return the number of idle redis connections used by the consumer node

Return the number of idle redis connections used by the consumer node
raw docstring

copy-redis-queueclj

(copy-redis-queue redis-conn from-queue to-queue)

This function copies data from one list/queue to another Its used on startup to copy any leftover workunits in the working queue to the work queue

This function copies data from one list/queue to another
Its used on startup to copy any leftover workunits in the working queue to the work queue
raw docstring

create-kafka-node-serviceclj

(create-kafka-node-service conf topics)

create-node!clj

(create-node! conf
              topics
              &
              {:keys [error-handler redis-factory msg-ch-buff-size
                      work-unit-event-ch-buff-size]
               :or {error-handler (fn [& args])
                    redis-factory redis/create
                    msg-ch-buff-size 100
                    work-unit-event-ch-buff-size 100}})

Create a consumer node that represents a consumer using an organiser consumer and group conn to coordinate colaborative consumption The following keys must exist in the configuration :redis-conf {:group-name :host } defaults group-name "default" host localhost :bootstrap-brokers e.g [{:host :port}]

Optional keys: :conf {:work-calculate-freq ;;the frequency in millis at which new work is calculated, default 10000ms :use-earliest ;;if the topic information is not saved to redis the earliest available offset is used and saved, ;;otherwise the most recent offset is used. :pool-limit 20 ;;the tcp pool limit for the consumer :max-offsets ;;default 10, if use-earliest is true the earliest offset is used looking back up to max-offsets :jaas if set the jaas authentication will be used with each tcp connection this value should point to the jaas config file. for more information see http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/AcnOnly.html

   }

Redis groups: Three redis groups are created, $group-name-"kafka-work-queue", $group-name-"kafka-working-queue", $group-name-"kafka-complete-queue", $group-name-"kafka-error-queue"

Note that unrecouverable work units like error-code 1 are added to the kafka-error-queue. This queue should be monitored. Returns a map {:conf intermediate-conf :topics-ref topics-ref :org org :msg-ch msg-ch :consumer consumer :calc-work-thread calc-work-thread :group-conn group-conn :group-name group-name}

Create a consumer node that represents a consumer using an organiser consumer and group conn to coordinate colaborative consumption
The following keys must exist in the configuration
:redis-conf {:group-name :host } defaults group-name "default" host localhost
:bootstrap-brokers e.g [{:host :port}]

Optional keys:
:conf {:work-calculate-freq     ;;the frequency in millis at which new work is calculated, default 10000ms
       :use-earliest ;;if the topic information is not saved to redis the earliest available offset is used and saved,
                     ;;otherwise the most recent offset is used.
       :pool-limit 20 ;;the tcp pool limit for the consumer
       :max-offsets ;;default 10, if use-earliest is true the earliest offset is used looking back up to max-offsets
       :jaas if set the jaas authentication will be used with each tcp connection
              this value should point to the jaas config file.
              for more information see http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/AcnOnly.html

       }


Redis groups:
 Three redis groups are created, $group-name-"kafka-work-queue", $group-name-"kafka-working-queue", $group-name-"kafka-complete-queue", $group-name-"kafka-error-queue"

Note that unrecouverable work units like error-code 1 are added to the kafka-error-queue. This queue should be monitored.
Returns a map {:conf intermediate-conf :topics-ref topics-ref :org org :msg-ch msg-ch :consumer consumer :calc-work-thread calc-work-thread :group-conn group-conn :group-name group-name}
raw docstring

msg-chan-byte-sizeclj

(msg-chan-byte-size {:keys [msg-ch]})

Return the number of bytes referenced by consumer channel

Return the number of bytes referenced by consumer channel
raw docstring

msg-seq!clj

(msg-seq! node)

msg-seq-buffered!clj

(msg-seq-buffered! node & {:keys [step] :or {step 1000}})

Will always return a sequence of sequence of messages i.e [ [msg1, msg2, msg3] .. ] Acceps :step n which is the number of messages per sequence inside the main sequence

Will always return a sequence of sequence of messages i.e [ [msg1, msg2, msg3] .. ]
Acceps :step n which is the number of messages per sequence inside the main sequence
raw docstring

node-statsclj

(node-stats {:keys [consumer stats-atom]})

Returns the consumer stats, takes as argument the instance returned from create-node!

Returns the consumer stats, takes as argument the instance returned from create-node!
raw docstring

read-msg!clj

(read-msg! {:keys [msg-ch]})
(read-msg! {:keys [msg-ch]} timeout-ms)

Accepts a the return value of create-node! and blocks on msg-ch The return value is a collection of Message [topic partition offset bts]

Accepts a the return value of create-node! and blocks on msg-ch
The return value is a collection of Message [topic partition offset bts]
raw docstring

remove-topics!clj

(remove-topics! node topics)

Removes topics from the node's topics-ref set, this will cause the organiser to stop submitting workunits for the topics topics must be a vector seq or set

Removes topics from the node's topics-ref set, this will cause the organiser to stop submitting workunits for the topics
topics must be a vector seq or set
raw docstring

shutdown-node!clj

(shutdown-node! {:keys [org consumer msg-ch calc-work-thread redis-conn]
                 :as node})

Closes the consumer node

Closes the consumer node
raw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close