(close-handler {:keys [in-ch close-ch]})Listens to close-ch for a true message. When received, shuts down in-ch and close-ch
Listens to `close-ch` for a `true` message. When received, shuts down `in-ch` and `close-ch`
(control-loop {:keys [in-ch ctl-ch out-ch close-ch driver output-policy]})Starts the loop that process control commands from the user
Starts the loop that process control commands from the user
(create config)Create a producer, returning a map that contains 3 channels:
:in-ch - Input channel used to send messages to the producer.
:ctl-ch - Control channel used to manage the producer connection
:out-ch - Output channel used to receive messages from the Kafka producer and controller.
This channel will be nil unless a non-empty output-policy set is specified.
Configuration options:
:output-policy: A set containing the output sources that should be returned via the out-ch.
Valid options include:
`:control`: Output from control operations
`:error`: Any exceptions or errors that occur
`:data`: Response from kafka producer after sending a message
The Kafka Java interface returns each of this as a Java object. Gregor converts each
to pure data (i.e. a map).
:timeout: time to wait, in milliseconds, for queued messages to send when closing the producer
:key-serializer: Serializer to use for key serialization, default is :edn.
Valid serializers are :edn, :string, :json and :keyword
:value-serializer: Serializer to use for value serialization, default is :edn.
Valid serializers are :edn, :string and :json
:input-buffer: Buffer size of :in-ch and ctl-ch, default is 10
:output-buffer: Buffer size of :out-ch, default is 100
:kafka-configuration: Map containing kafka producer configuration settings. This map will be converted
into key/value properties and passed directly to the KafkaProducer object.
:transducer: Transformation function to be applied to all data received via in-ch channel, prior to being
posted to Kafka.
Example Configuration:
{:kafka-configuration {:bootstrap.servers "localhost:9092" :max.poll.recordss 1000} :output-policy #{:control :error} :key-serializer :string :value-serializer :json :input-buffer 20 :output-buffer 50 :timeout 100}
All key value pairs in :kafka-configuration will be converted and inserted into a property map and
passed into the Kafka Java client as configuration options.
Create a producer, returning a map that contains 3 channels:
`:in-ch` - Input channel used to send messages to the producer.
`:ctl-ch` - Control channel used to manage the producer connection
`:out-ch` - Output channel used to receive messages from the Kafka producer and controller.
This channel will be `nil` unless a non-empty `output-policy` set is specified.
Configuration options:
`:output-policy`: A set containing the output sources that should be returned via the `out-ch`.
Valid options include:
`:control`: Output from control operations
`:error`: Any exceptions or errors that occur
`:data`: Response from kafka producer after sending a message
The Kafka Java interface returns each of this as a Java object. Gregor converts each
to pure data (i.e. a map).
`:timeout`: time to wait, in milliseconds, for queued messages to send when closing the producer
`:key-serializer`: Serializer to use for key serialization, default is `:edn`.
Valid serializers are `:edn`, `:string`, `:json` and `:keyword`
`:value-serializer`: Serializer to use for value serialization, default is `:edn`.
Valid serializers are `:edn`, `:string` and `:json`
`:input-buffer`: Buffer size of `:in-ch` and `ctl-ch`, default is 10
`:output-buffer`: Buffer size of `:out-ch`, default is 100
`:kafka-configuration`: Map containing kafka producer configuration settings. This map will be converted
into key/value properties and passed directly to the KafkaProducer object.
`:transducer`: Transformation function to be applied to all data received via `in-ch` channel, prior to being
posted to Kafka.
Example Configuration:
`{:kafka-configuration {:bootstrap.servers "localhost:9092"
:max.poll.recordss 1000}
:output-policy #{:control :error}
:key-serializer :string
:value-serializer :json
:input-buffer 20
:output-buffer 50
:timeout 100}`
All key value pairs in `:kafka-configuration` will be converted and inserted into a property map and
passed into the Kafka Java client as configuration options.(create-context {:keys [input-buffer output-buffer output-policy timeout
transducer]
:or {input-buffer default-input-buffer
output-buffer default-output-buffer
timeout default-timeout
output-policy #{}}
:as config})Builds context map containing all the driver, various channels and configuration options
Builds context map containing all the driver, various channels and configuration options
(input-loop {:keys [in-ch out-ch close-ch output-policy driver timeout]
:as context})Starts the loop that processes input, waiting for the user to send input on in channel
Starts the loop that processes input, waiting for the user to send input on `in` channel
(process-input message driver output-policy)Sends message to the kafka producer, returning results via the out channel
Sends `message` to the kafka `producer`, returning results via the `out` channel
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |