Liking cljdoc? Tell your friends :D

onyx-kinesis

Onyx plugin providing consumer and producer facilities for kinesis.

Please note that this plugin is currently alpha quality, and does not support splitting or combining shards. Pull requests welcome!

Development

The tests currently run slowly, as they have to create and delete shards on each run.

Installation

In your project file:

[org.onyxplatform/onyx-amazon-kinesis "0.14.1.1"]

In your peer boot-up namespace:

(:require [onyx.plugin.kinesis])

Functions

read-messages

Reads segments from a kinesis topic. Peers will automatically be assigned to each of the topics partitions, balancing the number of partitions over the number of peers, unless :kinesis/partition is supplied in which case only one partition will be read from.

Data is emitted in the following form:

{:timestamp (.getApproximateArrivalTimestamp record)
 :partition-key (.getPartitionKey record)
 :sequence-number (.getSequenceNumber record)
 :data (deserializer-fn (.array (.getData record)))}

Catalog entry:

{:onyx/name :read-messages
 :onyx/plugin :onyx.plugin.kinesis/read-messages
 :onyx/type :input
 :onyx/medium :kinesis
 :kinesis/stream-name "mystreamname"
 :kinesis/shard-initialize-type :trim-horizon
 :kinesis/deserializer-fn :my.ns/deserializer-fn
 :kinesis/poll-interval-ms 250
 :onyx/batch-timeout 50
 :onyx/n-peers << NUMBER OF SHARDS TO READ PARTITIONS, UP TO N-SHARDS MAX >>
 :onyx/batch-size 100
 :onyx/doc "Reads messages from a kinesis topic"}

Lifecycle entry:

{:lifecycle/task :read-messages
 :lifecycle/calls :onyx.plugin.kinesis/read-messages-calls}
Attributes
keytypedefaultdescription
:kinesis/stream-namestring The stream to read from
:kinesis/shardinteger or string Optional: shard to read or write to from if auto-assignment is not used
:kinesis/shard-initialize-typekeyword Offset bound to seek to when not found - :latest or :trim-horizon
:kinesis/deserializer-fnkeyword A keyword that represents a fully qualified namespaced function to deserialize a record's value. Takes one argument - a byte array
:kinesis/regionstring Optional: Kinesis AWS region
:kinesis/endpoint-urlstring Optional: The Kinesis endpoint-url to connect to.
:kinesis/access-keystring Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper.
:kinesis/secret-keystring Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper.
:kinesis/reader-backoff-msinteger Optional: Time to backoff a shard reader upon a ProvisionedThroughputExceededException
:kinesis/poll-interval-msinteger Optional: Minimum time in-between getRecords requests. Tune to match your provisioned shard throughput.
write-messages

Writes segments to a kinesis topic using the kinesis "new" producer.

Catalog entry:

{:onyx/name :write-messages
 :onyx/plugin :onyx.plugin.kinesis/write-messages
 :onyx/type :output
 :onyx/medium :kinesis
 :kinesis/stream-name "stream-name"
 :kinesis/serializer-fn :my.ns/serializer-fn
 :onyx/batch-size batch-size
 :onyx/doc "Writes messages to a kinesis topic"}

Lifecycle entry:

{:lifecycle/task :write-messages
 :lifecycle/calls :onyx.plugin.kinesis/write-messages-calls}

Segments supplied to a :onyx.plugin.kinesis/write-messages task should be in in the following form:

{:partition-key partition-key
 :data data}
Attributes
keytypedefaultdescription
:kinesis/stream-namestring The stream to read from
:kinesis/serializer-fnkeyword A keyword that represents a fully qualified namespaced function to serialize a record's value. Takes one argument - the segment
:kinesis/regionstring Optional: kinesis AWS region
:kinesis/endpoint-urlstring Optional: The kinesis endpoint-url to connect to.
:kinesis/access-keystring Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper.
:kinesis/secret-keystring Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper.

Contributing

Pull requests into the master branch are welcomed.

License

Copyright © 2017 Distributed Masonry

Distributed under the Eclipse Public License, the same as Clojure.

Can you improve this documentation?Edit on GitHub

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close