(require
'[fr33m0nk.akka.actor :as actor]
'[fr33m0nk.akka.stream :as s]
'[fr33m0nk.alpakka-kafka.committer :as committer]
'[fr33m0nk.alpakka-kafka.consumer :as consumer]
'[fr33m0nk.utils :as utils])
This topology consumes messages
s/map-async
executes mapping function with 2 messages being processed in parallel
s/map-async
may not be in order, however output of s/map-async
is always in orders/map
Then we commit offsets to Kafka via s/to-mat
and committer/sink
Finally, we run the stream with our actor-system using s/run
(defn test-stream
[actor-system consumer-settings committer-settings consumer-topics]
(-> (consumer/->committable-source consumer-settings consumer-topics)
(s/map-async 2
(fn [message]
(let [key (consumer/key message)
value (consumer/value message)]
;; Do business processing
(println "Key is " key)
(println "Value is " value))
;; Return message
message)
;; then-fn returns committable offset for offset committing
(fn [message]
(consumer/committable-offset message)))
(s/to-mat (committer/sink committer-settings) consumer/create-draining-control)
(s/run actor-system)))
;; Create actor system
(def actor-system (actor/->actor-system "test-actor-system"))
;; Create committer settings
(def committer-settings (committer/committer-settings actor-system {:batch-size 1}))
;; Create consumer-settings
(def consumer-settings (consumer/->consumer-settings actor-system
{:group-id "a-test-consumer"
:bootstrap-servers "localhost:9092"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)}))
;; holding on to consumer-control to shutdown streams
(def consumer-control (test-stream actor-system consumer-settings committer-settings ["testing_stuff"]))
;; shutdown streams using consumer-control var
(consumer/drain-and-shutdown consumer-control
(CompletableFuture/supplyAsync
(utils/->fn0 (fn [] ::done)))
(actor/get-dispatcher actor-system))
(actor/terminate actor-system)
(require '[fr33m0nk.alpakka-kafka.producer :as producer])
s/map-async
executes mapping function with 2 messages being processed in parallels/to-mat
and producer/committable-sink
s/run
(defn test-stream-with-producer
[actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
(-> (consumer/->committable-source consumer-settings consumer-topics)
(s/map-async 2
(fn [message]
(let [_key (consumer/key message) ;; Don't care as it is null
value (consumer/value message)
committable-offset (consumer/committable-offset message)
message-to-publish (producer/->producer-record producer-topic (str/upper-case value))]
(producer/single-producer-message-envelope committable-offset message-to-publish))))
(s/to-mat (producer/committable-sink producer-settings committer-settings) consumer/create-draining-control)
(s/run actor-system)))
(def actor-system (actor/->actor-system "test-actor-system"))
(def committer-settings (committer/committer-settings actor-system {:batch-size 1}))
(def consumer-settings (consumer/consumer-settings actor-system
{:group-id "a-test-consumer"
:bootstrap-servers "localhost:9092"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)}))
(def producer-settings (producer/producer-settings actor-system {:bootstrap-servers "localhost:9092"
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)}))
(def consumer-control (test-stream-with-producer actor-system consumer-settings committer-settings producer-settings ["testing_stuff"] "output-topic"))
;; shutdown streams using consumer-control var
(consumer/drain-and-shutdown consumer-control
(CompletableFuture/supplyAsync
(utils/->fn0 (fn [] ::done)))
(actor/get-dispatcher actor-system))
(actor/terminate actor-system)
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close