App produce command
Internal consumer code; for the consumer public API see kafka-clj.node
Kafka divides topics into partitions, and each partition runs from offset 0 to N. Standard clients assign one partition to one client, but this creates coordination and scalability issues. E.g. if a client died and did not remove its lock for a partition, a certain timeout needs to pass; but say the client is just slow and will recover eventually, still thinking it holds the lock on the partition, when in fact it has timed out and another client has started reading from this partition.
Kafka-clj takes a different approach: it divides the partitions into work units, each of length N, then adds each work unit to a redis list. Each client polls this redis list, takes a work unit, then reads the data for that work unit from kafka. If a work unit fails and is put on the complete queue with a fail status, the work organiser will see this work unit and try to recalculate its metadata.
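The splitting step above can be sketched as a pure function; `work-units` is a hypothetical helper written for this illustration (not kafka-clj's actual implementation), but the map it emits matches the work-unit structure documented below.

```clojure
;; Sketch only: split a partition's offset range into fixed-length work units.
;; `work-units` is a hypothetical helper; the returned map shape is the
;; work-unit structure used throughout this namespace.
(defn work-units
  "Split offsets [start, max-offset) of topic/partition into units of length n."
  [topic partition start max-offset n {:keys [host port]}]
  (for [offset (range start max-offset n)
        :let [len (min n (- max-offset offset))]]
    {:topic topic :partition partition
     :offset offset :len len :max-offset (+ offset len)
     :producer {:host host :port port}}))

(work-units "ping" 0 0 2500 1000 {:host "localhost" :port 9092})
;; => three work units starting at offsets 0, 1000 and 2000,
;;    with lengths 1000, 1000 and 500
```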
This namespace requires a running redis and kafka cluster.

USAGE

(use 'kafka-clj.consumer.work-organiser :reload)

(def org (create-organiser!
           {:bootstrap-brokers [{:host "localhost" :port 9092}]
            :redis-conf {:host "localhost" :max-active 5 :timeout 1000}
            :working-queue "working" :complete-queue "complete" :work-queue "work"
            :conf {}}))

(calculate-new-work org ["ping"])
(use 'kafka-clj.consumer.consumer :reload)

(def consumer (consumer-start
                {:redis-conf {:host "localhost" :max-active 5 :timeout 1000}
                 :working-queue "working" :complete-queue "complete" :work-queue "work"
                 :conf {}}))
(def res (do-work-unit! consumer (fn [state status resp-data] state)))
The basic data type is the work-unit structure: {:topic topic :partition partition :offset start-offset :len l :max-offset (+ start-offset l) :producer {:host host :port port}}
Internal consumer helper API for receiving and publishing work units to redis. The public functions are get-work-unit!, publish-consumed-wu!, publish-error-wu! and publisher-zero-consumed-wu!
Raw API for sending and reading kafka fetch-offset and fetch-messages requests; reading the fetch message is done by the Java class kafka_clj.util.Fetch
USAGE: Use for JAAS Kerberos Plain Text
(def tcp-client (... create tcp client ...))
(def c (jaas/jaas-login "KafkaClient"))
(def sasl-client (jaas/sasl-client c (jaas/principal-name c) broker-host))
(jaas/sasl-handshake! tcp-client sasl-client timeout-ms)
JVM system properties must be set; see this project's project.clj. Required properties:
-Djava.security.auth.login.config=/vagrant/vagrant/config/kafka_client_jaas.conf -Djava.security.krb5.conf=/vagrant/vagrant/config/krb5.conf
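The same two properties can also be set programmatically, provided this happens before the first JAAS login reads them; a minimal sketch (paths are the illustrative ones from above):

```clojure
;; Set the JAAS and Kerberos config locations from code instead of -D flags.
;; Must run before any jaas-login call; paths are illustrative.
(System/setProperty "java.security.auth.login.config"
                    "/vagrant/vagrant/config/kafka_client_jaas.conf")
(System/setProperty "java.security.krb5.conf"
                    "/vagrant/vagrant/config/krb5.conf")
```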
All functions are concerned with retrieving metadata and offsets from kafka brokers. Part of this task requires blacklisting brokers that do not respond, and retrying on connection errors.
Public API for the Pool and Keyed Pools
All pools return objects of type PoolObj; to get the value use pool-obj-val
Object pool using Semaphore and Clojure's atom
Use create-atom-obj-pool to create an atom object pool. These functions are made to be used primarily from the kafka-clj.pool.keyed namespace, and kafka-clj.pool.api is used to work with the keyed pool
Functions and records for a KeyedPool. A keyed pool contains one IObjPool instance per key. Example usage: a tcp connection pool per server, where the key is the server host and port.
Use the create-atom-keyed-obj-pool function to create the pool, then use the kafka-clj.pool.api functions to poll, return and close the pool
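The core idea of a keyed pool (one pool per key, created lazily inside an atom) can be sketched as follows; `pool-for-key` and `create-fn` are hypothetical names written for this illustration, not kafka-clj's actual API.

```clojure
;; Conceptual sketch of a keyed pool: an atom maps each key to its own pool.
;; This is an illustration only, not kafka-clj's implementation.
(defn pool-for-key
  "Return the pool stored under key k, creating it with (create-fn k) on first use."
  [pools-atom create-fn k]
  (or (get @pools-atom k)
      (get (swap! pools-atom
                  (fn [m] (if (contains? m k) m (assoc m k (create-fn k)))))
           k)))

(def pools (atom {}))

;; e.g. one tcp connection pool per [host port] key
(pool-for-key pools
              (fn [[host port]] {:host host :port port :conns []})
              ["localhost" 9092])
```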
Pool utility functions and data types
Simple direct TCP client for the producer. The producers sit behind an async buffer onto which data is pushed by multiple threads; the TCP sending itself does not need yet another layer of indirection, which at the current moment, under high loads with huge messages, can cause out-of-memory errors.
Usage

(def client (tcp/tcp-client "localhost" 7002))
(tcp/write! client "one two three" :flush true)
(tcp/read-async-loop! client (fn [^bytes bts] (prn (String. bts))))
(tcp/close! client)
Provides a pooled connection via tcp-pool; the config options supported are those of GenericKeyedObjectPoolConfig
Remove circular dependencies from jaas to tcp
Create topics in kafka with auto create
----------- CREATE TOPICS ----------
CreateTopics Request (Version: 0) => [create_topic_requests] timeout
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas]
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value
      config_key => STRING
      config_value => STRING
  timeout => INT32
CreateTopics Response (Version: 0) => [topic_error_codes]
  topic_error_codes => topic error_code
    topic => STRING
    error_code => INT16
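The request-body layout above can be sketched with java.nio.ByteBuffer; `create-topics-v0-body` and `put-str` are helpers written for this illustration (not kafka-clj's encoder), and the sketch covers only the body for one topic with empty replica_assignment and configs arrays — the Kafka request header (api key, version, correlation id, client id) would precede it on the wire.

```clojure
;; Sketch only: encode a CreateTopics v0 request body per the schema above.
(import '[java.nio ByteBuffer]
        '[java.nio.charset StandardCharsets])

(defn put-str ^ByteBuffer [^ByteBuffer b ^String s]
  ;; Kafka STRING = INT16 length followed by UTF-8 bytes
  (let [bts (.getBytes s StandardCharsets/UTF_8)]
    (-> b (.putShort (short (count bts))) (.put bts))))

(defn create-topics-v0-body
  "Encode one topic with no replica_assignment and no configs."
  ^bytes [^String topic num-partitions replication-factor timeout-ms]
  (let [b (ByteBuffer/allocate 256)]
    (.putInt b 1)                            ;; [create_topic_requests] array size
    (put-str b topic)                        ;; topic => STRING
    (.putInt b (int num-partitions))         ;; num_partitions => INT32
    (.putShort b (short replication-factor)) ;; replication_factor => INT16
    (.putInt b 0)                            ;; [replica_assignment] empty array
    (.putInt b 0)                            ;; [configs] empty array
    (.putInt b (int timeout-ms))             ;; timeout => INT32
    (let [out (byte-array (.position b))]
      (.flip b)
      (.get b out)
      out)))

(count (create-topics-v0-body "ping" 2 1 10000))
;; => 28 bytes: 4 + (2 + 4) + 4 + 2 + 4 + 4 + 4
```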