Receives events from and forwards events to Kafka.
(json-deserializer)
Deserializes JSON. A malformed payload does not break consumption.
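The tolerant behavior described above can be sketched as a wrapper that turns a parse failure into nil. This is only an illustration under stated assumptions, not the library's implementation: `tolerant-deserializer`, `deserialize`, and `consume` are hypothetical names, and `clojure.edn` stands in for a JSON parser so the sketch needs no external dependency.

```clojure
(require '[clojure.edn :as edn])

(defn tolerant-deserializer
  "Hypothetical helper: wraps `parse` so that a bad payload yields nil
  instead of throwing."
  [parse]
  (fn [^bytes payload]
    (try
      (parse (String. payload "UTF-8"))
      (catch Exception _
        ;; Swallow the failure so the caller can keep consuming.
        nil))))

;; Assumption: clojure.edn stands in for a JSON parser here.
(def deserialize (tolerant-deserializer edn/read-string))

(defn consume
  "Deserializes every payload and drops the ones that failed to parse."
  [payloads]
  (keep deserialize payloads))
```

A real deserializer would probably also log the failure; the sketch drops it silently to stay short.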
(kafka)
(kafka opts)
Returns a function that is invoked with a topic name and an optional message key and returns a stream. That stream is a function which takes an event or a sequence of events and sends them to Kafka.

```clojure
(def kafka-output (kafka))

(changed :state
  (kafka-output "mytopic"))
```

Options:

For a complete list of producer configuration options see https://kafka.apache.org/documentation/#producerconfigs

- :bootstrap.servers  Bootstrap configuration, default is "localhost:9092".
- :value.serializer  Value serializer, default is json-serializer.

Example with SSL enabled:

```clojure
(def kafka-output (kafka {:bootstrap.servers "kafka.example.com:9092"
                          :security.protocol "SSL"
                          :ssl.truststore.location "/path/to/my/truststore.jks"
                          :ssl.truststore.password "mypassword"}))
```
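The nesting of functions described above (options, then topic and optional key, then events) may be easier to see in a dependency-free sketch. `fake-kafka` and `send!` are hypothetical stand-ins for illustration only: `send!` replaces the real Kafka producer, and the sketch does not model how the producer serializes an event or a sequence of events.

```clojure
(defn fake-kafka
  "Hypothetical stand-in for `kafka`. `send!` is called with the topic,
  the message key (nil when omitted) and whatever the stream receives."
  [send!]
  (fn stream-for
    ([topic] (stream-for topic nil))
    ([topic message-key]
     (fn stream [event-or-events]
       ;; The value is passed through unchanged, whether it is a single
       ;; event or a sequence of events.
       (send! topic message-key event-or-events)))))

;; Record every call in an atom instead of talking to a broker.
(def sent (atom []))

(def kafka-output
  (fake-kafka (fn [topic message-key value]
                (swap! sent conj [topic message-key value]))))

;; Topic only, single event.
((kafka-output "mytopic") {:service "api" :state "ok"})

;; Topic plus message key, sequence of events.
((kafka-output "mytopic" "host-1") [{:state "ok"} {:state "critical"}])
```

After these two calls, `@sent` holds one entry per call, each with the topic, the key, and the value that was passed to the stream.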
(kafka-consumer opts)
Yields a Kafka consumption service.
(start-kafka-thread running? core opts)
Starts a Kafka thread that pops messages off the queue for as long as running? is true.
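A loop of this kind can be sketched with an in-memory queue. The sketch rests on assumptions that the docstring does not confirm: `running?` is treated as an atom holding a boolean, a `LinkedBlockingQueue` stands in for the Kafka consumer, and `start-polling-thread`, `stop-polling-thread`, and `handle` are hypothetical names.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(defn start-polling-thread
  "Hypothetical helper: starts a thread that hands each queued message to
  `handle` for as long as the `running?` atom holds true."
  [running? ^LinkedBlockingQueue queue handle]
  (let [worker (Thread.
                 (fn []
                   (while @running?
                     ;; Poll with a short timeout so the flag is checked
                     ;; again even when no message arrives.
                     (when-let [msg (.poll queue 50 TimeUnit/MILLISECONDS)]
                       (handle msg)))))]
    (.start worker)
    worker))

(defn stop-polling-thread
  "Flips the flag and waits up to one second for the thread to exit."
  [running? ^Thread worker]
  (reset! running? false)
  (.join worker 1000))
```

Polling with a timeout rather than blocking indefinitely is what lets the thread notice that `running?` has become false and exit on its own.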