API namespace for the library
(consumer nodes group-id topics)
(consumer nodes group-id topics {:as conf})
Returns a manifold source for a Kafka consumer.

- `nodes`: bootstrap servers URL, e.g. `localhost:9092`
- `group-id`: consumer group id.
- `topics`: a sequence of topics to listen on, e.g. `["test"]`
- `conf`: an optional config map. [other conf options](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html)
(ignore strm)
Ignores the output of stream `strm` and returns `strm` itself.
(kafka-cluster kafka-conf-map)
Returns a convenient life-cycle map of a Kafka cluster. It requires:

- `::nodes`: Kafka bootstrap servers
- `::shapes`: shape of data, example: `[(shape/topic (constantly "sentence")) (shape/edn) (shape/value-only)]`

If you only use it to publish messages:

- optional `::producer-config`: additional Kafka producer configuration.

If you want to consume from topics:

- `::topics`: the topics you want to subscribe to, example: `["sentence"]`
- `::group-id`: the group id for the message consumer
- optional `::source-xform`: a transducer to process messages before consuming
- optional `::consumer-config`: additional Kafka consumer configuration, with these additions:
  - `:position`: either `:beginning` or `:end`; omit it to use the committed position (default)
  - `:poll-duration`: how long each consumer poll waits before returning; a `Duration` value, default 10 seconds

The returned map exposes keys at different levels:

- Highest level, no additional knowledge needed:
  - For consumer: `::consume`, a function that takes a one-arity function (called with each message) as its argument and returns `nil`.
  - For producer:
    - `::put!`: a function with a message as its argument, e.g. `((::put! return-map) {:a 3})`
    - `::put-all!`: a function with a sequence of messages as its argument
- Mid level, if you need access to the underlying manifold sink/source:
  - `::source`: a manifold source.
  - `::sink`: a manifold sink.
- Lowest level, if you want to access Kafka directly:
  - `::consumer`: a Kafka consumer.
  - `::producer`: a Kafka message producer.
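As a rough illustration of the `::consumer-config` additions described above, the map below sets only the two extra keys. It is a sketch under assumptions: the var name `consumer-config-sketch` is made up for this example, and regular Kafka consumer properties are left out because the docstring does not say how they are keyed in the same map.

```clojure
(import '(java.time Duration))

;; Hypothetical var name; only the two documented additions are set.
(def consumer-config-sketch
  {:position      :beginning                ; start at the beginning instead of the committed position
   :poll-duration (Duration/ofSeconds 5)})  ; overrides the documented 10-second default
```

A map like this would presumably be supplied as the value of `::consumer-config` in the map passed to `kafka-cluster`.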
(producer nodes)
(producer nodes {:as conf})
Returns a manifold stream for a Kafka producer. It accepts map values put onto it and outputs the result of each put. If you do not care about the output, wrap it with `ignore`.

- `nodes`: bootstrap servers URLs, e.g. `localhost:9092`
- `conf`: optional config map. [other conf options](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html)
(xform-sink sink xform)
Returns a sink stream on top of the `sink` stream. Every event put onto it is transformed by the transducer `xform` before being passed to `sink`.

- `sink`: sink stream.
- `xform`: a transducer that transforms events before they are put onto `sink`
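The `xform` argument is an ordinary Clojure transducer, so it can be built and checked against a plain collection before it is wired to a stream. The sketch below uses only `clojure.core` and `clojure.string`; the name `sentence-xform` and the sample messages are invented for illustration and are not part of this library.

```clojure
(require '[clojure.string :as str])

;; Hypothetical transducer: drop blank sentences, then upper-case the rest.
(def sentence-xform
  (comp (remove #(str/blank? (:sentence %)))
        (map #(update % :sentence str/upper-case))))

;; Apply it eagerly to sample data to preview what a stream would carry.
(def transformed
  (into [] sentence-xform [{:sentence "hello"} {:sentence ""} {:sentence "kafka"}]))
```

The same `sentence-xform` value could then be passed as the `xform` argument of `xform-sink` or `xform-source`.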
(xform-source src xform)
Returns a source stream on top of `src` that transforms events from `src` using the transducer `xform`. The source is closed when the returned stream is drained.

- `src`: source stream.
- `xform`: a transducer for transforming data from `src`