
kinetic

aws kinesis client based on the amazon 2.x api


... and some love

$ make repl

=> (require '[kinetic.consumer :as k])

consumer


=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"}]
                        :application-name "hubble"
                        :consume k/echo}))

echo 👆 is a sample function that decodes (UTF-8) and echoes all the records it consumes from a kinesis stream.

to do something more useful, provide your own function as the value of the ":consume" key.
the function takes a batch of kinesis records in this format.
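a minimal sketch of such a function. it ASSUMES (check the actual record format) that each record is a map carrying its payload under a :data key as a java.nio.ByteBuffer, which is the type KCL's KinesisClientRecord uses for record data. buffer->string and print-upper-case are hypothetical names, not part of kinetic:

```clojure
(import '[java.nio ByteBuffer]
        '[java.nio.charset StandardCharsets])

(defn buffer->string
  "decodes a ByteBuffer as UTF-8 without moving the original buffer's position"
  [^ByteBuffer buf]
  (let [dup   (.duplicate buf)                 ;; shares content, has its own position
        bytes (byte-array (.remaining dup))]
    (.get dup bytes)                           ;; copies the remaining bytes out
    (String. bytes StandardCharsets/UTF_8)))

(defn print-upper-case
  "hypothetical :consume function.
   ASSUMES each record is a map with its payload under :data as a ByteBuffer"
  [records]
  (doseq [{:keys [data]} records]
    (println (.toUpperCase (buffer->string data)))))
```

it would then be passed in the consumer config above as `:consume print-upper-case`.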

initial position

by default, a kinesis consumer starts consuming records from a previously recorded "checkpoint", which is stored in what is called a "lease".

to start consuming from a custom position in a stream, use the :start-from key:

=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}]
                        :application-name "hubble"
                        :consume k/echo}))

you will see this new "initial position" in the logs when the shard is initialized:

[ShardRecordProcessor-0000] INFO  kinetic.consumer - initializing shard shardId-000000000000 at sequence {SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}

possible values are (as per amazon's api):

  • {:position :trim-horizon}: start at the oldest record still available in the shard (the "trim horizon")
  • {:position :latest}: start just after the most recent record in the shard, i.e. only consume records that arrive from now on
  • {:position :at-timestamp :timestamp <date>}: start at the first record written at or after a given timestamp. the timestamp value is a java.util.Date

for example:

=> (import '[java.util Date]
           '[java.time Instant]
           '[java.time.temporal ChronoUnit])

=> (def yesterday                             ;; to start consuming from
     (-> (Instant/now)
         (.minus 1 ChronoUnit/DAYS)
         (Date/from)))

=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :at-timestamp
                                                          :timestamp yesterday}}]
                        :application-name "hubble"
                        :consume k/echo}))

you'll see a different starting sequence number in the logs:

[ShardRecordProcessor-0000] INFO  kinetic.consumer - initializing shard shardId-000000000000 at sequence {SequenceNumber: AT_TIMESTAMP,SubsequenceNumber: 0}

and the records will start arriving from yesterday until the latest one on the stream.
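:at-timestamp can also point at a fixed moment instead of a relative one. a sketch that builds the :start-from map from an ISO-8601 string; at-timestamp is a hypothetical helper and the date is arbitrary:

```clojure
(import '[java.util Date]
        '[java.time Instant])

(defn at-timestamp
  "builds a :start-from map for a fixed point in time,
   given as an ISO-8601 instant string such as \"2023-05-01T00:00:00Z\""
  [iso-instant]
  {:position  :at-timestamp
   :timestamp (Date/from (Instant/parse iso-instant))})   ;; kinetic expects a java.util.Date

(def may-day (at-timestamp "2023-05-01T00:00:00Z"))
```

the resulting map would go under a stream's :start-from key, e.g. `{:name "milky-way.solar.pluto" :start-from may-day}`.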

leases

to look at the leases, including their checkpoints:

=> (k/show-leases consumer)

;; [#object[software.amazon.kinesis.leases.Lease 0x764d54a0
;; "Lease(leaseKey=shardId-000000000000,
;;        leaseOwner=d498fa4c-12b5-45b3-a82f-9025e396f952,
;;        leaseCounter=124,
;;        concurrencyToken=null,
;;        lastCounterIncrementNanos=null,
;;        checkpoint={SequenceNumber: 50638743107472231712790482650536060104711379819100635138,
;;                    SubsequenceNumber: 0},
;;        pendingCheckpoint=null,
;;        pendingCheckpointState=null,
;;        isMarkedForLeaseSteal=false,
;;        ownerSwitchesSinceCheckpoint=0,
;;        parentShardIds=[],
;;        childShardIds=[],
;;        hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0,
;;                                                  endingHashKey=340282366920938463463374607431768211455))"]]

stopping consumer

=> (k/stop-consumer consumer)
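in a long running process the consumer can be stopped when the JVM exits by calling k/stop-consumer from a plain JVM shutdown hook. a sketch; stop-on-shutdown! is a hypothetical helper, not part of kinetic:

```clojure
(defn stop-on-shutdown!
  "registers a JVM shutdown hook that calls stop-fn when the process exits.
   returns the hook thread, so it can later be removed with .removeShutdownHook"
  [stop-fn]
  (let [hook (Thread. ^Runnable stop-fn)]      ;; clojure fns implement Runnable
    (.addShutdownHook (Runtime/getRuntime) hook)
    hook))

;; usage, with `consumer` started via k/start-consumer as shown above:
;; (stop-on-shutdown! #(k/stop-consumer consumer))
```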

multiple streams

for a single consumer to consume from multiple streams:

=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}
                                  {:name "milky-way.solar.mars"}]
                        :application-name "hubble"
                        :consume k/echo}))

for multiple consumers that consume from one or more streams under the same application name (and therefore share one lease table):

=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}]
                        :application-name "hubble"
                        :multi-stream? true
                        :consume k/echo}))

"multi-stream?" is not needed in the first example, since there is more than one stream in the config.
in the second example it means that even though there is a single stream, it is tracked as "multi".
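the rule above, written down as a predicate over the consumer config. multi-stream-leases? is a hypothetical helper that mirrors the described behavior, not kinetic's own code:

```clojure
(defn multi-stream-leases?
  "true when leases would be tracked in the multi stream format:
   more than one stream in the config, or :multi-stream? explicitly set"
  [{:keys [streams multi-stream?]}]
  (boolean (or multi-stream?
               (> (count streams) 1))))
```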

what both really change is the way leases are tracked.

in the case of a single stream (the default), leases are tracked with no stream intel in the lease key:

[#object[software.amazon.kinesis.leases.Lease 0x53630682 "Lease(leaseKey=shardId-000000000000,
                                                          checkpoint={SequenceNumber: 4964...3442, SubsequenceNumber: 0})"]]

in the multi stream case, i.e. when either:

  • more than one stream is provided in the config
    OR
  • "multi-stream?" is explicitly set in the config

lease keys include stream intel, and a different underlying object (MultiStreamLease) is used to store and parse / read leases:

[#object[software.amazon.kinesis.leases.MultiStreamLease 0x1df83569 "Lease(leaseKey=123243:milky-way.solar.pluto:1:shardId-000000000000,
                                                                     checkpoint={SequenceNumber: 4923...3941, SubsequenceNumber: 0})"]
 #object[software.amazon.kinesis.leases.MultiStreamLease 0x253c7189 "Lease(leaseKey=123243:milky-way.solar.mars:1:shardId-000000000000,
                                                                     checkpoint={SequenceNumber: 4923...3941, SubsequenceNumber: 0})"]]

these 👆 are entries in a single DynamoDB lease table, named after the ":application-name" config param.

one thing to keep in mind 👉 multi stream entries cannot coexist with single stream (no stream intel) entries in the same lease table, because internally the AWS client uses a single lease type to parse all of them.
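since the two kinds of entries differ in their lease key layout, they can be told apart by parsing the key, e.g. when inspecting the lease table by hand. a sketch that ASSUMES the layouts seen in the sample output above (`<account>:<stream-name>:<creation-epoch>:<shard-id>` for multi stream, a bare shard id for single stream); parse-lease-key is hypothetical. splitting on ":" is safe for the stream name part because kinesis stream names may only contain letters, digits, "_", "." and "-":

```clojure
(require '[clojure.string :as str])

(defn parse-lease-key
  "splits a lease key into its parts.
   ASSUMES the layouts seen in show-leases output:
     single stream: shardId-...
     multi stream:  <account>:<stream-name>:<creation-epoch>:shardId-..."
  [lease-key]
  (let [parts (str/split lease-key #":")]
    (case (count parts)
      1 {:type     :single-stream
         :shard-id (first parts)}
      4 (let [[account stream epoch shard] parts]
          {:type           :multi-stream
           :account        account
           :stream         stream
           :creation-epoch epoch
           :shard-id       shard})
      {:type :unknown :lease-key lease-key})))      ;; any other layout
```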

credentials

AWS credentials are either picked up the regular AWS way (e.g. environment variables or the ~/.aws/credentials file)
or can be explicitly provided to the kinetic consumer:

=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}]
                        :application-name "hubble"
                        :creds {:access-key-id     "AK..ZZ"          ;; <= via a "creds" map
                                :secret-access-key "z0.........0m"}
                        :consume k/echo}))
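rather than hard coding keys, the "creds" map can be built from environment variables, for example when this consumer needs credentials that differ from the ones the regular AWS lookup would find. a sketch; creds-from-env and the HUBBLE_* variable names below are hypothetical:

```clojure
(defn creds-from-env
  "builds a :creds map from two environment variables.
   returns nil when either one is not set, so the caller can leave
   :creds out and fall back to the regular AWS credentials lookup"
  [key-id-var secret-var]
  (let [key-id (System/getenv key-id-var)
        secret (System/getenv secret-var)]
    (when (and key-id secret)
      {:access-key-id     key-id
       :secret-access-key secret})))

;; usage (HUBBLE_* are made up variable names):
;; (let [creds  (creds-from-env "HUBBLE_AWS_ACCESS_KEY_ID" "HUBBLE_AWS_SECRET_ACCESS_KEY")
;;       config {:streams [{:name "milky-way.solar.pluto"}]
;;               :application-name "hubble"
;;               :consume k/echo}]
;;   (k/start-consumer (cond-> config creds (assoc :creds creds))))
```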

license

Copyright © 2023 tolitius

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.
