Liking cljdoc? Tell your friends :D

Introduction to fr33m0nk/clj-alpakka-kafka

Using Alpakka Kafka stream with a sink

This is Clojure adaptation of example from Alpakka Kafka documentation

  1. First we will import required namespaces:
(require 
 '[fr33m0nk.akka.actor :as actor]
 '[fr33m0nk.akka.stream :as s]
 '[fr33m0nk.alpakka-kafka.committer :as committer]
 '[fr33m0nk.alpakka-kafka.consumer :as consumer]
 '[fr33m0nk.utils :as utils])
  1. Now we well create a stream topology
  • This topology consumes messages

  • s/map-async executes mapping function with 2 messages being processed in parallel

    • Processing of s/map-async may not be in order, however output of s/map-async is always in order
    • If processing order is needed, use s/map
  • Then we commit offsets to Kafka via s/to-mat and committer/sink

  • Finally, we run the stream with our actor-system using s/run

(defn test-stream
  [actor-system consumer-settings committer-settings consumer-topics]
  (-> (consumer/->committable-source consumer-settings consumer-topics)
      (s/map-async 2
                   (fn [message]
                     (let [key (consumer/key message)
                           value (consumer/value message)]
                       ;; Do business processing    
                       (println "Key is " key)
                       (println "Value is " value))
                       ;; Return message
                     message)
                     ;; then-fn returns committable offset for offset committing
                   (fn [message]
                     (consumer/committable-offset message)))
      (s/to-mat (committer/sink committer-settings) consumer/create-draining-control)
      (s/run actor-system)))
  1. Lets now run the stream and see it in action
;; Create actor system
(def actor-system (actor/->actor-system "test-actor-system"))

;; Create committer settings
(def committer-settings (committer/committer-settings actor-system {:batch-size 1}))

;; Create consumer-settings
(def consumer-settings (consumer/->consumer-settings actor-system
                                                       {:group-id "a-test-consumer"
                                                        :bootstrap-servers "localhost:9092"
                                                        :key-deserializer (StringDeserializer.)
                                                        :value-deserializer (StringDeserializer.)}))

;; holding on to consumer-control to shutdown streams
(def consumer-control (test-stream actor-system consumer-settings committer-settings ["testing_stuff"]))
  1. Stream in action 😃
image 5. Let's shutdown the stream now
;; shutdown streams using consumer-control var
(consumer/drain-and-shutdown consumer-control
                             (CompletableFuture/supplyAsync
                               (utils/->fn0 (fn [] ::done)))
                             (actor/get-dispatcher actor-system))
  1. Let's shutdown our actor-system as well
(actor/terminate actor-system)

Using Alpakka Kafka stream with a Kafka Producer

This is Clojure adaptation of example from Alpakka Kafka documentation

  1. Let's require following namespace
(require '[fr33m0nk.alpakka-kafka.producer :as producer])
  1. We well create a new stream topology
  • This topology consumes messages
  • s/map-async executes mapping function with 2 messages being processed in parallel
  • Then we will publish messages to another topic and commit offsets to Kafka via s/to-mat and producer/committable-sink
  • Finally, we run the stream with our actor-system using s/run
(defn test-stream-with-producer
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (-> (consumer/->committable-source consumer-settings consumer-topics)
      (s/map-async 2
                   (fn [message]
                     (let [_key (consumer/key message)      ;; Don't care as it is null
                           value (consumer/value message)
                           committable-offset (consumer/committable-offset message)
                           message-to-publish (producer/->producer-record producer-topic (str/upper-case value))]
                       (producer/single-producer-message-envelope committable-offset message-to-publish))))
      (s/to-mat (producer/committable-sink producer-settings committer-settings) consumer/create-draining-control)
      (s/run actor-system)))
  1. Let's create required dependencies
(def actor-system (actor/->actor-system "test-actor-system"))

(def committer-settings (committer/committer-settings actor-system {:batch-size 1}))

(def consumer-settings (consumer/consumer-settings actor-system
                                                   {:group-id "a-test-consumer"
                                                    :bootstrap-servers "localhost:9092"
                                                    :key-deserializer (StringDeserializer.)
                                                    :value-deserializer (StringDeserializer.)}))

(def producer-settings (producer/producer-settings actor-system {:bootstrap-servers "localhost:9092"
                                                                 :key-serializer (StringSerializer.)
                                                                 :value-serializer (StringSerializer.)}))
  1. Let's run the stream and see it in action
(def consumer-control (test-stream-with-producer actor-system consumer-settings committer-settings producer-settings ["testing_stuff"] "output-topic"))
  1. Streams in action 😃
image
  1. Let's shutdown the stream now
;; shutdown streams using consumer-control var
(consumer/drain-and-shutdown consumer-control
                             (CompletableFuture/supplyAsync
                               (utils/->fn0 (fn [] ::done)))
                             (actor/get-dispatcher actor-system))
  1. Let's shutdown our actor-system as well
(actor/terminate actor-system)

Can you improve this documentation?Edit on GitHub

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close