Liking cljdoc? Tell your friends :D

kafka-clj.consumer.work-organiser

Kafka divides topics into partitions and each partition starts at offset 0 till N. Standard clients take one partition to one client, but this creates coordination and scalalbility issues. E.g if a client died and did not remove its lock for a partition, a certian timeout needs to happen, but lets say the client is just slow and will recover eventually still thinking that it has a lock on the partition, when it might have timedout and start reading from this partition.

Kafka-clj takes a different approach, it divides the partitions into work units each of length N. Then adds each work unit to a redis list. Each client will poll this redis list and take a work unit, then read the data for that work unit from kafka. If a work unit fails and put on the complete queue with a fail status, the work organiser will see this work unit and try to recalculate the metadata for it.

This namespace requires a running redis and kafka cluster USAGE (use 'kafka-clj.consumer.work-organiser :reload) (def org (create-organiser! {:bootstrap-brokers [{:host "localhost" :port 9092}] :redis-conf {:host "localhost" :max-active 5 :timeout 1000} :working-queue "working" :complete-queue "complete" :work-queue "work" :conf {}})) (calculate-new-work org ["ping"])

(use 'kafka-clj.consumer.consumer :reload) (def consumer (consumer-start {:redis-conf {:host "localhost" :max-active 5 :timeout 1000} :working-queue "working" :complete-queue "complete" :work-queue "work" :conf {}}))

(def res (do-work-unit! consumer (fn [state status resp-data] state)))

Basic data type is the work-unit structure is {:topic topic :partition partition :offset start-offset :len l :max-offset (+ start-offset l) :producer {:host host :port port}}

Kafka divides topics into partitions and each partition starts at offset 0 till N.
Standard clients take one partition to one client, but this creates coordination and scalalbility issues.
E.g if a client died and did not remove its lock for a partition, a certian timeout needs to happen, but
lets say the client is just slow and will recover eventually still thinking that it has a lock on the partition, when
it might have timedout and start reading from this partition.

Kafka-clj takes a different approach, it divides the partitions into work units each of length N.
Then adds each work unit to a redis list.
Each client will poll this redis list and take a work unit, then read the data for that work unit from kafka.
If a work unit fails and put on the complete queue with a fail status, the work organiser will see this work unit
and try to recalculate the metadata for it.

This namespace requires a running redis and kafka cluster
 USAGE
(use 'kafka-clj.consumer.work-organiser :reload)
(def org (create-organiser!
{:bootstrap-brokers [{:host "localhost" :port 9092}]
 :redis-conf {:host "localhost" :max-active 5 :timeout 1000} :working-queue "working" :complete-queue "complete" :work-queue "work" :conf {}}))
  (calculate-new-work org ["ping"])


(use 'kafka-clj.consumer.consumer :reload)
(def consumer (consumer-start {:redis-conf {:host "localhost" :max-active 5 :timeout 1000} :working-queue "working" :complete-queue "complete" :work-queue "work" :conf {}}))

(def res (do-work-unit! consumer (fn [state status resp-data] state)))

 Basic data type is the work-unit
 structure is {:topic topic :partition partition :offset start-offset :len l :max-offset (+ start-offset l) :producer {:host host :port port}}

raw docstring

kafka-clj.consumer.workunits

Internal consumer helper api for receiving and publishing work units to redis The public functions are get-work-unit!, publish-consumed-wu!, publish-error-wu! and publisher-zero-consumed-wu!

Internal consumer helper api for receiving and publishing work units to redis
The public functions are get-work-unit!, publish-consumed-wu!, publish-error-wu! and publisher-zero-consumed-wu!
raw docstring

kafka-clj.fetch

Raw api for sending and reading kafka fetch offset and fetch messages requests, reading the fetch message is done by the java class kafka_clj.util.Fetch

Raw api for sending and reading kafka fetch offset and fetch messages requests,
reading the fetch message is done by the java class  kafka_clj.util.Fetch
raw docstring

kafka-clj.jaas

USAGE: Use for JAAS Kerberos Plain Text

(def tcp-client (... create tcp client ...) (def c (jaas/jaas-login "KafkaClient")) (def sasl-client (jaas/sasl-client c (jaas/principal-name c) broker-host)) (jaas/sasl-handshake! tcp-client sasl-client timeout-ms)

System environment config must be set, see the project.clj file for this project. Properties required are:

-Djava.security.auth.login.config=/vagrant/vagrant/config/kafka_client_jaas.conf -Djava.security.krb5.conf=/vagrant/vagrant/config/krb5.conf

USAGE:
Use for JAAS Kerberos Plain Text


(def tcp-client (... create tcp client ...)
(def c (jaas/jaas-login "KafkaClient"))
(def sasl-client (jaas/sasl-client c (jaas/principal-name c) broker-host))
(jaas/sasl-handshake! tcp-client sasl-client timeout-ms)

System environment config must be set, see the project.clj file for this project.
Properties required are:

-Djava.security.auth.login.config=/vagrant/vagrant/config/kafka_client_jaas.conf
-Djava.security.krb5.conf=/vagrant/vagrant/config/krb5.conf
raw docstring

kafka-clj.metadata

All functions are concerned with retreiving metadata and offsets from kafka brokers part of this task requires blacklisting of brokers that do not response and retrying on connection errors

All functions are concerned with retreiving metadata and offsets from kafka brokers
part of this task requires blacklisting of brokers that do not response and retrying on connection errors
raw docstring

kafka-clj.pool.api

Public API for the Pool and Keyed Pools

All pools return objects of type PoolObj to get the value use pool-obj-val

Public API for the Pool and Keyed Pools

All pools return objects of type PoolObj to get the value use pool-obj-val
raw docstring

kafka-clj.pool.atom

Object pool using Sempahore and Clojure's atom

Use create-atom-obj-pool to create an atom object pool These functions as made to be used primarily from the kafka-clj.pool.keyed namespace and the kafka-clj.pool.api is used to work with the keyed pool

Object pool using Sempahore and Clojure's atom

Use create-atom-obj-pool to create an atom object pool
These functions as made to be used primarily from the kafka-clj.pool.keyed namespace
and the kafka-clj.pool.api is used to work with the keyed pool
raw docstring

kafka-clj.pool.keyed

Functions and records for a KeyedPool. A keyed pool contains one IObjPool instance per key usage: tcp connection pool per server where the key is the server host and port

Use the create-atom-keyed-obj-pool function to create the pool then use the kafka-clj.pool.api functions to poll, return and close the pool

Functions and records for a KeyedPool.
A keyed pool contains one IObjPool instance per key
   usage: tcp connection pool per server where the key is the server host and port

Use the create-atom-keyed-obj-pool function to create the pool
then use the kafka-clj.pool.api functions to poll, return and close the pool
raw docstring

kafka-clj.tcp

Simple Direct TCP Client for the producer The producers sit behind an async buffer where data is pushed on by multiple threads, the TCP sending itself does not need yet another layer or indirection which at the current moment under high loads with huge messages can cause out of memory errors

Usage (def client (tcp/tcp-client "localhost" 7002)) (tcp/write! client "one two three" :flush true) (tcp/read-async-loop! client (fn [^bytes bts] (prn (String. bts)))) (tcp/close! client)

Provides a pooled connection via tcp-pool the config options supported are from GenericKeyedObjectPoolConfig

Simple Direct TCP Client for the producer
The producers sit behind an async buffer where data is pushed on
by multiple threads, the TCP sending itself does not need yet another layer or indirection
which at the current moment under high loads with huge messages can cause out of memory errors

Usage
(def client (tcp/tcp-client "localhost" 7002))
(tcp/write! client "one two three" :flush true)
(tcp/read-async-loop! client (fn [^bytes bts] (prn (String. bts))))
(tcp/close! client)

 Provides a pooled connection via tcp-pool the config options supported are from GenericKeyedObjectPoolConfig

raw docstring

kafka-clj.tcp-api

remove secular dependancies from jaas to tcp

remove secular dependancies from jaas to tcp
raw docstring

kafka-clj.topics

Create topics in kafka with auto create

----------- CREATE TOPICS ----------

CreateTopics Request (Version: 0) => [create_topic_requests] timeout create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs] topic => STRING num_partitions => INT32 replication_factor => INT16 replica_assignment => partition_id [replicas] partition_id => INT32 replicas => INT32 configs => config_key config_value config_key => STRING config_value => STRING timeout => INT32

CreateTopics Response (Version: 0) => [topic_error_codes] topic_error_codes => topic error_code topic => STRING error_code => INT16

Create topics in kafka with auto create


----------- CREATE TOPICS ----------

CreateTopics Request (Version: 0) => [create_topic_requests] timeout
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas]
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value
      config_key => STRING
      config_value => STRING
    timeout => INT32

  CreateTopics Response (Version: 0) => [topic_error_codes]
    topic_error_codes => topic error_code
      topic => STRING
      error_code => INT16

raw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close