(close consumer-id)
Closes a Kafka Consumer and the respective core-async channels given the consumer id obtained in create!
.
Returns nil
if there is no consumer with the given id or a sequence of all consumers otherwise
Closes a Kafka Consumer and the respective core-async channels given the consumer id obtained in `create!`. Returns `nil` if there is no consumer with the given id or a sequence of all consumers otherwise
CLojure atom
containing all the registered consumers in Kafka-Async
CLojure `atom` containing all the registered consumers in Kafka-Async
(create! servers group-id topics)
(create! servers group-id topics options)
Creates a Kafka Consumer client with a core.async interface given the broker's list and group id.
After the Java Kafka consumer is created it's saved in the consumers
atom with the following format:
{:uuid {:chan core-async-input-output-channel :commit-chan core-async-commit-chnannel :consumer java-kafka-consumer}}
A core.async process is created that polls Kafka for messages and sends them to the output channel.
Clients must manually send a message to the commit-chan in order to continue receiving messages by incrementing the consumer's offset.
This function returns the following map to the client
{:out-chan out-chan :commit-chan commit-chan :consumer-id consumer-id}
Usage example:
(let [{out-chan :out-chan commit-chan :commit-chan} (kafka-async-consumer/create! "localhost:9092" "some-group-id" ["topic1"])]
(go-loop []
(some-processor-fn (<! out-chan))
(>! commit-chan :kafka-commit)
(recur)))
Creates a Kafka Consumer client with a core.async interface given the broker's list and group id. After the Java Kafka consumer is created it's saved in the `consumers` atom with the following format: ```clojure {:uuid {:chan core-async-input-output-channel :commit-chan core-async-commit-chnannel :consumer java-kafka-consumer}} ``` A core.async process is created that polls Kafka for messages and sends them to the output channel. Clients must manually send a message to the commit-chan in order to continue receiving messages by incrementing the consumer's offset. This function returns the following map to the client ```clojure {:out-chan out-chan :commit-chan commit-chan :consumer-id consumer-id} ``` Usage example: ```clojure (let [{out-chan :out-chan commit-chan :commit-chan} (kafka-async-consumer/create! "localhost:9092" "some-group-id" ["topic1"])] (go-loop [] (some-processor-fn (<! out-chan)) (>! commit-chan :kafka-commit) (recur))) ```
(records-by-topic records)
Higher order function that receives a set of consumer records and returns a function that expects a topic.
The purpose of this returned function is to map each record in the set to a the topic passed as argument.
The follwing data structure is returned:
{:topic ({:timestamp xxx :message "some kafka message"})}
Higher order function that receives a set of consumer records and returns a function that expects a topic. The purpose of this returned function is to map each record in the set to a the topic passed as argument. The follwing data structure is returned: ```clojure {:topic ({:timestamp xxx :message "some kafka message"})} ```
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close