
kinsky.client

Small Clojure shim on top of the Kafka client API.
See https://github.com/pyr/kinsky for example usage.

->deserializer

(->deserializer x)

->header

(->header [k v])
(->header k v)

Build a Kafka header from a key and a value.

->headers

(->headers headers)

Build Kafka headers from a clojure map.

->offset-metadata

(->offset-metadata {:keys [offset metadata]})

Yield an OffsetAndMetadata from a clojure map.
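
For instance (values are placeholders):

```
;; Sketch: offset 42 with a free-form metadata string.
(->offset-metadata {:offset 42 :metadata "processed by worker-1"})
```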

->record

(->record payload)

Build a producer record from a clojure map. Leave ProducerRecord instances
untouched.
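
A minimal sketch; the keys follow the map format documented under `send!`:

```
;; Sketch: a plain map turned into a ProducerRecord.
(->record {:topic "account"
           :key   "account-a"
           :value "some payload"})
```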

->serializer

(->serializer x)

->topic-partition

(->topic-partition {:keys [topic partition]})

Yield a TopicPartition from a clojure map.
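
For example:

```
(->topic-partition {:topic "account" :partition 0})
```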

->topics

(->topics topics)

Yield a valid object for subscription.

consumer

(consumer config)
(consumer config callback)
(consumer config kdeserializer vdeserializer)
(consumer config callback kdeserializer vdeserializer)

Create a consumer from a configuration and optional deserializers.
If a callback is given, call it when stopping the consumer.
If deserializers are provided, use them; otherwise expect the deserializer
class names in the config map.
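
A sketch of building a consumer with explicit deserializers; the broker address and group id are placeholders:

```
(def c
  (consumer {:bootstrap.servers "localhost:9092"
             :group.id          "example-group"}
            (keyword-deserializer)
            (edn-deserializer)))
```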

consumer->driver

(consumer->driver consumer)
(consumer->driver consumer
                  {:kinsky.client/keys [run-fn consumer-decoder-fn]
                   :or {consumer-decoder-fn consumer-records->data}})

Given a consumer and an optional callback to call when stopping,
yield a consumer driver.

The consumer driver implements the following protocols:

- [ConsumerDriver](#var-ConsumerDriver)
- [MetadataDriver](#var-MetadataDriver)
- `clojure.lang.IDeref`: `deref` to access underlying
  [KafkaConsumer](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
  instance.

The consumer driver can also take options:

- `consumer-decoder-fn`: a function that will potentially transform
  the ConsumerRecords returned by Kafka. By default it will use
  `consumer-records->data`.
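
A sketch of the two-argument form, assuming `raw-consumer` is an already constructed KafkaConsumer instance and that any function over ConsumerRecords can serve as the decoder:

```
;; Sketch: wrap a raw KafkaConsumer, swapping in crs->eduction as the
;; decoder instead of the default consumer-records->data.
(consumer->driver raw-consumer
                  {:kinsky.client/consumer-decoder-fn crs->eduction})
```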

consumer-records->data

(consumer-records->data crs)

Yield the default Clojure representation of a ConsumerRecords instance.

ConsumerDriver (protocol)

Driver interface for consumers.

subscribe!

(subscribe! this topics)
(subscribe! this topics listener)

Subscribe to a topic or list of topics.
The topics argument can be:

- A simple string when subscribing to a single topic
- A regex pattern to subscribe to matching topics
- A sequence of strings

The optional listener argument is either a callback
function or an implementation of
[ConsumerRebalanceListener](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html).

When a function is supplied, it will be called on rebalance
events with a map representing the event, see
[kinsky.client/rebalance-listener](#var-rebalance-listener)
for details on the map format.
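
Two short sketches of subscribing on a consumer driver `c`:

```
;; Subscribe to a single topic with a rebalance callback.
(subscribe! c "account"
            (fn [{:keys [event topic partition]}]
              (println "rebalance:" event topic partition)))

;; Subscribe to every topic matching a pattern.
(subscribe! c #"account.*")
```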

subscription

(subscription this)

Currently assigned topics.

resume!

(resume! this topic-partitions)

Resume consumption.

wake-up!

(wake-up! this)

Safely wake up a consumer which may be blocking during polling.

commit!

(commit! this)
(commit! this topic-offsets)

Commit offsets for a consumer.
The topic-offsets argument must be a list of maps of the form:

```
{:topic     topic
 :partition partition
 :offset    offset
 :metadata  metadata}
```

The topic and partition tuple must be unique across the whole list.
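
For example, committing a single explicit offset (placeholder values):

```
(commit! c [{:topic     "account"
             :partition 0
             :offset    42
             :metadata  "processed"}])
```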

position!

(position! this)
(position! this topic-partition)

Get the offset of the next record that will be fetched (if a record with that offset exists).

unsubscribe!

(unsubscribe! this)

Unsubscribe from currently subscribed topics.

stop!

(stop! this)
(stop! this timeout)

Stop consumption.

seek!

(seek! this)
(seek! this topic-partition offset)

Overrides the fetch offsets that the consumer will use on the next poll.

pause!

(pause! this)
(pause! this topic-partitions)

Pause consumption.

poll!

(poll! this timeout)

Poll for new messages. Timeout in ms.
The result is a data representation of a ConsumerRecords instance.

    {:partitions   [["t" 0] ["t" 1]]
     :topics       ["t"]
     :count        2
     :by-partition {["t" 0] [{:key       "k0"
                              :offset    1
                              :partition 0
                              :timestamp 1233524522
                              :topic     "t"
                              :value     "v0"
                              :headers   {:h1 ["1" "2"]}}]
                    ["t" 1] [{:key       "k1"
                              :offset    1
                              :partition 1
                              :timestamp 1233524527
                              :topic     "t"
                              :value     "v1"
                              :headers   {:h1 ["1"]}}]}
     :by-topic     {"t" [{:key       "k0"
                          :offset    1
                          :partition 0
                          :timestamp 1233524522
                          :topic     "t"
                          :value     "v0"
                          :headers   {:h1 ["1" "2"]}}
                         {:key       "k1"
                          :offset    1
                          :partition 1
                          :timestamp 1233524527
                          :topic     "t"
                          :value     "v1"
                          :headers   {:h1 ["1"]}}]}}
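
A sketch of a simple poll-and-walk loop over the decoded structure (assuming the default decoder):

```
;; Poll once with a 100 ms timeout and print every record, topic by topic.
(let [result (poll! c 100)]
  (doseq [[topic records] (:by-topic result)
          {:keys [offset key value]} records]
    (println topic offset key value)))
```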

cr->data

(cr->data cr)

Yield a clojure representation of a consumer record.

crs->eduction

(crs->eduction crs)

Returns consumer records as a clojure.lang.Eduction to be used in a
reducible context.

crs-for-topic+partition->eduction

(crs-for-topic+partition->eduction crs topic partition)

Returns an Eduction of the records for a given TopicPartition.

crs-for-topic->eduction

(crs-for-topic->eduction crs topic)

Returns an Eduction of the records for a given topic.

deserializer

(deserializer f)

Yield an instance of a deserializer from a function of two arguments:
a topic and the payload to deserialize.
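
A sketch of a custom deserializer, assuming the payload arrives as a byte array (as with Kafka's Deserializer interface):

```
;; Sketch: deserialize UTF-8 bytes and upper-case the result.
(def shouting-deserializer
  (deserializer
   (fn [_topic ^bytes payload]
     (when payload
       (.toUpperCase (String. payload "UTF-8"))))))
```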

deserializers

edn-deserializer

(edn-deserializer)
(edn-deserializer reader-opts)

Deserialize EDN.

edn-serializer

(edn-serializer)

Serialize as EDN.

GenericDriver (protocol)

close!

(close! this)
(close! this timeout)

Close this driver.

headers->map

(headers->map headers)

Build a clojure map out of the Kafka Headers from the returned RecordHeaders
object. As Kafka headers can have duplicate keys, the returned map merges any
such cases: {:h1 ["123" "456"] :h2 ["Hello"]}

json-deserializer

(json-deserializer)

Deserialize JSON.

json-serializer

(json-serializer)

Serialize as JSON through jsonista.

keyword-deserializer

(keyword-deserializer)

Deserialize a string and then keywordize it.

keyword-serializer

(keyword-serializer)

Serialize keywords to strings, useful for keys.

MetadataDriver (protocol)

Common properties for all drivers.

partitions-for

(partitions-for this topic)

Retrieve partition ownership information for a topic.
The result is a data representation of a
[PartitionInfo](https://kafka.apache.org/24/javadoc/org/apache/kafka/common/PartitionInfo.html)
list.
The structure for a partition info map is:

    {:topic     "t"
     :partition 0
     :isr       [{:host "x" :id 0 :port 9092}]
     :leader    {:host "x" :id 0 :port 9092}
     :replicas  [{:host "x" :id 0 :port 9092}]}

node->data

(node->data n)

Yield a clojure representation of a node.

opts->props

(opts->props opts)

Kafka configs are now maps of strings to strings. Morphs an arbitrary
clojure map into this representation. Makes sure we don't pass options
that are meant for the driver to concrete Consumers/Producers.
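
A sketch of the expected conversion (the exact stringification of non-string values shown in the comment is an assumption):

```
(opts->props {:bootstrap.servers "localhost:9092"
              :max.poll.records  100})
;; => {"bootstrap.servers" "localhost:9092", "max.poll.records" "100"}
```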

partition-info->data

(partition-info->data pi)

Yield a clojure representation of a partition-info.

producer

(producer config)
(producer config serializer)
(producer config kserializer vserializer)

Create a producer from a configuration and optional serializers.
If a single serializer is provided, it will be used for both keys
and values. If none are provided, the configuration is expected to
hold serializer class names.
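
A sketch mirroring the consumer example above; the broker address is a placeholder:

```
(def p
  (producer {:bootstrap.servers "localhost:9092"}
            (keyword-serializer)
            (edn-serializer)))
```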

producer->driver

(producer->driver producer)

Yield a driver from a Kafka Producer.
The producer driver implements the following protocols:

- [ProducerDriver](#var-ProducerDriver)
- [MetadataDriver](#var-MetadataDriver)
- `clojure.lang.IDeref`: `deref` to access underlying
  [KafkaProducer](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)
  instance.

ProducerDriver (protocol)

Driver interface for producers.

begin-transaction!

(begin-transaction! this)

commit-transaction!

(commit-transaction! this)

flush!

(flush! this)

Ensure that produced messages are flushed.

init-transactions!

(init-transactions! this)

send!

(send! this record)
(send! this topic k v)
(send! this topic k v headers)

Produce a record on a topic.
When using the single arity version, a map
with the following possible keys is expected:
`:key`, `:topic`, `:partition`, `:headers`, `:timestamp` and `:value`.
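
Two short sketches on a producer driver `p`, with keyword keys and EDN values matching the serializers chosen above:

```
;; Record-map form.
(send! p {:topic "account"
          :key   :account-a
          :value {:action :login}})

;; Positional form.
(send! p "account" :account-b {:action :logout})
```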

rebalance-listener

(rebalance-listener callback)

Wrap a callback to yield an instance of a Kafka ConsumerRebalanceListener.
The callback is a function of one argument, a map containing the following
keys: :event, :topic and :partition. :event will be either :assigned or
:revoked.
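
A sketch of building a listener explicitly instead of passing a bare function to `subscribe!`:

```
(def logging-listener
  (rebalance-listener
   (fn [{:keys [event topic partition]}]
     (println (name event) "partition" partition "on topic" topic))))

;; It can then be handed to subscribe! as the listener argument.
(subscribe! c "account" logging-listener)
```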

record-xform

A transducer to explode grouped records into individual
entities.

When successful, the output of kinsky.client/poll! takes the
form:

    {:partitions   [["t" 0] ["t" 1]]
     :topics       #{"t"}
     :count        2
     :by-partition {["t" 0] [{:key       "k0"
                              :offset    1
                              :partition 0
                              :topic     "t"
                              :value     "v0"}]
                    ["t" 1] [{:key       "k1"
                              :offset    1
                              :partition 1
                              :topic     "t"
                              :value     "v1"}]}
     :by-topic     {"t" [{:key       "k0"
                          :offset    1
                          :partition 0
                          :topic     "t"
                          :value     "v0"}
                         {:key       "k1"
                          :offset    1
                          :partition 1
                          :topic     "t"
                          :value     "v1"}]}}

To make working with the output channel easier, this
transducer morphs these messages into a list of
distinct records:

    ({:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
     {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}
     ...)
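
A small sketch of using the transducer outside of a channel, flattening one `poll!` result into individual record maps:

```
;; Each input to the transducer is one decoded poll! result map.
(into [] record-xform [(poll! c 100)])
```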

safe-poll!

(safe-poll! consumer timeout)

Implementation of poll which disregards wake-up exceptions.

serializer

(serializer f)

Yield an instance of a serializer from a function of two arguments:
a topic and the payload to serialize.
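
A sketch of a custom serializer; the wrapped function is assumed to return a byte array, as Kafka's Serializer interface does:

```
;; Sketch: serialize any value as its upper-cased string form in UTF-8.
(def shouting-serializer
  (serializer
   (fn [_topic payload]
     (when payload
       (.getBytes (.toUpperCase (str payload)) "UTF-8")))))
```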

serializers

string-deserializer

(string-deserializer)

Kafka's own string deserializer.

string-serializer

(string-serializer)

Kafka's own string serializer.

topic-partition->data

(topic-partition->data tp)

Yield a clojure representation of a topic-partition.

value-meta-xform

A transducer which acts like `record-xform` but directly yields values,
with record metadata stored in the map metadata.

value-xform

A transducer which acts like `record-xform` but directly yields values,
losing metadata.
