API namespace for the library
`(consumer nodes group-id topics)`
`(consumer nodes group-id topics {:as conf})`

Returns a manifold source of a Kafka consumer.

- `nodes`: bootstrap servers URL, e.g. `localhost:9092`
- `group-id`: consumer group id.
- `topics`: a sequence of topics to listen on, e.g. `["test"]`
- `conf`: an optional config map. [other conf options](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html)

`(ignore strm)`

Ignores the output of stream `strm` and returns `strm` itself.

`(kafka-cluster kafka-conf-map)`

Returns a convenient life-cycle map of a Kafka cluster. It requires:

- `::nodes` Kafka bootstrap servers
- `::shapes` shape of data, example: `[(shape/topic (constantly "sentence")) (shape/edn) (shape/value-only)]`

If you only use it to publish messages:

- optional `::producer-config` can specify additional Kafka producer configuration.

If you want to consume from topics:

- `::topics` the topics you want to subscribe to, example: `["sentence"]`
- `::group-id` the group id for the message consumer
- optional `::source-xform` a transducer to process messages before consuming
- optional `::consumer-config` can specify additional Kafka consumer configuration. With additions:
  - `:position` either `:beginning` or `:end`; omit it to use the committed position (default)
  - `:poll-duration` how long each consumer poll may take before returning, a Duration value, default 10 seconds

The returned map exposes keys at different levels:

- Highest level, no additional knowledge:
  - For consumer: `::consume` a function that takes a one-arity function (called with each message) as its arg, returns `nil`.
  - For producer:
    - `::put!` a function with a message as its arg, e.g. `((::put! return-map) {:a 3})`
    - `::put-all!` a function with a message sequence as its arg
- Mid level, if you need access to the underlying manifold sink/source:
  - `::source` a manifold source.
  - `::sink` a manifold sink.
- Lowest level, if you want to access Kafka directly:
  - `::consumer` a Kafka consumer.
  - `::producer` a Kafka message producer.
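
The two additions to `::consumer-config` are plain map entries. A minimal sketch of such a map follows, assuming the "Duration value" means a `java.time.Duration` (the docstring does not say which Duration class is expected) and that `:beginning` starts reading from the beginning of each topic:

```clojure
(import 'java.time.Duration)

;; Assumption: :poll-duration takes a java.time.Duration.
(def consumer-config
  {:position      :beginning                ; start from the beginning, not the committed position
   :poll-duration (Duration/ofSeconds 5)})  ; instead of the default 10 seconds
```

A map like this would be supplied under the `::consumer-config` key of the map passed to `kafka-cluster`.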

`(producer nodes)`
`(producer nodes {:as conf})`

Returns a manifold stream of a Kafka producer. It accepts map values put onto it and outputs the result of each put. If you do not care about the output, wrap it with `ignore`.

- `nodes`: bootstrap servers URLs, e.g. `localhost:9092`
- `conf`: optional config map. [other conf options](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html)

`(xform-sink sink xform)`

Returns a sink stream on top of the `sink` stream. Every event put on it is transformed using transducer `xform` before being passed to `sink`.

- `sink`: sink stream.
- `xform`: a transducer that transforms events before they are put to `sink`

`(xform-source src xform)`

Returns a source stream on `src` that transforms events from `src` using transducer `xform`. Closes the source if this stream is drained.

- `src`: source stream.
- `xform`: a transducer for transforming data from `src`
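
Both `xform-sink` and `xform-source` take an ordinary Clojure transducer as `xform`. The sketch below builds one with `comp` and exercises it on a plain vector with `into`, so it needs neither Kafka nor manifold. The message shape (maps with `:topic` and `:value` keys) and the name `sentence-lengths` are made up for illustration and are not part of this library.

```clojure
;; Hypothetical message shape: maps with :topic and :value keys.
(def sentence-lengths
  (comp (filter #(= "sentence" (:topic %)))  ; keep messages from one topic
        (map :value)                          ; extract the payload
        (map count)))                         ; replace it with its length

(def sample
  [{:topic "sentence" :value "hello world"}
   {:topic "other"    :value "ignored"}
   {:topic "sentence" :value "hi"}])

(into [] sentence-lengths sample)
;; => [11 2]
```

The same `sentence-lengths` value could then be passed as the `xform` argument of `xform-source` or `xform-sink`.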