(add-topics! node topics)
Add topics to the node's topics-ref set, this will cause the orgnaniser run by the node to check for workunits for the topics topics must be a vector seq or set
Add topics to the node's topics-ref set, this will cause the orgnaniser run by the node to check for workunits for the topics topics must be a vector seq or set
(buffered-msgs {:keys [msg-ch]} n timeout-ms)
Creates a channel that on each read returns n messages, or as much as could be buffered befire timeout-ms. The buffering happens in the background
Creates a channel that on each read returns n messages, or as much as could be buffered befire timeout-ms. The buffering happens in the background
(conn-pool-active {:keys [redis-conn]})
Return the number of active redis connections used by the consumer node
Return the number of active redis connections used by the consumer node
(conn-pool-byte-size {:keys [redis-conn]})
Return the total bytes referenced by the redis connection pool
Return the total bytes referenced by the redis connection pool
(conn-pool-idle {:keys [redis-conn]})
Return the number of idle redis connections used by the consumer node
Return the number of idle redis connections used by the consumer node
(copy-redis-queue redis-conn from-queue to-queue)
This function copies data from one list/queue to another Its used on startup to copy any leftover workunits in the working queue to the work queue
This function copies data from one list/queue to another Its used on startup to copy any leftover workunits in the working queue to the work queue
(create-kafka-node-service conf topics)
(create-node! conf
topics
&
{:keys [error-handler redis-factory msg-ch-buff-size
work-unit-event-ch-buff-size]
:or {error-handler (fn [& args])
redis-factory redis/create
msg-ch-buff-size 100
work-unit-event-ch-buff-size 100}})
Create a consumer node that represents a consumer using an organiser consumer and group conn to coordinate colaborative consumption The following keys must exist in the configuration :redis-conf {:group-name :host } defaults group-name "default" host localhost :bootstrap-brokers e.g [{:host :port}]
Optional keys: :conf {:work-calculate-freq ;;the frequency in millis at which new work is calculated, default 10000ms :use-earliest ;;if the topic information is not saved to redis the earliest available offset is used and saved, ;;otherwise the most recent offset is used. :pool-limit 20 ;;the tcp pool limit for the consumer :max-offsets ;;default 10, if use-earliest is true the earliest offset is used looking back up to max-offsets :jaas if set the jaas authentication will be used with each tcp connection this value should point to the jaas config file. for more information see http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/AcnOnly.html
}
Redis groups: Three redis groups are created, $group-name-"kafka-work-queue", $group-name-"kafka-working-queue", $group-name-"kafka-complete-queue", $group-name-"kafka-error-queue"
Note that unrecouverable work units like error-code 1 are added to the kafka-error-queue. This queue should be monitored. Returns a map {:conf intermediate-conf :topics-ref topics-ref :org org :msg-ch msg-ch :consumer consumer :calc-work-thread calc-work-thread :group-conn group-conn :group-name group-name}
Create a consumer node that represents a consumer using an organiser consumer and group conn to coordinate colaborative consumption The following keys must exist in the configuration :redis-conf {:group-name :host } defaults group-name "default" host localhost :bootstrap-brokers e.g [{:host :port}] Optional keys: :conf {:work-calculate-freq ;;the frequency in millis at which new work is calculated, default 10000ms :use-earliest ;;if the topic information is not saved to redis the earliest available offset is used and saved, ;;otherwise the most recent offset is used. :pool-limit 20 ;;the tcp pool limit for the consumer :max-offsets ;;default 10, if use-earliest is true the earliest offset is used looking back up to max-offsets :jaas if set the jaas authentication will be used with each tcp connection this value should point to the jaas config file. for more information see http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/AcnOnly.html } Redis groups: Three redis groups are created, $group-name-"kafka-work-queue", $group-name-"kafka-working-queue", $group-name-"kafka-complete-queue", $group-name-"kafka-error-queue" Note that unrecouverable work units like error-code 1 are added to the kafka-error-queue. This queue should be monitored. Returns a map {:conf intermediate-conf :topics-ref topics-ref :org org :msg-ch msg-ch :consumer consumer :calc-work-thread calc-work-thread :group-conn group-conn :group-name group-name}
(msg-chan-byte-size {:keys [msg-ch]})
Return the number of bytes referenced by consumer channel
Return the number of bytes referenced by consumer channel
(msg-seq! node)
(msg-seq-buffered! node & {:keys [step] :or {step 1000}})
Will always return a sequence of sequence of messages i.e [ [msg1, msg2, msg3] .. ] Acceps :step n which is the number of messages per sequence inside the main sequence
Will always return a sequence of sequence of messages i.e [ [msg1, msg2, msg3] .. ] Acceps :step n which is the number of messages per sequence inside the main sequence
(node-stats {:keys [consumer stats-atom]})
Returns the consumer stats, takes as argument the instance returned from create-node!
Returns the consumer stats, takes as argument the instance returned from create-node!
(read-msg! {:keys [msg-ch]})
(read-msg! {:keys [msg-ch]} timeout-ms)
Accepts a the return value of create-node! and blocks on msg-ch The return value is a collection of Message [topic partition offset bts]
Accepts a the return value of create-node! and blocks on msg-ch The return value is a collection of Message [topic partition offset bts]
(remove-topics! node topics)
Removes topics from the node's topics-ref set, this will cause the organiser to stop submitting workunits for the topics topics must be a vector seq or set
Removes topics from the node's topics-ref set, this will cause the organiser to stop submitting workunits for the topics topics must be a vector seq or set
(shutdown-node! {:keys [org consumer msg-ch calc-work-thread redis-conn]
:as node})
Closes the consumer node
Closes the consumer node
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close