Onyx plugin providing consumer and producer facilities for kinesis.
Please note that this plugin is currently alpha quality, and does not support splitting or combining shards. Pull requests welcome!
The tests currently run slowly, as they have to create and delete shards on each run.
In your project file:
[org.onyxplatform/onyx-amazon-kinesis "0.14.1.1"]
In your peer boot-up namespace:
(:require [onyx.plugin.kinesis])
Reads segments from a kinesis topic. Peers will automatically be assigned to each
of the topics partitions, balancing the number of partitions over the number of
peers, unless :kinesis/partition
is supplied in which case only one partition
will be read from.
Data is emitted in the following form:
{:timestamp (.getApproximateArrivalTimestamp record)
:partition-key (.getPartitionKey record)
:sequence-number (.getSequenceNumber record)
:data (deserializer-fn (.array (.getData record)))}
Catalog entry:
{:onyx/name :read-messages
:onyx/plugin :onyx.plugin.kinesis/read-messages
:onyx/type :input
:onyx/medium :kinesis
:kinesis/stream-name "mystreamname"
:kinesis/shard-initialize-type :trim-horizon
:kinesis/deserializer-fn :my.ns/deserializer-fn
:kinesis/poll-interval-ms 250
:onyx/batch-timeout 50
:onyx/n-peers << NUMBER OF SHARDS TO READ PARTITIONS, UP TO N-SHARDS MAX >>
:onyx/batch-size 100
:onyx/doc "Reads messages from a kinesis topic"}
Lifecycle entry:
{:lifecycle/task :read-messages
:lifecycle/calls :onyx.plugin.kinesis/read-messages-calls}
key | type | default | description |
---|---|---|---|
:kinesis/stream-name | string | The stream to read from | |
:kinesis/shard | integer or string | Optional: shard to read or write to from if auto-assignment is not used | |
:kinesis/shard-initialize-type | keyword | Offset bound to seek to when not found - :latest or :trim-horizon | |
:kinesis/deserializer-fn | keyword | A keyword that represents a fully qualified namespaced function to deserialize a record's value. Takes one argument - a byte array | |
:kinesis/region | string | Optional: Kinesis AWS region | |
:kinesis/endpoint-url | string | Optional: The Kinesis endpoint-url to connect to. | |
:kinesis/access-key | string | Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper. | |
:kinesis/secret-key | string | Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper. | |
:kinesis/reader-backoff-ms | integer | Optional: Time to backoff a shard reader upon a ProvisionedThroughputExceededException | |
:kinesis/poll-interval-ms | integer | Optional: Minimum time in-between getRecords requests. Tune to match your provisioned shard throughput. |
Writes segments to a kinesis topic using the kinesis "new" producer.
Catalog entry:
{:onyx/name :write-messages
:onyx/plugin :onyx.plugin.kinesis/write-messages
:onyx/type :output
:onyx/medium :kinesis
:kinesis/stream-name "stream-name"
:kinesis/serializer-fn :my.ns/serializer-fn
:onyx/batch-size batch-size
:onyx/doc "Writes messages to a kinesis topic"}
Lifecycle entry:
{:lifecycle/task :write-messages
:lifecycle/calls :onyx.plugin.kinesis/write-messages-calls}
Segments supplied to a :onyx.plugin.kinesis/write-messages
task should be in in
the following form:
{:partition-key partition-key
:data data}
key | type | default | description |
---|---|---|---|
:kinesis/stream-name | string | The stream to read from | |
:kinesis/serializer-fn | keyword | A keyword that represents a fully qualified namespaced function to serialize a record's value. Takes one argument - the segment | |
:kinesis/region | string | Optional: kinesis AWS region | |
:kinesis/endpoint-url | string | Optional: The kinesis endpoint-url to connect to. | |
:kinesis/access-key | string | Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper. | |
:kinesis/secret-key | string | Optional: AWS access key to authorize when not using default provider chain. Avoid using kinesis/access-key if possible, as the key will be stored in ZooKeeper. |
Pull requests into the master branch are welcomed.
Copyright © 2017 Distributed Masonry
Distributed under the Eclipse Public License, the same as Clojure.
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close