This is a paper outlining some of the options and design discussions around building a follower / indexer application with Crux, intended to complement existing systems with queries in an event streaming architecture.
A Crux follower would be an application that indexes existing message buses in a system and provides bitemporal query over them. In this usage mode Crux acts as something like a materialized view over the existing source of truth.
What exactly it means to be a good tool for something like this is unclear, as this is a new area, but the following lists some features that could potentially make such a tool useful.
This is likely something that needs to be pluggable by the client using Crux, as Crux will not only need to parse the content in the topic (and possibly the key) but also derive some metadata from each message (a sketch of such a pluggable hook follows the list):

* It needs to derive the business time of the event.
* It needs to derive evictions for data that is not supposed to be queryable anymore.
* It needs to derive the primary key for messages. Some messages are just independent events that happened and should be queryable as such, but others represent an update to an entity of sorts, and only one of those should be visible for query as of any single point in time. This can be accomplished with Crux's current entity model by giving the messages the same :crux.db/id and different business times, so the :crux.db/id needs to be derived from the message or its key.
* It needs to infer corrections of business time. Since business time is derived from the message, writing to the past is always possible, but when a message represents a correction it may be necessary to infer both a retraction and an insertion from it.
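As a sketch of what such a pluggable hook could look like, assume a hypothetical `interpret-message` function supplied by the client. The transaction operation shapes (`:crux.tx/put`, `:crux.tx/delete`, `:crux.tx/evict`) are Crux's own; the topic names, payload fields, and dispatch mechanism are illustrative.

```clojure
;; Hypothetical client-supplied hook: given a record's topic, key and
;; value, derive the Crux transaction operations to index. Dispatching
;; on topic is an illustrative choice, not part of Crux.
(defmulti interpret-message (fn [topic _key _value] topic))

(defmethod interpret-message "user-updates"
  [_topic key value]
  ;; Derive :crux.db/id from the key and the business (valid) time from
  ;; the payload, so successive updates to the same entity supersede
  ;; each other along the business-time axis.
  [[:crux.tx/put (assoc value :crux.db/id (keyword "user" key))
    (:updated-at value)]])

(defmethod interpret-message "user-corrections"
  [_topic key value]
  ;; A correction can expand into both a retraction and an insertion.
  (let [id (keyword "user" key)]
    [[:crux.tx/delete id (:wrong-at value)]
     [:crux.tx/put (assoc (:corrected value) :crux.db/id id)
      (:corrected-at value)]]))

(defmethod interpret-message "erasure-requests"
  [_topic key _value]
  ;; Some messages mean the data must stop being queryable entirely.
  [[:crux.tx/evict (keyword "user" key)]])
```

The follower's consumer loop could then pass the resulting operation vectors straight to `crux.api/submit-tx`.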
If the upstream message bus is modified, or (possibly more likely) the pluggable code needs to change to reflect a change in how messages should be interpreted, then Crux would need to be able to reprocess the messages and update its state.
This could be done in multiple ways:

* A: Purge the existing database and start over, and be down while that happens.
* B: Write over the existing state with the new state while providing query over a partially migrated state.
* C: Do some sort of query version handling, where we continue to index with the old version and serve queries from it while the new version is processed in parallel. Once the new version has caught up, forward queries to the new version and delete the old one.
For an application like this to be useful, an option where such changes can be made without downtime should probably be considered; option C is sketched below.
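A minimal sketch of option C under some assumptions: the old and new versions run as two independent Crux nodes, all queries go through a single mutable reference, and `caught-up?` is a hypothetical predicate (for example, comparing the new node's latest completed transaction with the current end offsets of the indexed topics).

```clojure
(ns example.versioning
  (:require [crux.api :as crux]))

;; All queries go through this reference, so switching versions is a
;; single atomic swap.
(defonce query-node (atom nil))

(defn switch-over!
  "Wait until the reprocessed node has caught up, then direct queries to
  it and shut the old version down. `caught-up?` is hypothetical."
  [old-node new-node caught-up?]
  (while (not (caught-up? new-node))
    (Thread/sleep 1000))
  (reset! query-node new-node)
  (.close old-node))

(defn q [query]
  (crux/q (crux/db @query-node) query))
```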
An alternative to having Crux follow and index the topics itself, using configuration provided by the user, is to let the users of Crux post data to it from their own consumers of their own topics, making them responsible for transforming the messages into Crux transactions.
This is more like a traditional database with updates and queries, but when used primarily as a materialized view it would still benefit from features such as separate databases (or indexes, in Elasticsearch speak), letting the user accomplish the reprocessing of existing state described above, together with the ability to switch queries over to the newly created index.
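In that model the consumer loop lives entirely in user code. A minimal sketch, assuming a user-supplied `record->tx-ops` transformation (the Kafka consumer calls are the standard client API; the rest is illustrative):

```clojure
(ns example.consumer
  (:require [crux.api :as crux])
  (:import (java.time Duration)
           (org.apache.kafka.clients.consumer KafkaConsumer)))

(defn follow-topics!
  "Poll the user's own consumer and submit each record, transformed by
  the user's `record->tx-ops` into a vector of Crux transaction
  operations, as transactions against the node."
  [node ^KafkaConsumer consumer record->tx-ops]
  (loop []
    (doseq [record (.poll consumer (Duration/ofMillis 1000))]
      (crux/submit-tx node (record->tx-ops record)))
    (recur)))
```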
If all you do is derive a business time from every message and let the user query as of that business time, then you get an eventually consistent system (assuming the system you are indexing is concurrent).
It presents an eventually consistent global order of events, but that global order will not be consistent with the order of events that an individual agent in the system observed.
An example of where this conflicts: a user was banned at business time 10:00, and a payment for a post by that user was accepted at 10:01. If we query the database as of 10:01, it will seem that the user was already banned when the post was made. But since this is a concurrent system, the process/instance responsible for accepting the post may not have received the banning of the user at that point in time; for that process/instance, the banning effectively happened after, or during, the acceptance of the post.
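To make the scenario concrete, indexing and querying it as of business time could look roughly like this (document shapes and times are illustrative; `node` is a started Crux node):

```clojure
;; (require '[crux.api :as crux])

;; The ban and the payment, indexed at their derived business times.
(crux/submit-tx node
  [[:crux.tx/put {:crux.db/id :user/alice, :banned? true}
    #inst "2019-01-01T10:00"]
   [:crux.tx/put {:crux.db/id :payment/1, :payment/user :user/alice}
    #inst "2019-01-01T10:01"]])

;; As of 10:01 the ban is already visible, even though the process that
;; accepted the payment may never have observed it.
(crux/q (crux/db node #inst "2019-01-01T10:01")
        '{:find [user]
          :where [[user :banned? true]]})
```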
To be able to order these events, and to make views like "as of the decision to accept the post" queryable, something richer than just a timestamp for business time needs to be stored.
When indexing Kafka topics and partitions we can store which topic and partition an entity came from. But the dependencies, meaning how far into the other topics and partitions the producer of the "post accepted" message had read when accepting the post, are harder to derive and record: they would probably need to be stored in the messages by the producers to begin with (unless there is some funky way to derive them from Kafka's internal transaction log, which might be possible since it does seem to store the topics+partitions a transaction depends on).
If these dependencies are stored, this would also allow Crux to block (or filter out) queries that do not yet have the necessary data from all the indexed topics+partitions required to fulfil a complete view.
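A sketch of what that gating could look like, assuming producers embed a dependency map from topic+partition to the highest offset they had consumed when emitting the message (the `:deps` shape and `deps-satisfied?` are hypothetical):

```clojure
;; Hypothetical message with producer-recorded dependencies.
(def post-accepted
  {:event :post/accepted
   :deps {["user-events" 0] 1234
          ["payment-events" 2] 987}})

(defn deps-satisfied?
  "True when the follower's own consumed offsets cover every dependency,
  i.e. a query including this event would see a complete view."
  [consumed-offsets deps]
  (every? (fn [[topic+partition offset]]
            (>= (get consumed-offsets topic+partition -1) offset))
          deps))

;; A query could be blocked (or the event filtered out) until e.g.:
;; (deps-satisfied? {["user-events" 0] 1500, ["payment-events" 2] 990}
;;                  (:deps post-accepted))
;; returns true.
```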