Small clojure shim on top of the Kafka client API. See https://github.com/pyr/kinsky for example usage.
(->offset-metadata {:keys [offset metadata]})
Yield an OffsetAndMetadata from a clojure map.
(->record payload)
Build a producer record from a clojure map. Leave ProducerRecord instances untouched.
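For instance, a minimal sketch; the key names follow the map format documented for send! below:

```
(require '[kinsky.client :as client])

;; Build a ProducerRecord from a plain map; :key and :partition may be omitted.
(client/->record {:topic "account" :key "key-0" :partition 0 :value "hello"})
```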
(->topic-partition {:keys [topic partition]})
Yield a TopicPartition from a clojure map.
(->topics topics)
Yield a valid object for subscription
(consumer config)
(consumer config callback)
(consumer config kdeserializer vdeserializer)
(consumer config callback kdeserializer vdeserializer)
Create a consumer from a configuration and optional deserializers. If a callback is given, call it when stopping the consumer. If deserializers are provided, use them otherwise expect deserializer class name in the config map.
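A minimal sketch, assuming a broker reachable on localhost:9092:

```
(require '[kinsky.client :as client])

;; Create a consumer driver with explicit key and value deserializers;
;; config keys mirror the standard Kafka consumer properties.
(def c
  (client/consumer {:bootstrap.servers "localhost:9092"
                    :group.id          "example-group"}
                   (client/keyword-deserializer)
                   (client/edn-deserializer)))
```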
(consumer->driver consumer)
(consumer->driver consumer run-signal)
Given a consumer and an optional callback to call when stopping, yield a consumer driver. The consumer driver implements the following protocols:
- [ConsumerDriver](#var-ConsumerDriver)
- [MetadataDriver](#var-MetadataDriver)
- `clojure.lang.IDeref`: `deref` to access the underlying [KafkaConsumer](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html) instance.
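As a sketch, dereferencing the driver gives access to the raw client (assuming `c` is the consumer driver created above):

```
;; Yields the underlying org.apache.kafka.clients.consumer.KafkaConsumer instance.
(def raw-consumer @c)
```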
(consumer-records->data crs)
Yield the clojure representation of a ConsumerRecords instance.
Driver interface for consumers
(subscribe! this topics)
(subscribe! this topics listener)
Subscribe to a topic or list of topics. The topics argument can be:
- A simple string when subscribing to a single topic
- A regex pattern to subscribe to matching topics
- A sequence of strings

The optional listener argument is either a callback function or an implementation of [ConsumerRebalanceListener](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html). When a function is supplied, it will be called on rebalance events with a map representing the event; see [kinsky.client/rebalance-listener](#var-rebalance-listener) for details on the map format.
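A short sketch of the accepted topic forms (topic names are illustrative, `c` is the consumer driver from above):

```
;; Single topic
(client/subscribe! c "account")

;; All topics matching a regex pattern
(client/subscribe! c #"account.*")

;; Several topics, with a rebalance callback
(client/subscribe! c ["account" "payment"]
                   (fn [event] (println "rebalance event:" event)))
```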
(subscription this)
Currently assigned topics
(resume! this topic-partitions)
Resume consumption.
(wake-up! this)
Safely wake up a consumer which may be blocking during polling.
(commit! this)
(commit! this topic-offsets)
Commit offsets for a consumer. The topic-offsets argument must be a list of maps of the form:
{:topic topic
:partition partition
:offset offset
:metadata metadata}
The topic and partition tuple must be unique across the whole list.
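For example, a sketch with illustrative values:

```
;; Commit an explicit offset for partition 0 of topic "account".
(client/commit! c [{:topic     "account"
                    :partition 0
                    :offset    42
                    :metadata  "processed by worker-1"}])
```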
(position! this)
(position! this topic-partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
(unsubscribe! this)
Unsubscribe from currently subscribed topics.
(stop! this)
(stop! this timeout)
Stop consumption.
(seek! this)
(seek! this topic-partition offset)
Overrides the fetch offsets that the consumer will use on the next poll.
(pause! this)
(pause! this topic-partitions)
Pause consumption.
(poll! this timeout)
Poll for new messages. Timeout in ms. The result is a data representation of a ConsumerRecords instance.
{:partitions [["t" 0] ["t" 1]]
:topics ["t"]
:count 2
:by-partition {["t" 0] [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}]
["t" 1] [{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}
:by-topic {"t" [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}
{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}}
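A minimal sketch of consuming one poll of records from the data representation above:

```
;; Poll with a 100ms timeout and walk the records grouped by topic.
(let [{record-count :count, :keys [by-topic]} (client/poll! c 100)]
  (doseq [[topic records] by-topic
          {:keys [key value offset]} records]
    (println topic offset key value))
  record-count)
```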
(cr->data cr)
Yield a clojure representation of a consumer record
(deserializer f)
Yield an instance of a deserializer from a function of two arguments: a topic and the payload to deserialize.
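As a sketch, a custom deserializer that upper-cases incoming UTF-8 payloads (the payload arrives as a byte array):

```
;; The wrapped function receives the topic name and the raw payload bytes.
(def upcase-deserializer
  (client/deserializer
   (fn [topic payload]
     (when payload
       (.toUpperCase (String. ^bytes payload "UTF-8"))))))
```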
(edn-deserializer)
(edn-deserializer reader-opts)
Deserialize EDN.
(close! this)
(close! this timeout)
Close this driver
(json-serializer)
Serialize as JSON through cheshire.
(keyword-deserializer)
Deserialize a string and then keywordize it.
(keyword-serializer)
Serialize keywords to strings, useful for keys.
Common properties for all drivers
(partitions-for this topic)
Retrieve partition ownership information for a topic. The result is a data representation of a [PartitionInfo](http://kafka.apache.org/090/javadoc/org/apache/kafka/common/PartitionInfo.html) list. The structure for a partition info map is:

{:topic "t"
 :partition 0
 :isr [{:host "x" :id 0 :port 9092}]
 :leader {:host "x" :id 0 :port 9092}
 :replicas [{:host "x" :id 0 :port 9092}]}
(node->data n)
Yield a clojure representation of a node.
(opts->props opts)
Kafka configs are now maps of strings to strings. Morph an arbitrary clojure map into this representation.
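For example (return value shown approximately):

```
(client/opts->props {:bootstrap.servers "localhost:9092"
                     :max.poll.records  100})
;; => a string-to-string map along the lines of
;;    {"bootstrap.servers" "localhost:9092", "max.poll.records" "100"}
```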
(partition-info->data pi)
Yield a clojure representation of a partition-info.
(producer config)
(producer config serializer)
(producer config kserializer vserializer)
Create a producer from a configuration and optional serializers. If a single serializer is provided, it will be used for both keys and values. If none are provided, the configuration is expected to hold serializer class names.
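A minimal sketch, assuming a local broker and the serializers documented in this namespace:

```
;; Keys are serialized as keywords, values as JSON.
(def p
  (client/producer {:bootstrap.servers "localhost:9092"}
                   (client/keyword-serializer)
                   (client/json-serializer)))
```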
(producer->driver producer)
Yield a driver from a Kafka Producer. The producer driver implements the following protocols:
- [ProducerDriver](#var-ProducerDriver)
- [MetadataDriver](#var-MetadataDriver)
- `clojure.lang.IDeref`: `deref` to access the underlying [KafkaProducer](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html) instance.
Driver interface for producers
(flush! this)
Ensure that produced messages are flushed.
(send! this record)
(send! this topic k v)
Produce a record on a topic. When using the single arity version, a map with the following possible keys is expected: `:key`, `:topic`, `:partition`, and `:value`.
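Both forms, sketched with the producer driver `p` from above:

```
;; topic / key / value form
(client/send! p "account" :account-a {:action :create})

;; map form; :key and :partition are optional
(client/send! p {:topic "account" :key :account-b :value {:action :delete}})
```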
(rebalance-listener callback)
Wrap a callback to yield an instance of a Kafka ConsumerRebalanceListener. The callback is a function of one argument, a map containing the following keys: :event, :topic and :partition. :event will be either :assigned or :revoked.
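A sketch of wrapping a plain function, following the map format described above:

```
;; The callback receives maps such as {:event :assigned :topic "t" :partition 0}.
(def listener
  (client/rebalance-listener
   (fn [{:keys [event topic partition]}]
     (println "partition" partition "of" topic "was" (name event)))))
```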
A transducer to explode grouped records into individual entities.
When successful, the output of kinsky.client/poll! takes the form:
{:partitions [["t" 0] ["t" 1]]
:topics #{"t"}
:count 2
:by-partition {["t" 0] [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}]
["t" 1] [{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}
:by-topic {"t" [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}
{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}}
To make working with the output channel easier, this transducer morphs these messages into a list of distinct records:
({:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
{:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}
...)
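A minimal sketch of such a transducer, assuming the poll! output shape shown above; `explode-records` is a placeholder name and the library's own var may differ:

```
;; Flatten each grouped poll result into individual record maps.
(def explode-records
  (mapcat (fn [poll-result]
            (mapcat val (:by-topic poll-result)))))

;; Usage over a sequence of poll! results:
;; (sequence explode-records [(client/poll! c 100)])
;; => ({:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"} ...)
```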
(safe-poll! consumer timeout)
Implementation of poll which disregards wake-up exceptions
(serializer f)
Yield an instance of a serializer from a function of two arguments: a topic and the payload to serialize.
(string-deserializer)
Kafka's own string deserializer.
(string-serializer)
Kafka's own string serializer.
(topic-partition->data tp)
Yield a clojure representation of a topic-partition