kinetic

aws kinesis client based on amazon 2.x api

... and some love

$ make repl

=> (require '[kinetic.consumer :as k])

consumer


=> (def consumer
     (k/start-consumer {:stream-name "lagoon-nebula"
                        :application-name "hubble"
                        :consume k/echo}))

"k/echo" (used above) is a sample function that decodes (UTF-8) and echoes all the records it consumes from a kinesis stream

in order to do something more useful provide a function as a value of the ":consume" key
that takes a batch of kinesis records in this format.
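
as a rough sketch of a custom handler, the function below only counts the records in each batch, so it does not depend on the record format. "batch-size" is a hypothetical name, and the sketch assumes the batch arrives as a countable collection:

```clojure
;; hypothetical handler: reports how many records arrived in a batch.
;; assumes the batch is a countable collection; it never looks inside a record.
(defn batch-size [records]
  (let [n (count records)]
    (println "consumed" n "record(s)")
    n))
```

it would be passed as ":consume batch-size" in place of "k/echo".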

initial position

by default a kinesis consumer starts consuming records from a previously recorded "checkpoint", which is stored in a "lease".

in order to start consuming from a custom place in a stream use a :start-from key:

=> (def consumer
     (k/start-consumer {:stream-name "lagoon-nebula"
                        :application-name "hubble"
                        :start-from {:position :trim-horizon}
                        :consume k/echo}))

you would see this new "initial position" in logs when the shard is initialized:

[ShardRecordProcessor-0000] INFO  kinetic.consumer - initializing shard shardId-000000000000 at sequence {SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}

possible values are (as per amazon's api):

  • {:position :trim-horizon}: start at the earliest sequence number available for your application
  • {:position :latest}: start at the latest sequence number available on the stream
  • {:position :at-timestamp :timestamp <date>}: start at a certain timestamp. the timestamp value is a java.util.Date

for example:

=> (import '[java.util Date]
           '[java.time Instant]
           '[java.time.temporal ChronoUnit])

=> (def yesterday                             ;; to start consuming from
     (-> (Instant/now)
         (.minus 1 ChronoUnit/DAYS)
         (Date/from)))

=> (def consumer
     (k/start-consumer {:stream-name "lagoon-nebula"
                        :application-name "hubble"
                        :start-from {:position :at-timestamp :timestamp yesterday}
                        :consume k/echo}))

you'll see a different starting sequence number in the logs:

[ShardRecordProcessor-0000] INFO  kinetic.consumer - initializing shard shardId-000000000000 at sequence {SequenceNumber: AT_TIMESTAMP,SubsequenceNumber: 0}

and the records will arrive starting from yesterday's timestamp up to the latest one on the stream.
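
the same approach works for other offsets. a minimal sketch that uses only JDK classes; "ago" is a hypothetical helper, not part of kinetic:

```clojure
(import '[java.util Date]
        '[java.time Instant]
        '[java.time.temporal ChronoUnit])

;; hypothetical helper: a java.util.Date that lies "amount" units in the past.
;; Instant supports units up to DAYS (e.g. HOURS, MINUTES), but not WEEKS or MONTHS.
(defn ago [amount ^ChronoUnit unit]
  (-> (Instant/now)
      (.minus (long amount) unit)
      (Date/from)))

(def three-hours-ago (ago 3 ChronoUnit/HOURS))
```

the resulting date can then be used as {:position :at-timestamp :timestamp three-hours-ago}.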

leases

=> (k/show-leases consumer)

;; [#object[software.amazon.kinesis.leases.Lease 0x764d54a0
;; "Lease(leaseKey=shardId-000000000000,
;;        leaseOwner=d498fa4c-12b5-45b3-a82f-9025e396f952,
;;        leaseCounter=124,
;;        concurrencyToken=null,
;;        lastCounterIncrementNanos=null,
;;        checkpoint={SequenceNumber: 50638743107472231712790482650536060104711379819100635138,
;;                    SubsequenceNumber: 0},
;;        pendingCheckpoint=null,
;;        pendingCheckpointState=null,
;;        isMarkedForLeaseSteal=false,
;;        ownerSwitchesSinceCheckpoint=0,
;;        parentShardIds=[],
;;        childShardIds=[],
;;        hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0,
;;                                                  endingHashKey=340282366920938463463374607431768211455))"]]

stopping consumer

=> (k/stop-consumer consumer)

credentials

AWS credentials are either picked up via the regular AWS means
or can be provided explicitly to the kinetic consumer:

(k/start-consumer {:stream-name "lagoon-nebula"
                   :application-name "hubble"
                   :start-from {:position :trim-horizon}
                   :creds {:access-key-id     "AK..ZZ"          ;; <= via a "creds" map
                           :secret-access-key "z0.........0m"}
                   :consume k/echo})
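
to keep keys out of the source code, the "creds" map could be built from the standard AWS environment variables. a minimal sketch; "creds-from-env" is a hypothetical helper, not part of kinetic:

```clojure
;; hypothetical helper: builds the "creds" map from the standard AWS
;; environment variables so that keys are not hardcoded in source.
;; a value is nil when its variable is not set.
(defn creds-from-env
  ([] (creds-from-env (System/getenv)))
  ([env]
   {:access-key-id     (get env "AWS_ACCESS_KEY_ID")
    :secret-access-key (get env "AWS_SECRET_ACCESS_KEY")}))
```

the result would be passed as ":creds (creds-from-env)".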

license

Copyright © 2023 tolitius

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.
