Onyx plugin for reading from a seq. The seq is read in a way that is appropriate for lazy seqs, so this plugin is useful with datomic.api/datoms calls, slow lazy calculations, line-seq / buffered reading, etc.
This plugin is included with Onyx. You do not need to add it as a separate dependency.
In your peer boot-up namespace:
(:require [onyx.plugin.seq])
Catalog entry:
{:onyx/name :in
 :onyx/plugin :onyx.plugin.seq/input
 :onyx/type :input
 :onyx/medium :seq
 :seq/checkpoint? true
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Reads segments from seq"}
Lifecycle entry:
[{:lifecycle/task :in
  :lifecycle/calls :onyx.plugin.seq/reader-calls}]
Ensure that the elements of your seq are maps. If they are not, wrap each element in a map before returning the seq from the lifecycle.
e.g. (map (fn [a] {:val a}) ["A" "B" "C"])
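As a minimal sketch of where that wrapping could happen, the lifecycle function below returns the wrapped seq under :seq/seq, the key the plugin reads from in the file example further down. The names inject-letters and letter-calls are hypothetical and exist only for this illustration.

```clojure
;; Hypothetical lifecycle function: wraps plain strings in maps so that
;; every element handed to the plugin is a segment (a map).
(defn inject-letters [event lifecycle]
  {:seq/seq (map (fn [a] {:val a}) ["A" "B" "C"])})

;; Hypothetical calls map, referenced from a lifecycle entry for the :in task.
(def letter-calls
  {:lifecycle/before-task-start inject-letters})
```

The mapping is lazy, so elements are only wrapped as the plugin consumes them.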
The plugin checkpoints how far it has read. If a virtual peer crashes, the
replacement virtual peer drops elements from the seq until it reaches the
first segment that was not fully acked. For this to work, the seq that is
read from must be reproducible on restart, yielding the same elements in the
same order. If it is not, disable checkpointing by setting :seq/checkpoint?
to false.
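The recovery step can be pictured in plain Clojure. This is only an illustration of the idea, not the plugin's implementation: resume-from, make-segments, and the acked count of 2 are all hypothetical.

```clojure
;; Hypothetical helper, not part of onyx.plugin.seq.
;; Rebuilds the seq with make-seq (a zero-argument function) and skips the
;; first n elements, where n is the number of segments already fully acked.
(defn resume-from [make-seq n]
  (drop n (make-seq)))

;; A reproducible source: every call yields the same elements in the same order.
(defn make-segments []
  (map (fn [a] {:val a}) ["A" "B" "C" "D"]))

;; After a crash with two segments fully acked, reading resumes at {:val "C"}.
(resume-from make-segments 2)
;; => ({:val "C"} {:val "D"})
```

If make-segments returned different elements, or the same elements in a different order, on each call, dropping a fixed count would skip the wrong segments, which is why a non-reproducible seq calls for checkpointing to be disabled.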
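;; Example: reading lines from a file through a buffered reader.
(import '[java.io BufferedReader FileReader])

;; Open the file and expose its lines as the seq the plugin reads from.
(defn inject-in-reader [event lifecycle]
  (let [rdr (FileReader. (:buffered-reader/filename lifecycle))]
    {:seq/rdr rdr
     :seq/seq (line-seq (BufferedReader. rdr))}))

;; Close the reader once the task stops. An empty map is returned because
;; Onyx lifecycle functions are expected to return a map.
(defn close-reader [event lifecycle]
  (.close (:seq/rdr event))
  {})

(def in-calls
  {:lifecycle/before-task-start inject-in-reader
   :lifecycle/after-task-stop close-reader})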
;; lifecycles
(def lifecycles
  [{:lifecycle/task :in
    :buffered-reader/filename "test-resources/lines.txt"
    :lifecycle/calls ::in-calls}
   {:lifecycle/task :in
    :lifecycle/calls :onyx.plugin.seq/reader-calls}])