Clojure wrapper to Kafka's consumer and producer APIs.
The consumers and producers are the basis for streams, and many other use cases. They can be used to send messages directly to, or read messages from topics. There are also some facilities for polling, and transactions.
See jackdaw.client.* for some add-ons atop this API.
Clojure wrapper to Kafka's consumer and producer APIs. The consumers and producers are the basis for streams, and many other use cases. They can be used to send messages directly to, or read messages from topics. There are also some facilities for polling, and transactions. See `jackdaw.client.*` for some add-ons atop this API.
(assign consumer & topic-partitions)Assign a consumer to specific partitions for specific topics. Returns the consumer.
Assign a consumer to specific partitions for specific topics. Returns the consumer.
(assign-all consumer topics)Assigns all of the partitions for all of the given topics to the consumer
Assigns all of the partitions for all of the given topics to the consumer
(assignment consumer)Return the assigned topics and partitions of a consumer.
Return the assigned topics and partitions of a consumer.
(callback on-completion)Return a kafka Callback function out of a clojure fn.
The fn must be of 2-arity, being [record-metadata?, ex?] where the
record-metadata may be the datafied metadata for the produced
record, and the ex may be an exception encountered while producing
the record.
Callbacks are void, so the return value is ignored.
Return a kafka `Callback` function out of a clojure `fn`. The fn must be of 2-arity, being `[record-metadata?, ex?]` where the record-metadata may be the datafied metadata for the produced record, and the ex may be an exception encountered while producing the record. Callbacks are `void`, so the return value is ignored.
(consumer config)(consumer config {:keys [key-serde value-serde] :as t})Return a consumer with the supplied properties and optional Serdes.
Return a consumer with the supplied properties and optional Serdes.
(num-partitions producer-or-consumer topic)Given a producer or consumer and a topic, return the number of partitions for that topic.
Note that partitions are 0-indexed, so a number of partitions 1 means that only partition 0 exists.
Given a producer or consumer and a topic, return the number of partitions for that topic. Note that partitions are 0-indexed, so a number of partitions 1 means that only partition 0 exists.
(offsets-for-times consumer partition-timestamps)Given a subscribed consumer and a mapping of topic-partition or
TopicPartition records to timestamps, return a mapping from
topic-partition descriptors to the offset into each partition of the
FIRST record whose timestamp is equal to or greater than the given
timestamp.
Timestamps are longs to MS precision in UTC.
Given a subscribed consumer and a mapping of topic-partition or `TopicPartition` records to timestamps, return a mapping from topic-partition descriptors to the offset into each partition of the FIRST record whose timestamp is equal to or greater than the given timestamp. Timestamps are longs to MS precision in UTC.
(partitions-for producer-or-consumer {:keys [topic-name]})Given a producer or consumer and a Jackdaw topic descriptor, return metadata about the partitions assigned to the given consumer or producer.
Given a producer or consumer and a Jackdaw topic descriptor, return metadata about the partitions assigned to the given consumer or producer.
(poll consumer timeout)Polls kafka for new messages, returning a potentially empty sequence
of datafied messages. timeout is a java.time.Duration or a number of
milliseconds.
Polls kafka for new messages, returning a potentially empty sequence of datafied messages. `timeout` is a `java.time.Duration` or a number of milliseconds.
(poll-for-assignments consumer)(poll-for-assignments consumer timeout-ms)Poll the consumer until it has a non-empty partition assignment, bounded by
timeout-ms (default 10s). Kafka 4.x's group rebalance does not complete
within a single 0ms poll, so callers that need an assignment must keep
polling. Returns the consumer.
Poll the consumer until it has a non-empty partition assignment, bounded by `timeout-ms` (default 10s). Kafka 4.x's group rebalance does not complete within a single 0ms poll, so callers that need an assignment must keep polling. Returns the consumer.
(position consumer topic-partition)Get the offset of the next record that will be fetched.
Accepts either a TopicPartition record, or a datafied
TopicPartition as generated by the rest of the Jackdaw API.
Get the offset of the next record that will be fetched. Accepts either a `TopicPartition` record, or a datafied `TopicPartition` as generated by the rest of the Jackdaw API.
(position-all consumer)Call position on every assigned partition, producing a map from partition information to the consumer's offset into that partition.
Call position on every assigned partition, producing a map from partition information to the consumer's offset into that partition.
(produce! producer topic value)(produce! producer topic key value)(produce! producer topic partition key value)(produce! producer topic partition timestamp key value)(produce! producer topic partition timestamp key value headers)Helper wrapping #'send!.
Builds and sends a ProducerRecord so you don't have to. Returns
a future which will produce datafied record metadata when forced.
Helper wrapping `#'send!`. Builds and sends a `ProducerRecord` so you don't have to. Returns a future which will produce datafied record metadata when forced.
(producer config)(producer config {:keys [key-serde value-serde]})Return a producer with the supplied properties and optional Serdes.
Return a producer with the supplied properties and optional Serdes.
(seek consumer topic-partition offset)Seek the consumer to the specified offset on the specified partition.
Accepts either a TopicPartition instance or a datafied
TopicPartition as produced by the rest of the Jackdaw API.
Returns the consumer for convenience with ->, doto etc.
Seek the consumer to the specified offset on the specified partition. Accepts either a `TopicPartition` instance or a datafied `TopicPartition` as produced by the rest of the Jackdaw API. Returns the consumer for convenience with `->`, `doto` etc.
(seek-to-beginning-eager consumer)(seek-to-beginning-eager consumer topic-partitions)Seek to the first offset for the given topic/partitions and force positioning.
When no partitions are passed, seek on all assigned topic-partitions.
Seek to the first offset for the given topic/partitions and force positioning. When no partitions are passed, seek on all assigned topic-partitions.
(seek-to-end-eager consumer)(seek-to-end-eager consumer topic-partitions)Seek to the last offset for all assigned partitions, and force positioning.
When no partitions are passed, seek on all assigned partitions.
Returns the consumer.
Seek to the last offset for all assigned partitions, and force positioning. When no partitions are passed, seek on all assigned partitions. Returns the consumer.
(seek-to-timestamp consumer timestamp topics)Given an timestamp in epoch MS, a subscribed consumer and a seq of Jackdaw topics, seek all partitions of the selected topics to the offsets reported by Kafka to correlate with the given timestamp.
After seeking, the first message read from each partition will be the EARLIEST message whose timestamp is greater than or equal to the timestamp sought.
If no such message exists, the first message read from each partition will be the next new message written to that partition.
Returns the consumer for convenience with ->, doto etc.
Given an timestamp in epoch MS, a subscribed consumer and a seq of Jackdaw topics, seek all partitions of the selected topics to the offsets reported by Kafka to correlate with the given timestamp. After seeking, the first message read from each partition will be the EARLIEST message whose timestamp is greater than or equal to the timestamp sought. If no such message exists, the first message read from each partition will be the next new message written to that partition. Returns the consumer for convenience with `->`, `doto` etc.
(send! producer record)(send! producer record callback-fn)Asynchronously sends a record to a topic, returning a Future
which will produce a data structure describing the metadata of the
produced record when forced.
A 2-arity callback function may be provided. It will be invoked with either [RecordMetdata, nil] or [nil, Exception] respectively if the record was sent or if an exception was encountered.
Asynchronously sends a record to a topic, returning a `Future` which will produce a data structure describing the metadata of the produced record when forced. A 2-arity callback function may be provided. It will be invoked with either [RecordMetdata, nil] or [nil, Exception] respectively if the record was sent or if an exception was encountered.
(subscribe consumer topic-configs)Subscribe a consumer to the specified topics.
Returns the consumer.
Subscribe a consumer to the specified topics. Returns the consumer.
(subscribed-consumer config topic-configs)Given a broker configuration and topics, returns a consumer that is subscribed to all of the given topic descriptors.
WARNING: All topics subscribed to by a single consumer must share a single pair of key and value serde instances. The serdes of the first requested topic are used, and all other topics are expected to be able to use same serdes.
Given a broker configuration and topics, returns a consumer that is subscribed to all of the given topic descriptors. WARNING: All topics subscribed to by a single consumer must share a single pair of key and value serde instances. The serdes of the first requested topic are used, and all other topics are expected to be able to use same serdes.
(subscription consumer)Return the subscription(s) of a consumer as a collection of topics.
Subscriptions are a set of strings, being the names of topics which are subscribed to.
Return the subscription(s) of a consumer as a collection of topics. Subscriptions are a set of strings, being the names of topics which are subscribed to.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |