Transformations for the stream of parsed binlog events.
Transformations for the stream of parsed binlog events.
(add-binlog-filename init-filename)
Build a stateful transducer to add binlog filename to all events. Normally the events data only contains binlog position. This transducer tracks the current filename, starting from init-filename, and updates the filename from rotate events. Transducer also removes the rotate events.
Build a stateful transducer to add binlog filename to all events. Normally the events data only contains binlog position. This transducer tracks the current filename, starting from init-filename, and updates the filename from rotate events. Transducer also removes the rotate events.
(add-table-schema out in {:keys [schema-cache] :as opts})
Processes event-pairs from in channel and adds a schema to the table-map event in case the first event is a table-map. Writes resulting enriched event-pairs to out channel. If the event-pair contains a :alter-table event then that event is filtered and schema cache is cleared.
Processes event-pairs from in channel and adds a schema to the table-map event in case the first event is a table-map. Writes resulting enriched event-pairs to out channel. If the event-pair contains a :alter-table event then that event is filtered and schema cache is cleared.
(convert-with-schema event-pair)
Given an event-pair of table-map and mutation (write/update/delete) returns a seq of tuples, one per row in mutation, of format [row-type table-key id mapped-row meta]. The elements are:
Given an event-pair of table-map and mutation (write/update/delete) returns a seq of tuples, one per row in mutation, of format [row-type table-key id mapped-row meta]. The elements are: * row-type - :upsert or :delete * table-key - The table name as keyword * id - Value of the row primary key * mapped-row - Row contents as a map for :upsert, nil for :delete * meta - binlog position and ts from the source binlog event
(filter-database expected-db)
Returns a transducer that removes events that are not from the given database
Returns a transducer that removes events that are not from the given database
(filter-tables expected-tables)
Returns a transducer that removes events that are not from any of the given tables. Does not filter events that do not contain table information (i.e. alter table event). If expected-tables is nil or empty degenerates to an allow all filter.
Returns a transducer that removes events that are not from any of the given tables. Does not filter events that do not contain table information (i.e. alter table event). If expected-tables is nil or empty degenerates to an allow all filter.
A stateful transducer to filter away canceled transactions. Internally batches all events in a transaction and releases them at once on successful commit. It removes the events marking tx boundaries.
A stateful transducer to filter away canceled transactions. Internally batches all events in a transaction and releases them at once on successful commit. It removes the events marking tx boundaries.
A stateful transducer to group table-map events with the following write/update/delete operations.
A stateful transducer to group table-map events with the following write/update/delete operations.
(next-position this)
Return the binlog position to use to continue streaming.
Return the binlog position to use to continue streaming.
(source this)
Return the stream output as a Manifold source.
Return the stream output as a Manifold source.
(start! this)
Start the given stream if not already started.
Start the given stream if not already started.
(stop! this)
Stop the given stream if started.
Stop the given stream if started.
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close