(ns your-sum-ns)
(defn sum-init-fn
  "Returns the starting value of the running sum, which is always 0.

  `window` is accepted but never read."
  [window]
  0)
;; Given the example input in the next section, a segment arriving over a Kafka transport would look something like this:
;; {:serialized-key-size 36,
;; :key "70144dea-cdd1-443d-9e7f-55cc5d0928d7",
;; :offset 0,
;; :serialized-value-size 22,
;; :partition 0,
;; :timestamp 1514680072539,
;; :message {:age 49, :name "John"}}
(defn sum-aggregation-fn
  "Builds the state-update entry for a single segment.

  Reads the age found at `[:message :age]` inside `segment` and returns it
  wrapped as `{:value age}`. When the segment carries no age (the key is
  missing or its value is nil) the entry is `{:value 0}`, so the segment
  contributes nothing to the sum instead of producing a nil that would make
  the later addition throw a NullPointerException.

  `window` is accepted but never read."
  [window segment]
  (let [age (get-in segment [:message :age])]
    ;; 0 is the additive identity: a segment without an age leaves the sum unchanged.
    {:value (if (some? age) age 0)}))
;; Now just pull the value out of the state update and add it to the previous state.
(defn sum-application-fn
  "Folds one state-update entry into the running sum.

  Looks up `:value` in `value` and returns it added to `state`.
  `window` is accepted but never read."
  [window state value]
  (let [delta (get value :value)]
    (+ state delta)))
;; The sum aggregation map that is referenced in the window definition.
(def sum
  "Aggregation map wiring together the three sum functions: the initial
  state, the per-segment state-update builder, and the function that
  applies an update to the running state."
  {:aggregation/init                sum-init-fn
   :aggregation/create-state-update sum-aggregation-fn
   :aggregation/apply-state-update  sum-application-fn})