(read-kafka bootstrap-servers topic key-deserializer value-deserializer p)
(read-kafka bootstrap-servers topic key-deserializer value-deserializer options p)
Reads from a Kafka topic. Returns a `KafkaRecord` (https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/io/kafka/KafkaRecord.html) mapped to a Clojure map:

```
{:payload "Deserialized with `value-deserializer`"
 :key "Deserialized with `key-deserializer`"
 :offset "..."
 :partition "..."
 :timestamp "..."
 :topic "..."
 :headers "A map of `{key values}` of `Header` (https://www.javadoc.io/static/org.apache.kafka/kafka-clients/1.0.0/org/apache/kafka/common/header/Header.html)"}
```

Examples:

```
(kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic" key-deserializer value-deserializer options pcoll)
```

Using `StringDeserializer` from `org.apache.kafka.common.serialization` (https://kafka.apache.org/0102/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html):

```
(kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic" StringDeserializer StringDeserializer {:name :read-from-kafka} pcoll)
```

Available options:

- :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
- :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
- :commit-offsets-in-finalize => Finalized offsets are committed to Kafka.
- :name => Adds a name to the Transform.
- :with-consumer-config-updates => Update configuration for the backend main consumer.
- :with-create-time => Sets the timestamp policy based on the `KafkaTimestampType.CREATE_TIME` timestamp of the records.
- :with-log-append-time => Sets TimestampPolicy to `TimestampPolicyFactory.LogAppendTimePolicy`.
- :with-max-num-records => Similar to `Read.Unbounded.withMaxNumRecords(long)`.
- :with-max-read-time => Similar to `Read.Unbounded.withMaxReadTime(Duration)`.
- :with-offset-consumer-config-overrides =>
- :with-processing-time => Sets TimestampPolicy to `TimestampPolicyFactory.ProcessingTimePolicy`.
- :with-read-committed => Sets "isolation_level" to "read_committed" in Kafka consumer configuration.
- :with-start-read-time => Provide custom `TimestampPolicyFactory` to set event times and watermark for each partition.
- :with-topic-partitions => Sets a list of partitions to read from. The list of partitions should be a collection of ['str-topic-name', int-partition-number].
- :without-metadata => Returns a PTransform for PCollection of KV, dropping Kafka metadata.
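To make the options and the record shape more concrete, here is a minimal sketch. It assumes that `:with-max-num-records` accepts a plain number and that `:with-topic-partitions` accepts a vector of `[topic partition]` pairs, as the option descriptions suggest. The names `dev-read-options` and `record->summary` and all sample values are hypothetical; only the option keys and record keys come from the documentation above.

```clojure
;; Hypothetical options map for a small, bounded read during development.
;; The value shapes are assumptions based on the option descriptions.
(def dev-read-options
  {:name :read-from-kafka
   :with-max-num-records 100                  ; stop after 100 records
   :with-topic-partitions [["my-topic" 0]]})  ; only partition 0 of my-topic

;; The map would be passed in the `options` position, for example:
;; (kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic"
;;                   StringDeserializer StringDeserializer
;;                   dev-read-options pcoll)

(defn record->summary
  "Builds a short description of one record map shaped like the map
  documented for read-kafka. Hypothetical helper for illustration."
  [{:keys [topic partition offset key payload]}]
  {:source (str topic "/" partition "@" offset)
   :key key
   :payload payload})

;; Sample input with made-up values.
(record->summary {:payload "hello"
                  :key "k1"
                  :offset 42
                  :partition 0
                  :topic "my-topic"})
;; => {:source "my-topic/0@42", :key "k1", :payload "hello"}
```

Destructuring only the keys a step needs keeps downstream functions independent of the metadata fields they do not use.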
(write-kafka bootstrap-servers topic key-serializer value-serializer p)
(write-kafka bootstrap-servers topic key-serializer value-serializer options p)
Write to Kafka.

Examples:

```
(kafka/write-kafka "broker-1:9092,broker-2:9092" "my-topic" key-serializer value-serializer options pcoll)
```

Available options:

- :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
- :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
- :name => Adds a name to the Transform.
- :values => Writes just the values to Kafka.
- :with-eos => Wrapper method over `KafkaIO.WriteRecords.withEOS(int, String)`, used to keep compatibility with the old API based on KV type of element.
- :with-input-timestamp => Wrapper method over `KafkaIO.WriteRecords.withInputTimestamp()`, used to keep compatibility with the old API based on KV type of element.
- :with-producer-config-updates => Update configuration for the producer.
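A write call with options might look like the sketch below. It assumes the library namespace is `datasplash.kafka` (aliased as `kafka`, matching the examples), that serializer classes are passed directly by analogy with the `StringDeserializer` example for `read-kafka`, and that `:with-producer-config-updates` accepts a map of Kafka producer settings. The namespace `example.kafka-write` and the names `write-options` and `write-strings` are hypothetical, and the expected element shape of `pcoll` is not specified here.

```clojure
(ns example.kafka-write
  ;; Assumed namespace for the functions documented on this page.
  (:require [datasplash.kafka :as kafka])
  (:import (org.apache.kafka.common.serialization StringSerializer)))

;; Hypothetical options map; the value shape for
;; :with-producer-config-updates is an assumption.
(def write-options
  {:name :write-to-kafka
   :with-producer-config-updates {"acks" "all"}})

(defn write-strings
  "Applies write-kafka to an existing PCollection `pcoll`,
  serializing both keys and values as strings."
  [pcoll]
  (kafka/write-kafka "broker-1:9092,broker-2:9092" "my-topic"
                     StringSerializer StringSerializer
                     write-options pcoll))
```

Keeping the options in a named map makes it easy to reuse the same producer settings across several write steps.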