kafka-async.consumer


close

(close consumer-id)

Closes a Kafka consumer and its core.async channels, given the consumer id obtained from `create!`.

Returns `nil` if there is no consumer with the given id, or a sequence of all consumers otherwise.

consumer-defaults

consumers

Clojure `atom` containing all the consumers registered in Kafka-Async.
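
The registry behaviour described here and under `close` can be pictured with a small model. This is only a sketch in plain Clojure, not the library's code: `registry`, `register!` and `unregister!` are hypothetical names, ids are assumed to be UUID strings, and "a sequence of all consumers" is read as "the ids that remain after removal", which the docstring does not confirm.

```clojure
;; Hypothetical stand-in for the `consumers` atom; not the library's code.
(def registry (atom {}))

(defn register!
  "Stores an entry under a fresh UUID string and returns that id."
  [entry]
  (let [id (str (java.util.UUID/randomUUID))]
    (swap! registry assoc id entry)
    id))

(defn unregister!
  "Returns nil for an unknown id. Otherwise removes the entry and returns
  a vector of the remaining ids (an assumed reading of the docstring)."
  [id]
  (when (contains? @registry id)
    (vec (keys (swap! registry dissoc id)))))

;; Keywords stand in for the real channels and the Java consumer.
(def example-id
  (register! {:chan :out :commit-chan :commit :consumer :java-consumer}))

(unregister! "no-such-id") ; nil, nothing registered under that id
(unregister! example-id)   ; vector of the ids still registered
```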

create!

(create! servers group-id topics)
(create! servers group-id topics options)

Creates a Kafka consumer client with a core.async interface, given the broker list and group id.

After the Java Kafka consumer is created, it is saved in the `consumers` atom in the following format:

```clojure
{:uuid {:chan core-async-input-output-channel :commit-chan core-async-commit-channel :consumer java-kafka-consumer}}
```

A core.async process is created that polls Kafka for messages and sends them to the output channel.

Clients must manually send a message to `commit-chan` to advance the consumer's offset;
without it, no further messages are delivered.

This function returns the following map to the client:

```clojure
{:out-chan out-chan :commit-chan commit-chan :consumer-id consumer-id}
```

Usage example:

```clojure
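(require '[clojure.core.async :refer [go-loop <! >!]]
         '[kafka-async.consumer :as kafka-async-consumer])

;; some-processor-fn is a placeholder for your own message handler.
(let [{:keys [out-chan commit-chan]}
      (kafka-async-consumer/create! "localhost:9092" "some-group-id" ["topic1"])]
  (go-loop []
    (some-processor-fn (<! out-chan))
    (>! commit-chan :kafka-commit) ; commit before asking for more messages
    (recur)))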
```
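
The poll-then-commit handshake can be tried without a broker. The sketch below is an assumption-laden model, not the library's implementation: `fake-consumer` and `consume-all` are hypothetical helpers, in-memory vectors stand in for Kafka batches, and the producer is assumed to wait for a commit message before emitting the next batch. It needs only `org.clojure/core.async` on the classpath.

```clojure
(require '[clojure.core.async :refer [chan go-loop close! <! >! <!! >!!]])

(defn fake-consumer
  "Hypothetical stand-in for the polling process: puts each batch on
  out-chan, then parks until something arrives on commit-chan."
  [batches]
  (let [out-chan    (chan)
        commit-chan (chan)]
    (go-loop [remaining batches]
      (if-let [batch (first remaining)]
        (do (>! out-chan batch)
            (<! commit-chan)          ; wait for the client's commit
            (recur (rest remaining)))
        (close! out-chan)))           ; no more batches to deliver
    {:out-chan out-chan :commit-chan commit-chan}))

(defn consume-all
  "Takes every batch, committing after each one, until out-chan closes."
  [{:keys [out-chan commit-chan]}]
  (loop [seen []]
    (if-let [batch (<!! out-chan)]
      (do (>!! commit-chan :kafka-commit)
          (recur (conj seen batch)))
      seen)))

(consume-all (fake-consumer [[:a] [:b]]))
;; => [[:a] [:b]]
```

If the client in this model skips the put on `commit-chan`, the producer stays parked and the second batch never arrives, which mirrors the requirement stated in the docstring.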

records-by-topic

(records-by-topic records)

Higher-order function that receives a set of consumer records and returns a function that expects a topic.

The returned function maps each record in the set to the topic passed as its argument.

The following data structure is returned:

```clojure
{:topic ({:timestamp xxx :message "some kafka message"})}
```
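
To make the shape of that return value concrete, here is a rough model in plain Clojure. It is a sketch under assumptions rather than the library's code: `records-by-topic-sketch` is a hypothetical name, plain maps stand in for Kafka's Java record objects, and using the topic name as the keyword key is a guess at what the `:topic` placeholder means.

```clojure
(defn records-by-topic-sketch
  "Returns a function of a topic that yields a one-entry map from that
  topic to the records belonging to it."
  [records]
  (fn [topic]
    {(keyword topic)
     (->> records
          (filter #(= topic (:topic %)))
          (map #(select-keys % [:timestamp :message])))}))

;; Plain maps standing in for consumer records.
(def sample-records
  [{:topic "topic1" :timestamp 1 :message "a"}
   {:topic "topic2" :timestamp 2 :message "b"}
   {:topic "topic1" :timestamp 3 :message "c"}])

(def by-topic (records-by-topic-sketch sample-records))

(by-topic "topic1")
;; => {:topic1 ({:timestamp 1, :message "a"} {:timestamp 3, :message "c"})}
```

Returning a function of the topic makes it easy to map over a list of subscribed topics and merge the results, for example `(apply merge (map by-topic ["topic1" "topic2"]))`.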