Friday, December 10, 2021

Idempotency as a Process

From event streaming to datastore

Before delving into the subject matter, the first question to ask is: what is idempotency? A quick Google search provides the mathematical definition, which carries over directly to the software concept: an operation is idempotent if applying it more than once produces the same result as applying it once.
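As a minimal sketch of the definition in Python, the two functions below update a record's sent count. The names set_sent and add_sent are hypothetical and exist only for this illustration. Assigning a value is idempotent, while adding to a value is not.

```python
def set_sent(record, value):
    """Idempotent: assigning the same value twice leaves the same state."""
    return {**record, "sent": value}


def add_sent(record, value):
    """Not idempotent: every application changes the state again."""
    return {**record, "sent": record["sent"] + value}


record = {"id": 1, "sent": 0}

# Applying the assignment once or twice gives the same record.
set_once = set_sent(record, 100)
set_twice = set_sent(set_once, 100)
print(set_once == set_twice)   # True

# Applying the addition twice gives a different record than applying it once.
add_once = add_sent(record, 100)
add_twice = add_sent(add_once, 100)
print(add_once == add_twice)   # False
```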

The definition itself may be straightforward. However, when a software solution consumes from an event stream and updates a record in a datastore, unexpected behaviours are likely to occur unless idempotency is considered as part of the development process.

Assuming that each event will be processed exactly once in a distributed system is a fallacy. A very simple example shows one of the many causes of data inconsistency.

Let's take a simplified example of storing an event stream's message in a relational database:

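while (true) {
    var message = consumeEventMessage();
    db.upsertRecord(message);              // write to the database first
    acknowledgeEventMessage(message.Id);   // then acknowledge; this call can fail
}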

In this scenario, the database upsert could complete successfully and the subsequent acknowledgement of the message could fail, which results in the same message being retrieved on the next run. If the datastore holds an aggregated field, or a value that is computed on the fly, the repeated update can corrupt the original data and leave erroneous data stored at rest.
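The failure mode can be simulated with a short Python sketch. The in-memory queue stands in for the event stream and the function run_consumer is hypothetical. It assumes a message stays at the head of the stream until it is acknowledged, and that the acknowledgement for a chosen message is lost exactly once.

```python
from collections import deque


def run_consumer(messages, fail_ack_for):
    """Process messages with at-least-once delivery; return ids in processing order."""
    queue = deque(messages)               # stand-in for the event stream
    pending_failures = set(fail_ack_for)  # ids whose first acknowledgement is lost
    processed = []                        # stand-in for completed database writes

    while queue:
        message = queue[0]                # consume without removing: not yet acknowledged
        processed.append(message["id"])   # the database write succeeds

        if message["id"] in pending_failures:
            # The acknowledgement is lost once (for example, a dropped connection),
            # so the message stays at the head of the queue and is redelivered.
            pending_failures.discard(message["id"])
            continue

        queue.popleft()                   # acknowledgement succeeded

    return processed


events = [{"id": 1}, {"id": 2}, {"id": 3}]
print(run_consumer(events, fail_ack_for={2}))  # [1, 2, 2, 3]
```

Message 2 is written twice even though it was published once, which is harmless only if the write itself is idempotent.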

This shouldn't even be treated as an edge case, as it can happen surprisingly often. Network connections are never fully reliable and can drop out for a myriad of reasons. Building for failure and giving careful consideration to fault tolerance must be a part of any distributed system.

As an example, consider a system that needs to cater for A/B test campaigns. The typical process would be as follows:
  1. Set up an A/B test campaign to send to 1200 subscribers
  2. Configure email A with a subject A and send it to 100 subscribers
  3. Configure email B with a subject B and send it to 100 subscribers
  4. After an hour, send the email with the highest open rate to the remaining 1000 subscribers

From an event streaming perspective this is relatively straightforward, with email A winning the test in this example:

publish message { id: 1, email: 'A', subject: 'A', sent: 100, date: '2021-12-01 12:00:00' }
publish message { id: 2, email: 'B', subject: 'B', sent: 100, date: '2021-12-01 12:00:00' }
publish message { id: 1, email: 'A', subject: 'A', sent: 1000, date: '2021-12-01 13:00:00' }

Storing the first two messages in a normalised relational database table isn't difficult either:

id  email  subject  date                 sent
1   A      A        2021-12-01 12:00:00  100
2   B      B        2021-12-01 12:00:00  100

Now, when the third message is consumed, the upsert implementation db.upsertRecord(message) adds its sent count to the existing record and we achieve parity with the total number of subscribers:

id  email  subject  date                 sent
1   A      A        2021-12-01 13:00:00  1100
2   B      B        2021-12-01 12:00:00  100

In an ideal distributed world, this would work. But immediately we can see issues. The obvious one is the loss of data overwritten by the upsert, namely the initial sent date of the A/B test for email A. Another concern is that the total for a record is calculated on the fly and written back over the sent count. The third issue isn't glaringly obvious unless acknowledging the message fails, as mentioned above. If acknowledgeEventMessage(message.Id) is invoked for the third message and fails, the message is delivered again, the upsert runs a second time, and we could get the following state after the retry:

id  email  subject  date                 sent
1   A      A        2021-12-01 13:00:00  2100
2   B      B        2021-12-01 12:00:00  100
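The corrupted state can be reproduced with a small Python sketch. The function additive_upsert is a hypothetical stand-in for db.upsertRecord, assuming it overwrites the date and adds the incoming sent count to the stored one, which is the behaviour the tables above imply.

```python
def additive_upsert(table, message):
    """Hypothetical stand-in for db.upsertRecord: overwrite the date, add to sent."""
    row = table.get(message["id"])
    if row is None:
        table[message["id"]] = dict(message)   # first message for this id
    else:
        row["date"] = message["date"]          # the original send date is lost
        row["sent"] += message["sent"]         # total computed on the fly


messages = [
    {"id": 1, "email": "A", "subject": "A", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 2, "email": "B", "subject": "B", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 1, "email": "A", "subject": "A", "sent": 1000, "date": "2021-12-01 13:00:00"},
]

table = {}
for message in messages:
    additive_upsert(table, message)
print(table[1]["sent"])   # 1100

# The acknowledgement of the third message fails, so it is delivered again.
additive_upsert(table, messages[2])
print(table[1]["sent"])   # 2100
```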

What is worse is that we have no way of undoing this short of clearing the database and replaying the event stream, either in full or from a checkpoint, which would add a lot of development hours and maintenance overhead to resolve the issue.

To make the system idempotent, both in the case of failure and in a scenario where event streams are replayed over existing data, the original data needs to be persisted in a way that still keeps the relational database table normalised. The following schema can help achieve this:

id  email  sub  date_1    sent_1  date_2    sent_2  computed
1   A      A    ...12:00  100     ...13:00  1000    1100
2   B      B    ...12:00  100     (null)    (null)  100
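One way to write against such a schema is sketched below in Python with the standard sqlite3 module. The table name campaign and the function idempotent_upsert are hypothetical, and the slot rule is an assumption: a message goes into the slot that already holds its date, otherwise into the first empty slot. The sketch also assumes a single consumer, so the read followed by the write is not guarded against concurrent updates.

```python
import sqlite3


def idempotent_upsert(conn, message):
    """Store the message in the slot matching its date, or the first empty slot."""
    row = conn.execute(
        "SELECT date_1, date_2 FROM campaign WHERE id = ?", (message["id"],)
    ).fetchone()
    if row is None:
        conn.execute(
            "INSERT INTO campaign (id, email, sub, date_1, sent_1) VALUES (?, ?, ?, ?, ?)",
            (message["id"], message["email"], message["subject"],
             message["date"], message["sent"]),
        )
    else:
        date_1, date_2 = row
        if message["date"] == date_1:
            slot = 1                      # duplicate of the first event
        elif message["date"] == date_2 or date_2 is None:
            slot = 2                      # duplicate of the second event, or a free slot
        else:
            raise ValueError("no free slot for this message")
        conn.execute(
            f"UPDATE campaign SET date_{slot} = ?, sent_{slot} = ? WHERE id = ?",
            (message["date"], message["sent"], message["id"]),
        )
    # Recompute the total from the stored originals rather than incrementing it.
    conn.execute(
        "UPDATE campaign SET computed = COALESCE(sent_1, 0) + COALESCE(sent_2, 0) "
        "WHERE id = ?",
        (message["id"],),
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE campaign (id INTEGER PRIMARY KEY, email TEXT, sub TEXT, "
    "date_1 TEXT, sent_1 INTEGER, date_2 TEXT, sent_2 INTEGER, computed INTEGER)"
)

messages = [
    {"id": 1, "email": "A", "subject": "A", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 2, "email": "B", "subject": "B", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 1, "email": "A", "subject": "A", "sent": 1000, "date": "2021-12-01 13:00:00"},
]
for message in messages + [messages[2]]:   # the third message is delivered twice
    idempotent_upsert(conn, message)

rows = conn.execute(
    "SELECT id, sent_1, sent_2, computed FROM campaign ORDER BY id"
).fetchall()
print(rows)   # [(1, 100, 1000, 1100), (2, 100, None, 100)]
```

Because the total is derived from the stored slots instead of being incremented, the redelivered third message rewrites the same values and the computed column stays at 1100.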

A change in table structure can help alleviate the problem of duplicate messages being consumed. It also persists the original data, which avoids data loss. The unfortunate side effect, however, is that some records will not use all of the data fields and will inevitably contain null values, which could be a deal breaker for the purists. Normalising the database table further into two separate tables with a one-to-many relationship may also suffice, but that depends entirely on the volume of records and on how efficiently the total sent count can be queried.
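The two-table alternative could look like the following Python sketch using the standard sqlite3 module. The table names campaign and campaign_send and the function store are hypothetical, and the sketch assumes that the pair of campaign id and date uniquely identifies an event, so a duplicate insert can simply be ignored.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE campaign (id INTEGER PRIMARY KEY, email TEXT, subject TEXT);
    CREATE TABLE campaign_send (
        campaign_id INTEGER NOT NULL REFERENCES campaign(id),
        date TEXT NOT NULL,
        sent INTEGER NOT NULL,
        PRIMARY KEY (campaign_id, date)   -- one row per original event
    );
""")


def store(conn, message):
    """Insert the parent row and the send row; duplicates are ignored."""
    conn.execute(
        "INSERT OR IGNORE INTO campaign (id, email, subject) VALUES (?, ?, ?)",
        (message["id"], message["email"], message["subject"]),
    )
    conn.execute(
        "INSERT OR IGNORE INTO campaign_send (campaign_id, date, sent) VALUES (?, ?, ?)",
        (message["id"], message["date"], message["sent"]),
    )


messages = [
    {"id": 1, "email": "A", "subject": "A", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 2, "email": "B", "subject": "B", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 1, "email": "A", "subject": "A", "sent": 1000, "date": "2021-12-01 13:00:00"},
]
for message in messages + [messages[2]]:   # the third message is delivered twice
    store(conn, message)

# The total is aggregated at query time, so no row ever holds a running sum.
totals = conn.execute(
    "SELECT campaign_id, SUM(sent) FROM campaign_send "
    "GROUP BY campaign_id ORDER BY campaign_id"
).fetchall()
print(totals)   # [(1, 1100), (2, 100)]
```

This variant has no null columns, at the cost of an aggregate query whenever the total sent count is needed.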

Moreover, events could be consumed out of order, because ordering is only guaranteed for messages consumed from the same partition. The above solution still caters for this anomaly: if the messages for email A arrive in reverse, sent_1 is 1000 and sent_2 is 100, and the computed total is still 1100. Our idempotent implementation is therefore fulfilled.
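The claim about ordering can be checked with a short Python sketch. It models the slots as a dictionary keyed by date rather than as fixed date_1 and date_2 columns, which is a simplification. The function apply is hypothetical. Every possible arrival order of the three messages is tried, with the last message delivered twice to simulate a failed acknowledgement.

```python
from itertools import permutations


def apply(rows, message):
    """Keep each original (date, sent) pair once per id; derive the total from them."""
    row = rows.setdefault(message["id"], {"slots": {}, "computed": 0})
    row["slots"][message["date"]] = message["sent"]   # same date overwrites itself
    row["computed"] = sum(row["slots"].values())


messages = [
    {"id": 1, "email": "A", "subject": "A", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 2, "email": "B", "subject": "B", "sent": 100, "date": "2021-12-01 12:00:00"},
    {"id": 1, "email": "A", "subject": "A", "sent": 1000, "date": "2021-12-01 13:00:00"},
]

totals = set()
for ordering in permutations(messages):
    rows = {}
    for message in ordering + ordering[-1:]:   # the last message is delivered twice
        apply(rows, message)
    totals.add((rows[1]["computed"], rows[2]["computed"]))

print(totals)   # {(1100, 100)}
```

All six orderings end in the same totals, so neither the order of arrival nor a duplicate delivery changes the result.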