Small Clojure shim on top of the Kafka client API. See https://github.com/pyr/kinsky for example usage.
(->header [k v])
(->header k v)
Build a Kafka header from a key and a value
(->headers headers)
Build Kafka headers from a clojure map.
(->offset-metadata {:keys [offset metadata]})
Yield an OffsetAndMetadata from a clojure map.
(->record payload)
Build a producer record from a clojure map. Leave ProducerRecord instances untouched.
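As a sketch (topic, key, and value are placeholders), a record map can be converted like so:

```clojure
(require '[kinsky.client :as client])

;; Placeholder topic/key/value; the map keys mirror those accepted by `send!`.
(client/->record {:topic "accounts" :key "k0" :value "v0" :partition 0})
```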
(->topic-partition {:keys [topic partition]})
Yield a TopicPartition from a clojure map.
(->topics topics)
Yield a valid object for subscription
(consumer config)
(consumer config callback)
(consumer config kdeserializer vdeserializer)
(consumer config callback kdeserializer vdeserializer)
Create a consumer from a configuration and optional deserializers. If a callback is given, call it when stopping the consumer. If deserializers are provided, use them; otherwise, expect deserializer class names in the config map.
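For instance, a consumer might be created like this (a sketch: the broker address and group id are placeholders, and the deserializer constructors are documented below in this namespace):

```clojure
(require '[kinsky.client :as client])

;; Placeholder broker address and group id.
(def c
  (client/consumer {:bootstrap.servers "localhost:9092"
                    :group.id          "example-group"}
                   (client/keyword-deserializer)
                   (client/edn-deserializer)))
```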
(consumer->driver consumer)
(consumer->driver consumer
{:kinsky.client/keys [run-fn consumer-decoder-fn]
:or {consumer-decoder-fn consumer-records->data}})
Given a consumer and an optional callback to call when stopping, yield a consumer driver. The consumer driver implements the following protocols:

- [ConsumerDriver](#var-ConsumerDriver)
- [MetadataDriver](#var-MetadataDriver)
- `clojure.lang.IDeref`: `deref` to access the underlying [KafkaConsumer](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html) instance.

The consumer driver can also take options:

- `consumer-decoder-fn`: a function that will potentially transform the ConsumerRecords returned by Kafka. By default it uses `consumer-records->data`.
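A minimal sketch, assuming `c` is a consumer created with `consumer`:

```clojure
(require '[kinsky.client :as client])

;; `c` is assumed to be a consumer created with `client/consumer`.
(def driver (client/consumer->driver c))

;; deref gives access to the underlying KafkaConsumer instance.
@driver
```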
(consumer-records->data crs)
Yield the default Clojure representation of topic
Driver interface for consumers
(subscribe! this topics)
(subscribe! this topics listener)
Subscribe to a topic or list of topics. The topics argument can be:

- A simple string when subscribing to a single topic
- A regex pattern to subscribe to matching topics
- A sequence of strings

The optional listener argument is either a callback function or an implementation of [ConsumerRebalanceListener](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html). When a function is supplied, it will be called on rebalance events with a map representing the event; see [kinsky.client/rebalance-listener](#var-rebalance-listener) for details on the map format.
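As a sketch, assuming `c` is a consumer created with `consumer` and the topic names are placeholders, each of the accepted topic forms looks like:

```clojure
(require '[kinsky.client :as client])

(client/subscribe! c "accounts")              ;; a single topic
(client/subscribe! c ["accounts" "events"])   ;; a sequence of topics
(client/subscribe! c #"account.*")            ;; a regex pattern
```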
(subscription this)
Currently assigned topics
(resume! this topic-partitions)
Resume consumption.
(wake-up! this)
Safely wake-up a consumer which may be blocking during polling.
(commit! this)
(commit! this topic-offsets)
Commit offsets for a consumer. The topic-offsets argument must be a list of maps of the form:
{:topic topic
:partition partition
:offset offset
:metadata metadata}
The topic and partition tuple must be unique across the whole list.
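A sketch of the explicit-offsets form, with placeholder topic, partition, and offset values:

```clojure
(require '[kinsky.client :as client])

;; `c` is assumed to be a consumer created with `client/consumer`.
(client/commit! c [{:topic     "accounts"
                    :partition 0
                    :offset    42
                    :metadata  "checkpoint"}])
```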
(position! this)
(position! this topic-partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
(unsubscribe! this)
Unsubscribe from currently subscribed topics.
(stop! this)
(stop! this timeout)
Stop consumption.
(seek! this)
(seek! this topic-partition offset)
Overrides the fetch offsets that the consumer will use on the next poll
(pause! this)
(pause! this topic-partitions)
Pause consumption.
Pause consumption.
(poll! this timeout)
Poll for new messages. Timeout in ms. The result is a data representation of a ConsumerRecords instance.
{:partitions [["t" 0] ["t" 1]]
:topics ["t"]
:count 2
:by-partition {["t" 0] [{:key "k0"
:offset 1
:partition 0
:timestamp 1233524522
:topic "t"
:value "v0"
:headers {:h1 ["1" "2"]}}]
["t" 1] [{:key "k1"
:offset 1
:partition 1
:timestamp 1233524527
:topic "t"
:value "v1"
:headers {:h1 ["1"]}}]}
:by-topic {"t" [{:key "k0"
:offset 1
:partition 0
:timestamp 1233524522
:topic "t"
:value "v0"
:headers {:h1 ["1" "2"]}}
{:key "k1"
:offset 1
:partition 1
:timestamp 1233524527
:topic "t"
:value "v1"
:headers {:h1 ["1"]}}]}}
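A usage sketch, assuming `c` is a subscribed consumer and `"t"` is a placeholder topic name:

```clojure
(require '[kinsky.client :as client])

;; Poll with a 100 ms timeout and walk the per-topic records
;; out of the result map.
(let [{:keys [by-topic]} (client/poll! c 100)]
  (doseq [record (get by-topic "t")]
    (println (:key record) "->" (:value record))))
```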
(cr->data cr)
Yield a clojure representation of a consumer record
(crs->eduction crs)
Returns consumer records as a clojure.lang.Eduction, to be used in a reducible context.
(crs-for-topic+partition->eduction crs topic partition)
Returns an Eduction over the records for a given TopicPartition.
(crs-for-topic->eduction crs topic)
Returns an Eduction over the records for a given topic.
(deserializer f)
Yield an instance of a deserializer from a function of two arguments: a topic and the payload to deserialize.
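A sketch of a hypothetical custom deserializer that upper-cases incoming UTF-8 payloads:

```clojure
(require '[kinsky.client :as client]
         '[clojure.string :as str])

;; The function receives the topic and the raw byte payload.
(def upcase-deserializer
  (client/deserializer
   (fn [_topic ^bytes payload]
     (some-> payload (String. "UTF-8") str/upper-case))))
```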
(edn-deserializer)
(edn-deserializer reader-opts)
Deserialize EDN.
(close! this)
(close! this timeout)
Close this driver
(headers->map headers)
Build a clojure map out of Kafka Headers from the returned RecordHeaders object. As the Kafka Headers can have duplicates, the map returned merges any such cases: {:h1 ["123" "456"] :h2 ["Hello"]}
(json-serializer)
Serialize as JSON through jsonista.
(keyword-deserializer)
Deserialize a string and then keywordize it.
(keyword-serializer)
Serialize keywords to strings, useful for keys.
Common properties for all drivers
(partitions-for this topic)
Retrieve partition ownership information for a topic. The result is a data representation of a PartitionInfo list. The structure for a partition info map is:
{:topic "t"
:partition 0
:isr [{:host "x" :id 0 :port 9092}]
:leader {:host "x" :id 0 :port 9092}
:replicas [{:host "x" :id 0 :port 9092}]}
(node->data n)
Yield a clojure representation of a node.
(opts->props opts)
Kafka configs are maps of strings to strings. Morphs an arbitrary clojure map into this representation, making sure we don't pass options that are meant for the driver to concrete Consumers/Producers.
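A sketch of the conversion, with placeholder option values:

```clojure
(require '[kinsky.client :as client])

;; Keyword keys and non-string values are coerced to strings,
;; yielding a map suitable for the underlying Java client.
(client/opts->props {:bootstrap.servers "localhost:9092"
                     :max.poll.records  500})
```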
(partition-info->data pi)
Yield a clojure representation of a partition-info.
(producer config)
(producer config serializer)
(producer config kserializer vserializer)
Create a producer from a configuration and optional serializers. If a single serializer is provided, it will be used for both keys and values. If none are provided, the configuration is expected to hold serializer class names.
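For instance, a producer might be created like this (a sketch: the broker address is a placeholder, and the serializer constructors are documented in this namespace):

```clojure
(require '[kinsky.client :as client])

;; Placeholder broker address.
(def p
  (client/producer {:bootstrap.servers "localhost:9092"}
                   (client/keyword-serializer)
                   (client/json-serializer)))
```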
(producer->driver producer)
Yield a driver from a Kafka Producer. The producer driver implements the following protocols:

- [ProducerDriver](#var-ProducerDriver)
- [MetadataDriver](#var-MetadataDriver)
- `clojure.lang.IDeref`: `deref` to access the underlying [KafkaProducer](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html) instance.
Driver interface for producers
(begin-transaction! this)
(commit-transaction! this)
(flush! this)
Ensure that produced messages are flushed.
(init-transactions! this)
(send! this record)
(send! this topic k v)
(send! this topic k v headers)
Produce a record on a topic. When using the single-arity version, a map with the following possible keys is expected: `:key`, `:topic`, `:partition`, `:headers`, `:timestamp` and `:value`.
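A sketch of both forms, assuming `p` is a producer created with `producer` and the topic, key, and value are placeholders:

```clojure
(require '[kinsky.client :as client])

;; Single-arity form, passing a record map:
(client/send! p {:topic "accounts" :key :account-a :value {:action :login}})

;; Positional form:
(client/send! p "accounts" :account-a {:action :login})
```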
(rebalance-listener callback)
Wrap a callback to yield an instance of a Kafka ConsumerRebalanceListener. The callback is a function of one argument, a map containing the following keys: :event, :topic and :partition. :event will be either :assigned or :revoked.
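A sketch of wrapping a callback (the body of the function is purely illustrative):

```clojure
(require '[kinsky.client :as client])

;; The wrapped callback receives maps such as
;; {:event :assigned :topic "accounts" :partition 0}.
(def listener
  (client/rebalance-listener
   (fn [{:keys [event topic partition]}]
     (println "rebalance:" event topic partition))))
```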
A transducer to explode grouped records into individual entities.
When successful, the output of kinsky.client/poll! takes the form:
{:partitions [["t" 0] ["t" 1]]
:topics #{"t"}
:count 2
:by-partition {["t" 0] [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}]
["t" 1] [{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}
:by-topic {"t" [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}
{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}}
To make working with the output channel easier, this transducer morphs these messages into a list of distinct records:
({:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
{:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}
...)
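Like any transducer it can also be applied outside a channel context; a sketch, assuming the transducer described here is the `record-xform` var mentioned elsewhere in this namespace and that `c` is a subscribed consumer:

```clojure
(require '[kinsky.client :as client])

;; Explode one poll! result map into a flat vector of record maps.
(into [] client/record-xform [(client/poll! c 100)])
```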
(safe-poll! consumer timeout)
Implementation of poll which disregards wake-up exceptions
(serializer f)
Yield an instance of a serializer from a function of two arguments: a topic and the payload to serialize.
(string-deserializer)
Kafka's own string deserializer.
(string-serializer)
Kafka's own string serializer.
(topic-partition->data tp)
Yield a clojure representation of a topic-partition
A transducer which acts like `record-xform` but directly yields values, with record metadata stored in the map metadata.
A transducer which acts like `record-xform` but directly yields values, losing metadata.