There are generally three strategies for graph processor message delivery:

1. At most once
2. At least once
3. Exactly once
Currently, Cues implements only exactly once semantics for graph processors. This is significantly harder to implement than the other two, but more useful overall. The main advantage of the other two is performance, though not by a large margin.
Nevertheless, at most once and at least once are next in the feature backlog and will be included in a future release.
Input messages for a processor step are considered delivered if either an output message is successfully written to an output queue, or an exception is caught and handled. A handled exception means that it is caught and, if the queue has been configured, written to the `:error-queue`.

An unhandled error will result in a processor interrupt and undelivered messages. Once the processor restarts, exactly once semantics ensure that the messages are not lost and that delivery will be retried. They also ensure that once the messages are delivered, they will not be delivered again.
The exactly once algorithm is different depending on whether the processor function has side effects or not. Here, "processor function" refers to the user code defined by `cues.queue/processor`. Obviously the Cues implementation itself will have side effects in order to persist messages to queues.
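For orientation, a minimal, side effect free processor function might look like the following sketch. The `::doubler` id, the `:in` and `:out` bindings, and the message shape are illustrative assumptions; only the `cues.queue/processor` multimethod itself comes from Cues.

```clj
(require '[cues.queue :as q])

;; Receives a map of input messages keyed by binding and returns a map
;; of output messages keyed by binding. The function itself is pure:
;; all queue persistence is handled by Cues around it.
(defmethod q/processor ::doubler
  [process {msg :in}]
  {:out (update msg :x * 2)})
```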
The algorithm described below is specifically for a "join" processor, which accepts multiple input messages and produces a single output message. However, every other kind of processor generalizes from this one case.

A transient "backing queue" is needed to implement the algorithm. This queue is automatically created and isolated from the user. A user-specified error queue can also be configured for delivery of handled error messages.
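As a preview, after one successful delivery the backing queue for a join processor might contain just two messages: the snapshot written before the attempt, and the attempt map recorded when the output message was written. Both map shapes are described step by step below; the index values here are placeholders.

```clj
;; Illustrative backing queue contents after one successful delivery,
;; oldest message first.
[{:q/type           :q.type/snapshot
  :q/proc-id        :cues.build/example-processor
  :q/tailer-indices {::t1 80928480
                     ::t2 80928481}}
 {:q/type          :q.type/attempt-output
  :q/message-index 80928482}]
```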
The algorithm can be broken down into two components:

1. Persistence
2. Recovery

Each component can safely fail at any point. The recovery component is idempotent. The persistence component is not: if it fails, the processor must exit and recovery must be run.

The persistence component proceeds as follows:
1. Collect the current index of each input tailer, as well as the processor id, into a snapshot map:

        {:q/type           :q.type/snapshot
         :q/proc-id        :cues.build/example-processor
         :q/tailer-indices {::t1 080928480
                            ::t2 080928481}}
2. If and only if this is the first attempt to deliver these messages, write this snapshot map to the backing queue.
3. Compute a hash of the snapshot map. This is the delivery hash. The value is unique to the combination of indices and processor id, and will evaluate to the same value on each delivery attempt. Because the messages on the input queues are immutable, the indices and the processor id are all you need to compute the hash; you do not need the actual input messages. (A sketch of this computation follows these steps.)
4. Now read one or more messages from the input tailers, advancing the relevant tailers. If this is an alts processor and we are retrying a delivery attempt, make sure to read from the same tailer that was used for the initial attempt, regardless of which tailers may now have messages (4a of the persistence component and 1b of the recovery component help you do this).

   a) Immediately after the read, if this is an alts processor and this is the first attempt at delivery, create an alts snapshot map that contains the id of the alts tailer that was just read from. Because the availability of messages on an alts read is non-deterministic, we need to store which tailer was initially read from for subsequent alts attempts:

        {:q/type      :q.type/snapshot-alts
         :q/tailer-id :some-tailer-id}

      Write this map to the backing queue.
5. Run the processor function, which may or may not return an output message to be written to the output queue.

   a) If there is an output message, proceed to step 7.

   b) If there is no output message, proceed to step 8.
6. If at any point from step 3 onward an exception is caught and handled:

   a) Proceed to step 7 if the `:error-queue` has been configured to write an error message.

   b) Proceed to step 8 if no `:error-queue` has been configured, and no error message should be written.
7. Open a transaction to write the output message to the output queue, or an error message to the error queue:

   a) Add the delivery hash to the metadata of the output or error message:

        {:q/type :q.type/some-output-message-type
         :q/meta {:q/hash -499796012 ...}
         ...}

   b) Create an attempt map. The type of the attempt map depends on whether the message is an output or error message. For example:

        :q.type/attempt-output
        :q.type/attempt-error

   c) Get the index of the provisional write on the output or error queue, and add this message index to the attempt map:

        {:q/type          :q.type/attempt-output
         :q/message-index 080928480}

   d) Write the attempt map to the backing queue.

   e) Complete the output or error transaction. The messages are now considered delivered.
8. If there is no output or error message to be written (skip this step if you have already done step 7):

   a) Create an attempt map of type `:q.type/attempt-nil`.

   b) Add the delivery hash to the attempt map:

        {:q/type :q.type/attempt-nil
         :q/hash -499796012}

   c) Write the attempt map to the backing queue. The messages are now considered delivered.
9. Continue processing the next set of messages.

If at any point during steps 1-8 an unhandled error occurs, the processor should immediately interrupt and exit.
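To make step 3 concrete, here is a minimal sketch of a delivery hash computed from a snapshot map. The hash function Cues actually uses is an implementation detail; Clojure's built-in `hash` is used here purely for illustration.

```clj
;; The value depends only on the processor id and the immutable input
;; indices, never on the input messages themselves, so it is stable
;; across delivery attempts.
(defn delivery-hash
  [{:q/keys [proc-id tailer-indices]}]
  (hash [proc-id tailer-indices]))

(delivery-hash
 {:q/type           :q.type/snapshot
  :q/proc-id        :cues.build/example-processor
  :q/tailer-indices {::t1 80928480
                     ::t2 80928481}})
;; => the same value on every retry of this delivery
```

Step 7a then writes this value into the `:q/hash` field of the output message metadata, which is what the recovery component later reads back.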
The recovery component proceeds as follows:

1. Read the most recent message on the backing queue. Depending on the message type:

   a) No message on the backing queue: the processor has never made an attempt. Proceed to 4.

   b) `:q.type/snapshot`: the previous attempt failed. Configure the processor to skip making new snapshots until a delivery attempt is successful. Reset the tailers to the indices in the snapshot and proceed to 4.

   c) `:q.type/snapshot-alts`: the previous attempt failed. Configure the alts processor with the tailer id in the message: all subsequent alts retries should read from this tailer. Continue reading backwards on the backing queue until you reach the first snapshot. Proceed to 1b.

   d) `:q.type/attempt-output`: take the `:q/message-index` from the attempt map and read back the message at this index on the output queue. Get the delivery hash from the output message metadata. Proceed to 2.

   e) `:q.type/attempt-error`: take the `:q/message-index` from the attempt map and read back the message at this index on the error queue. Get the delivery hash from the error message metadata. Proceed to 2.

   f) `:q.type/attempt-nil`: get the delivery hash directly from the attempt map. Proceed to 2.
2. Read backwards from the most recent message on the backing queue until you reach the first snapshot. Compute the hash of the snapshot.

3. Then compare the two hashes (a minimal sketch of this comparison follows these steps):

   a) If the delivery hash from step 1 and the snapshot hash from step 2 are the same, the last delivery attempt was successful. Proceed to 4.

   b) Otherwise they are not the same: the last delivery attempt failed. Reset the tailers to the recovery indices in the snapshot message, then proceed to 4.
4. Continue processing messages.
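The comparison in step 3 can be summarized in a small sketch. It assumes the delivery hash has already been recovered per step 1, that the backing queue messages are available newest first as an in-memory sequence, and that `snapshot-hash` is the same illustrative function used in the persistence sketch above; none of this is the actual Cues implementation.

```clj
(defn snapshot-hash
  [{:q/keys [proc-id tailer-indices]}]
  (hash [proc-id tailer-indices]))

(defn recovery-action
  "Given the delivery hash recovered in step 1 and the backing queue
  messages (newest first), decide what recovery should do."
  [delivery-hash backing-msgs]
  (let [snapshot (->> backing-msgs
                      (filter #(= (:q/type %) :q.type/snapshot))
                      (first))]
    (if (= delivery-hash (snapshot-hash snapshot))
      ;; 3a) the last attempt was delivered: just continue processing
      :continue
      ;; 3b) the last attempt failed: reset tailers and retry delivery
      {:reset-tailers-to (:q/tailer-indices snapshot)})))
```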
The exactly once algorithm described above relies on transactionally collocating the delivery hash with the output message on the output queue (or the error message on the error queue).
This is easy enough when there are no side effects and the data model of the messages and the implementation of the queues are both known. However, when processors perform side effects on arbitrary external resources, the algorithm fails to generalize.
For these processors, Cues takes an approach similar to Kafka: it provides at least once messaging semantics on the processor, but then exposes the delivery hash to user code so that it can use the hash to implement idempotency or exactly once delivery semantics for the side effects:
    (defmethod q/processor ::sink
      [{:keys [delivery-hash] :as process} msgs]
      ;; Use delivery-hash to enable idempotence or enforce
      ;; exactly once semantics
      ...)
For example, consider a sink processor that takes messages from some input queues, combines them, and writes a result to a database. After an unhandled error, interrupt, and restart, the processor will attempt to re-deliver these messages at least once. However, on each attempt the delivery hash will be the same.
Therefore when writing the result to the database, the user code can collocate the hash with the result in the db to ensure the result is only written once. This usually requires some form of transactional semantics on the target db, or alternatively, some kind of single-writer configuration.
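Here is a hedged sketch of that pattern against a relational database. The `results` table, its unique constraint on `delivery_hash`, and the `next.jdbc` datasource are assumptions made for illustration; only `cues.queue/processor` and the `:delivery-hash` key come from Cues.

```clj
(require '[cues.queue :as q]
         '[next.jdbc :as jdbc])

;; Hypothetical Postgres datasource.
(def ds (jdbc/get-datasource {:dbtype "postgresql" :dbname "example"}))

(defmethod q/processor ::sink
  [{:keys [delivery-hash] :as process} msgs]
  ;; Collocate the delivery hash with the result. The unique constraint
  ;; on delivery_hash turns redelivered attempts into no-ops, so the
  ;; result is written exactly once even though the messages may be
  ;; delivered more than once.
  (jdbc/execute! ds
                 ["INSERT INTO results (delivery_hash, payload)
                   VALUES (?, ?)
                   ON CONFLICT (delivery_hash) DO NOTHING"
                  delivery-hash
                  (pr-str msgs)])
  ;; A sink writes no output message.
  nil)
```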