Onyx plugin providing read and write facilities for Kafka. This plugin automatically discovers broker locations from ZooKeeper and updates the consumers when there is a broker failover.
This plugin version is only compatible with Kafka 0.10+. Please use onyx-kafka-0.8 with Kafka 0.8.
In your project file:
```clojure
[org.onyxplatform/onyx-kafka "0.13.3.0-alpha2"]
```
In your peer boot-up namespace:
```clojure
(:require [onyx.plugin.kafka])
```
Reads segments from a Kafka topic. Peers will automatically be assigned to each
of the topic's partitions, balancing the number of partitions over the number of
peers, unless :kafka/partition is supplied, in which case only that partition
will be read from.
Catalog entry:
```clojure
{:onyx/name :read-messages
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "my topic"
 :kafka/receive-buffer-bytes 65536
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/offset-reset :earliest
 :kafka/deserializer-fn :my.ns/deserializer-fn
 :kafka/wrap-with-metadata? false
 ;; :kafka/start-offsets {p1 offset1, p2 offset2}
 ;; :kafka/target-offsets {p1 offset3, p2 offset4}
 :onyx/batch-timeout 50
 :onyx/n-peers << NUMBER OF PEERS TO READ PARTITIONS, UP TO N-PARTITION MAX >>
 :onyx/batch-size 100
 :onyx/doc "Reads messages from a Kafka topic"}
```
Lifecycle entry:
```clojure
{:lifecycle/task :read-messages
 :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}
```
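For example, a minimal :kafka/deserializer-fn might decode each record's value as UTF-8 EDN. This is an illustrative sketch, not part of the plugin; the namespace and encoding are assumptions, and any fully qualified function of one byte-array argument will do:

```clojure
(ns my.ns
  (:require [clojure.edn :as edn]))

;; Hypothetical deserializer, referenced in the catalog as
;; :my.ns/deserializer-fn. Decodes a record's value (a byte array)
;; as a UTF-8 EDN string.
(defn deserializer-fn [^bytes bs]
  (edn/read-string (String. bs "UTF-8")))
```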
| key | type | default | description |
|---|---|---|---|
| `:kafka/topic` | `string` | | The topic name to connect to |
| `:kafka/partition` | `string` | | Optional: the partition to read from or write to if auto-assignment is not used |
| `:kafka/zookeeper` | `string` | | The ZooKeeper connection string |
| `:kafka/offset-reset` | `keyword` | | Offset bound to seek to when an offset is not found: `:earliest` or `:latest` |
| `:kafka/receive-buffer-bytes` | `integer` | `65536` | The size of the receive buffer in the Kafka consumer |
| `:kafka/key-deserializer-fn` | `keyword` | | A keyword that represents a fully qualified namespaced function to deserialize a record's key. Takes one argument, a byte array. Only used when `:kafka/wrap-with-metadata?` is true |
| `:kafka/deserializer-fn` | `keyword` | | A keyword that represents a fully qualified namespaced function to deserialize a record's value. Takes one argument, a byte array |
| `:kafka/wrap-with-metadata?` | `boolean` | `false` | Wraps each message in a map with keys `:key`, `:serialized-key-size`, `:serialized-value-size`, `:offset`, `:timestamp`, `:partition`, `:topic`, and the `:message` itself |
| `:kafka/start-offsets` | `map` | | Supplies the task with starting offsets for all partitions. Maps partition to offset, e.g. `{0 50, 1 90}` will start at offset 50 for partition 0, and offset 90 for partition 1. See the example below this table |
| `:kafka/target-offsets` | `map` | | Supplies the task with target offsets for all partitions. The consumer will read up to and including the target offset for each partition. See the example below this table |
| `:kafka/consumer-opts` | `map` | | A map of arbitrary configuration to merge into the underlying Kafka consumer base configuration. Keys should be strings, with valid values as described in the Kafka docs |
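As noted in the table above, a task can be pinned to a fixed offset range by combining :kafka/start-offsets and :kafka/target-offsets. A sketch of the relevant catalog fragment, with made-up partition numbers and offsets:

```clojure
;; Read partitions 0 and 1 starting at offsets 50 and 90 respectively,
;; stopping once offsets 500 and 540 (inclusive) have been read.
{:kafka/start-offsets  {0 50,  1 90}
 :kafka/target-offsets {0 500, 1 540}}
```

These keys are merged into the read-messages catalog entry alongside the attributes shown earlier.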
Writes segments to a Kafka topic using the Kafka "new" producer.
Catalog entry:
```clojure
{:onyx/name :write-messages
 :onyx/plugin :onyx.plugin.kafka/write-messages
 :onyx/type :output
 :onyx/medium :kafka
 :kafka/topic "topic"
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/serializer-fn :my.ns/serializer-fn
 :kafka/request-size 307200
 :onyx/batch-size batch-size
 :onyx/doc "Writes messages to a Kafka topic"}
```
Lifecycle entry:
```clojure
{:lifecycle/task :write-messages
 :lifecycle/calls :onyx.plugin.kafka/write-messages-calls}
```
Segments supplied to an :onyx.plugin.kafka/write-messages task should be in
the following form: {:message message-body}, with optional :partition, :topic and
:key values.
```clojure
{:message message-body
 :key optional-key
 :partition optional-partition
 :topic optional-topic}
```
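A minimal :kafka/serializer-fn might, for example, encode its argument as UTF-8 EDN bytes. As with the deserializer sketch above, the namespace here is an assumption; the plugin only requires a fully qualified function that takes one argument and returns a byte array:

```clojure
(ns my.ns)

;; Hypothetical serializer, referenced in the catalog as
;; :my.ns/serializer-fn. Encodes the value it is given as
;; UTF-8 EDN bytes.
(defn serializer-fn ^bytes [segment]
  (.getBytes (pr-str segment) "UTF-8"))
```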
| key | type | default | description |
|---|---|---|---|
| `:kafka/topic` | `string` | | The topic name to connect to |
| `:kafka/zookeeper` | `string` | | The ZooKeeper connection string |
| `:kafka/key-serializer-fn` | `keyword` | | A keyword that represents a fully qualified namespaced function to serialize a record's key. Takes one argument, the segment |
| `:kafka/serializer-fn` | `keyword` | | A keyword that represents a fully qualified namespaced function to serialize a record's value. Takes one argument, the segment |
| `:kafka/request-size` | `number` | `307200` | The maximum size of request messages. Maps to the `max.request.size` value of the internal Kafka producer |
| `:kafka/no-seal?` | `boolean` | `false` | Do not write `:done` to the topic when the task receives the sentinel signal (end of a batch job) |
| `:kafka/producer-opts` | `map` | | A map of arbitrary configuration to merge into the underlying Kafka producer base configuration. Keys should be strings, with valid values as described in the Kafka docs |
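As an illustration, :kafka/producer-opts passes standard producer settings straight through to the underlying client. A sketch using two stock Kafka producer configuration keys (the values chosen here are arbitrary):

```clojure
;; Merged into the producer's base configuration; keys are the
;; standard Kafka producer configuration strings.
{:kafka/producer-opts {"compression.type" "gzip"
                       "linger.ms" "5"}}
```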
A take-segments utility function is provided for use when testing the results
of jobs with Kafka output tasks. take-segments reads from a topic until a :done
is reached, and then returns the results. Note that if a :done is never written to the
topic, this will hang forever, as there is no timeout.
```clojure
(ns your-ns.a-test
  (:require [onyx.kafka.utils :as kpu]))

;; insert code to run a job here

;; retrieve the segments on the topic
(def results
  (kpu/take-segments (:zookeeper/address peer-config) "yourtopic" your-decompress-fn))

(last results)
;; => :done
```
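Because take-segments blocks until a :done arrives, a test can bound the wait with a plain deref timeout. A sketch in ordinary Clojure, not part of the plugin API:

```clojure
;; Returns ::timed-out after 10 seconds instead of blocking the test
;; run forever (the underlying future keeps running).
(def results
  (deref (future (kpu/take-segments (:zookeeper/address peer-config)
                                    "yourtopic"
                                    your-decompress-fn))
         10000
         ::timed-out))
```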
To benchmark, start a real ZooKeeper instance (at 127.0.0.1:2181) and a Kafka instance, then run the following benchmarks.
Write perf, single peer writer:
```
TIMBRE_LOG_LEVEL=:info lein test onyx.plugin.output-bench-test :benchmark
```
Read perf, single peer reader:
```
TIMBRE_LOG_LEVEL=:info lein test onyx.plugin.input-benchmark-test :benchmark
```
Past results are maintained in dev-resources/benchmarking/results.txt.
Pull requests into the master branch are welcome.
Copyright © 2018 Distributed Masonry
Distributed under the Eclipse Public License, the same as Clojure.