aws kinesis client based on amazon 2.x api
... and some love
$ make repl
=> (require '[kinetic.consumer :as k])
=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"}]
                        :application-name "hubble"
                        :consume k/echo}))
echo 👆 here is a sample function that decodes (UTF-8) and echoes all the records it consumes from a kinesis stream.
in order to do something more useful provide a function as a value of the ":consume" key that takes a batch of kinesis records.
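a minimal sketch of such a function. "count-and-print" is a hypothetical name, and the only assumption made about the input is that a batch arrives as a collection; the shape of an individual record is not assumed, it is just printed as is:

```clojure
;; hypothetical consume function (not part of kinetic)
;; assumption: the batch is a collection; record internals are not inspected
(defn count-and-print
  "prints the batch size and every record in the batch, returns the batch size"
  [records]
  (let [n (count records)]
    (println "consumed" n "record(s)")
    (doseq [record records]
      (println " " (pr-str record)))
    n))
```

it would be passed to the consumer config in place of the echo function, i.e. as the value of the ":consume" key.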
by default a kinesis consumer will start consuming records from a previously recorded "checkpoint" that is stored in something that is called a "lease".
in order to start consuming from a custom place in a stream use a :start-from key:
=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}]
                        :application-name "hubble"
                        :consume k/echo}))
you would see this new "initial position" in logs when the shard is initialized:
[ShardRecordProcessor-0000] INFO kinetic.consumer - initializing shard shardId-000000000000 at sequence {SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}
possible values are (as per amazon's api):
{:position :trim-horizon}: start at the earliest sequence number available for your application
{:position :latest}: start at the latest sequence number available on the stream
{:position :at-timestamp :timestamp <date>}: start at a certain timestamp. the timestamp value is java.util.Date
for example:
=> (import '[java.util Date]
           '[java.time Instant]
           '[java.time.temporal ChronoUnit])
=> (def yesterday ;; to start consuming from
     (-> (Instant/now)
         (.minus 1 ChronoUnit/DAYS)
         (Date/from)))
=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :at-timestamp
                                                :timestamp yesterday}}]
                        :application-name "hubble"
                        :consume k/echo}))
you'll see a different starting sequence number in the logs:
[ShardRecordProcessor-0000] INFO kinetic.consumer - initializing shard shardId-000000000000 at sequence {SequenceNumber: AT_TIMESTAMP,SubsequenceNumber: 0}
and the records will start arriving from yesterday until the latest one on the stream.
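when the starting point is "some hours back" rather than exactly yesterday, the same java.time calls can be wrapped in a small helper. this is only a sketch: "hours-ago" and "start-from-hours-ago" are hypothetical names that are not part of kinetic, they just build the ":start-from" map described above:

```clojure
(import '[java.util Date]
        '[java.time Instant]
        '[java.time.temporal ChronoUnit])

;; hypothetical helper: a java.util.Date that is n hours before now
(defn hours-ago [n]
  (-> (Instant/now)
      (.minus (long n) ChronoUnit/HOURS)
      (Date/from)))

;; hypothetical helper: a :start-from map for the :at-timestamp position
(defn start-from-hours-ago [n]
  {:position  :at-timestamp
   :timestamp (hours-ago n)})
```

the resulting map would go under the ":start-from" key of a stream, e.g. {:name "milky-way.solar.pluto" :start-from (start-from-hours-ago 6)}.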
=> (k/show-leases consumer)
;; [#object[software.amazon.kinesis.leases.Lease 0x764d54a0
;; "Lease(leaseKey=shardId-000000000000,
;; leaseOwner=d498fa4c-12b5-45b3-a82f-9025e396f952,
;; leaseCounter=124,
;; concurrencyToken=null,
;; lastCounterIncrementNanos=null,
;; checkpoint={SequenceNumber: 50638743107472231712790482650536060104711379819100635138,
;; SubsequenceNumber: 0},
;; pendingCheckpoint=null,
;; pendingCheckpointState=null,
;; isMarkedForLeaseSteal=false,
;; ownerSwitchesSinceCheckpoint=0,
;; parentShardIds=[],
;; childShardIds=[],
;; hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0,
;; endingHashKey=340282366920938463463374607431768211455))"]]
=> (k/stop-consumer consumer)
for a single consumer to consume from multiple streams:
=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}
                                  {:name "milky-way.solar.mars"}]
                        :application-name "hubble"
                        :consume k/echo}))
for multiple consumers to consume from one or more streams with the same application name:
=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}]
                        :application-name "hubble"
                        :multi-stream? true
                        :consume k/echo}))
":multi-stream?" is not needed in the first example since there is more than one stream in the config.
in the second example it means that even though it is a single stream, track it as "multi".
what both really mean is a different way leases are tracked.
in a case with a single stream (the default) leases are tracked with no stream intel in the lease key:
[#object[software.amazon.kinesis.leases.Lease 0x53630682 "Lease(leaseKey=shardId-000000000000",
checkpoint={SequenceNumber: 4964...3442, SubsequenceNumber: 0}]]
in the case of multi stream, when either:
there is more than one stream in the config, or
":multi-stream?" is set in the config
leases include stream intel in trackers, and use a different underlying object (MultiStreamLease) to store and parse / read leases:
[#object[software.amazon.kinesis.leases.MultiStreamLease 0x1df83569 "Lease(leaseKey=123243:milky-way.solar.pluto:1:shardId-000000000000",
checkpoint={SequenceNumber: 4923...3941, SubsequenceNumber: 0}]
#object[software.amazon.kinesis.leases.MultiStreamLease 0x253c7189 "Lease(leaseKey=123243:milky-way.solar.mars:1:shardId-000000000000",
checkpoint={SequenceNumber: 4923...3941, SubsequenceNumber: 0}]]
these 👆 are entries in a single lease dynamodb table that is named after the ":application-name" config param.
one thing to keep in mind: 👉 multi stream entries cannot coexist with single stream (no stream intel) entries because internally the AWS client uses one type of lease object to parse them.
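to make the two lease key shapes concrete, here is a rough sketch. both functions are hypothetical and not part of kinetic: the first restates the "more than one stream, or the flag is set" rule from above, the second splits a lease key on ":" assuming the four-part layout seen in the sample output. the part names (:account-id, :stream-name, :creation-epoch, :shard-id) are an assumption inferred from that output:

```clojure
(require '[clojure.string :as s])

;; hypothetical: restates the rule above, not kinetic's actual implementation
(defn multi-stream-tracking? [{:keys [streams multi-stream?]}]
  (boolean (or multi-stream?
               (> (count streams) 1))))

;; hypothetical: a four-part key is treated as a multi stream lease key,
;; anything else as a plain single stream key (just the shard id)
(defn parse-lease-key [lease-key]
  (let [parts (s/split lease-key #":")]
    (if (= 4 (count parts))
      (zipmap [:account-id :stream-name :creation-epoch :shard-id] parts)
      {:shard-id lease-key})))

(parse-lease-key "123243:milky-way.solar.pluto:1:shardId-000000000000")
;; => {:account-id "123243", :stream-name "milky-way.solar.pluto",
;;     :creation-epoch "1", :shard-id "shardId-000000000000"}
```

a single stream key such as "shardId-000000000000" has no ":" separated stream parts, which is why the two kinds of entries cannot be read by the same lease object.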
AWS credentials will be either picked up via the regular AWS means
or can be explicitly provided to kinetic consumer:
=> (def consumer
     (k/start-consumer {:streams [{:name "milky-way.solar.pluto"
                                   :start-from {:position :trim-horizon}}]
                        :application-name "hubble"
                        :creds {:access-key-id "AK..ZZ"            ;; <= via a "creds" map
                                :secret-access-key "z0.........0m"}
                        :consume k/echo}))
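to avoid typing secrets into the config, the "creds" map could be built from environment variables. a sketch only: "creds-from-env" is a hypothetical helper that is not part of kinetic, and AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are the standard AWS variable names:

```clojure
;; hypothetical helper (not part of kinetic)
;; builds a creds map from an env-like map; returns nil when either value is missing
(defn creds-from-env
  ([] (creds-from-env (System/getenv)))
  ([env]
   (let [id     (get env "AWS_ACCESS_KEY_ID")
         secret (get env "AWS_SECRET_ACCESS_KEY")]
     (when (and id secret)
       {:access-key-id     id
        :secret-access-key secret}))))
```

since how kinetic treats a nil ":creds" value is not stated here, it is safer to add the key only when the helper returns a map, e.g. (cond-> config creds (assoc :creds creds)).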
Copyright © 2023 tolitius
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.