We use Kafka a lot at Monzo. So much so that we’ve built our own queueing abstraction on top of Kafka.
Most of the time we don’t need consumption to be ordered at Monzo, so we can achieve high throughput using our “unordered concurrent” implementation without too many Kafka partitions. Yet ironically, some of our biggest batch processes do need ordering, or at least mutual exclusion over some key, and so can’t use any of this. They remain painful to scale and maintain.
We built our “unordered concurrent” subscription with this in mind, knowing that we could eventually extend it to support ordering as Monzo needs it. This post is about how that works. Before we dive in, let's take a closer look at the problem that we’re trying to solve.
These processes can only scale horizontally and take distributed locks
Some of Monzo’s biggest batch processes are related to payments. This means they generally scale with both the number of customers and the number of payments each customer makes: at minimum linearly, and often super-linearly in practice. These batch processes need ordering, mutual exclusion over a key, or both, because it is important for the correctness of the system and/or the customer experience.
For example, in the world of standing orders (also known as scheduled payments) we want to process external credits to an account (like your salary arriving via Direct Credit) before running anything that debits the account. This requires ordering. Other systems, like interest accrual pipelines, may not require ordering but do need mutual exclusion. Having a good scaling story behind these systems is important.
So how have these things worked historically? Let's start with the mutual exclusion case. There are broadly two ways for a batch process to achieve this:
Consume messages in order, one at a time, making sure the key you want mutual exclusion over is your partition key. The key downside here is that you can only scale out by adding more Kafka partitions, and all the downsides from my original post apply.
Use our “unordered concurrent” subscription mode but take a distributed lock to get the mutual exclusion. Distributed locks are extremely expensive, so this solution isn’t ideal even where it’s possible at the limit.
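To make the cost of that second option concrete, here is a rough sketch of what such a handler tends to look like. The lock API is a hypothetical stand-in (passed in as a function) rather than Monzo’s real locking library; the point is that every single message pays for at least one round trip to a remote lock service before any work happens.

```go
package example

import "context"

// Hypothetical handler for the distributed-lock approach. acquireLock stands
// in for a real distributed lock client; none of these names are Monzo's API.
func handleWithDistributedLock(
	ctx context.Context,
	accountID string,
	acquireLock func(ctx context.Context, key string) (release func(), err error),
	process func(ctx context.Context) error,
) error {
	// A remote call to the lock service for every message, even when no other
	// message for this account is anywhere near being processed.
	release, err := acquireLock(ctx, "interest-accrual/"+accountID)
	if err != nil {
		return err // the subscription retries the message
	}
	defer release()

	return process(ctx)
}
```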
Given that one approach to solving the mutual exclusion problem is to use ordering, we can instead focus this post on just how we can scale ordered consumption. One simple way to scale out horizontally is by adding Kafka partitions. So let's talk about that and why it isn’t quite good enough for us.
Scaling horizontally by adding more Kafka partitions comes with many downsides
While simple on the surface there are actually a lot of sharp edges to consider for Monzo when it comes to "just adding more partitions".
The act of adding more partitions comes with its own complexity. In particular, you need to decide how many partitions to add (surprisingly non-trivial) and run the process in the right order. By default, consumers at Monzo consume a topic from offsetNewest for any partition with no committed offset stored. So you must either switch this to offsetOldest while adding partitions (and remember to swap it back afterwards), or ensure the consumers rebalance before the publishers refresh their metadata and start producing to the new partitions. If you don’t, your consumers will jump to the newest offset on the new partitions and skip any messages already produced to them (there’s a short sketch of this knob after this list).

Partitions aren’t expensive, but they also aren’t free. Each partition adds metadata for the Kafka cluster to track and carries a clear CPU cost. More partitions also make rebalances more expensive on the consumer side. We’re interested in a solution that is generally applicable, one we can easily recommend to all teams without it becoming problematic as more teams adopt it.
You can’t remove partitions; adding them is a one-way street.
It biases you towards running more, smaller pods for the consumers, whereas we often prefer to run fewer, larger pods (for similar overhead reasons, it’s kinder to our Kubernetes clusters).
Similar to our motivation for introducing “unordered concurrent,” this approach doesn’t address the standing backlog problem. Once the messages have been produced, you have no way to speed up their consumption. An unexpected backlog is just something you have to wait out, which we want to avoid because many jobs have strict SLOs against them.
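To make the initial-offset point above concrete: our consumer tooling is internal, but if your consumers sit on top of the Go Sarama client (an assumption on my part; the constant names map directly), the knob to flip while adding partitions is the consumer’s initial offset.

```go
package example

import "github.com/IBM/sarama"

// newConsumerConfig shows the knob in question on a Sarama consumer. Brand new
// partitions have no committed offsets, so a consumer configured with
// OffsetNewest will skip anything produced to them before it rebalances.
func newConsumerConfig(addingPartitions bool) *sarama.Config {
	cfg := sarama.NewConfig()
	if addingPartitions {
		// Temporarily start from the oldest offset so messages produced to the
		// new partitions before the first rebalance aren't skipped.
		cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	} else {
		cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	}
	return cfg
}
```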
These sharp edges are substantial and we thought we could do better by extending our “unordered concurrent” subscription.
We added per-key ordering to our unordered concurrent subscription
The first implementation of our “unordered concurrent” subscription was aimed at consuming multiple messages concurrently within a single Kafka partition, with the offset only moving once all messages up to that offset have been successfully processed. It ignores partition keys entirely. What if we could continue to consume messages with different partition keys concurrently, but maintain strict ordering for messages with the same partition key?
If we put new messages in flight one at a time, in the order they appear in the log, and only when no other message with the same partition key is already in flight, we get ordering within a partition key but concurrency across partition keys. That gives us ordered consumption with all the vertical scaling properties of our “unordered concurrent” subscription, and it solves the key issues we were seeing with the horizontal-only approach.
In order for this to work, there are a few things we need to do:
We need to make sure that messages are produced with the partition key over which we want ordering or mutual exclusion, so that messages for the same key end up on the same Kafka partition. For a use case that already wanted ordering this isn’t new; for a use case that previously only wanted mutual exclusion, it might be.
We need to ensure that each partition is only being consumed by one consumer at a time. Out of the box, Kafka doesn’t quite guarantee this: an old-generation consumer that hasn’t stopped in time (or tried to commit offsets) could still be consuming in parallel with a newer-generation consumer, for example across a rebalance. So strictly speaking, even a vanilla ordered subscription needs something extra to guarantee perfect ordering. We solved this at Monzo by taking a long-lived distributed lock over a partition before accepting any messages for processing. Fun fact: this is a feature we also let folks use independently with a vanilla subscription, though with the functionality this post describes we hope to remove that use case.
We need to ensure that messages with the same partition key are processed serially. Our naive solution for mutual exclusion used distributed locks in the business logic, since even messages on the same partition could race, as per the point above. Now we can instead use a local lock (essentially a named mutex) to decide whether a message for a given key can be put into flight, because the library has stronger guarantees before handing the message off to a consumer (and we know all messages for a key land on the same partition). This is a huge efficiency gain: local locks are far faster and orders of magnitude cheaper than their distributed counterparts.
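To make the local-lock idea concrete, here is a minimal, self-contained sketch in Go. It is not Monzo’s implementation (the real subscription also handles offset commits, max runaway, deadlettering and the partition-level distributed lock); every name below is invented for illustration.

```go
package example

import (
	"sync"
	"time"
)

// keyLocks is the "named mutex": it records which partition keys currently
// have a message in flight.
type keyLocks struct {
	mu       sync.Mutex
	inFlight map[string]struct{}
}

func newKeyLocks() *keyLocks {
	return &keyLocks{inFlight: make(map[string]struct{})}
}

// tryAcquire marks the key as in flight, unless another message already holds it.
func (k *keyLocks) tryAcquire(key string) bool {
	k.mu.Lock()
	defer k.mu.Unlock()
	if _, busy := k.inFlight[key]; busy {
		return false
	}
	k.inFlight[key] = struct{}{}
	return true
}

// release frees the key once its message has been fully processed.
func (k *keyLocks) release(key string) {
	k.mu.Lock()
	defer k.mu.Unlock()
	delete(k.inFlight, key)
}

type message struct {
	offset int64
	key    string // the partition key, e.g. an account ID
}

// dispatch walks a partition's messages in log order and hands each one to a
// bounded worker pool, but only once its key is free. Messages with different
// keys run concurrently; messages sharing a key run strictly one at a time.
func dispatch(msgs []message, maxInFlight int, handle func(message)) {
	locks := newKeyLocks()
	sem := make(chan struct{}, maxInFlight)
	var wg sync.WaitGroup

	for _, m := range msgs {
		for !locks.tryAcquire(m.key) {
			// Naive behaviour: the dispatcher simply waits for the key to
			// unlock before putting anything else in flight (the shortcoming
			// discussed later in the post).
			time.Sleep(time.Millisecond)
		}
		sem <- struct{}{} // respect the worker pool size
		wg.Add(1)
		go func(m message) {
			defer wg.Done()
			defer func() { <-sem }()
			defer locks.release(m.key)
			handle(m)
		}(m)
	}
	wg.Wait()
}
```

In the real library the key frees up once the message has been successfully processed (and so becomes eligible for its offset to be committed); the sketch approximates that by releasing when the handler returns.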
Now you can see how this approach combines the two naive approaches. Sequencing messages serially by using a local lock to determine if we can pick up a message gives us both ordering and mutual exclusion with the same solution.
With all that in mind, we can now illustrate how this works in practice.
In our example we have four concurrent goroutines attempting to process messages. The first three have successfully picked up messages with partition keys acc_123, acc_456 and acc_789 respectively. The fourth is attempting to pick up the message at offset 5, but because that message has partition key acc_123, for which we already have a message in flight (the key is locked), it is unable to pick it up until offset 2 is committed (and thus the key unlocked).
As long as there are no collisions on keys, the worker pool of goroutines can continue to make progress, picking up messages well ahead of one another. In the example below, message 5 may be slow, but because no other message has the partition key acc_123, the other three goroutines can keep going. Since this is built on top of Monzo’s standard “unordered concurrent” subscription, the goroutines continue to respect the configured “max runaway”, so they can’t run too far ahead of the committed offset.
One shortcoming of this solution is that waiting for a single key to unlock blocks all goroutines from picking up more work. This is on purpose: we wanted to start with a naive solution. Tracking all the messages that are waiting on a locked key, in a way that still maintains ordering, is feasible but much more complex than the naive approach. We’re comfortable with this as a first implementation because at Monzo we generally expect few messages with the same key to arrive within a short window of each other. And if it turns out to be a real bottleneck, we can always solve this problem entirely inside the library.
One other thing to note: because this is built on top of “unordered concurrent”, which requires a deadletter, this new mode also requires a deadletter. For Monzo this is the right tradeoff. We do care about ordering in some scenarios, but we would rather sacrifice it on a case-by-case basis (for a bug) than stall an entire ordered queue until a human can look at it. Human effort is required in both cases, which makes this a much safer default.
Our generated code made this trivial to expose
In the previous post I discussed the benefit of our generated code. Now I can really bring that to life. Using this new feature is as simple as adding a new generated option, WithPartitionKeySerialization, onto a subscription that is already using WithUnorderedConcurrency. When you do this you also implicitly opt into taking the long-lived distributed locks over partitions briefly mentioned above.
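For a sense of what this looks like at the call site, here’s a sketch. The constructor and the two options above are the names mentioned in this post; the package name, topic, handler and deadletter option are hypothetical.

```go
// Hypothetical call site: the queue package, topic, handler and WithDeadletter
// option are invented for illustration; the other names come from this post.
sub := queue.NewSubscriptionV2(
	"payments.standing-orders",
	handleStandingOrder,
	queue.WithDeadletter(), // unordered concurrency requires a deadletter
	queue.WithUnorderedConcurrency(),
	queue.WithPartitionKeySerialization(),
)
```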
This was probably the single hardest feature to name for our Kafka library. Some at Monzo have taken to calling it “ordered unordered”, which kind of makes sense but is very confusing if you don’t already know what it is. That’s why I settled on “partition key serialization”: an attempt to describe literally what it does without it being a full sentence. Curious to hear if folks agree.
In the future, this could be the default
If a message without a partition key is consumed by this subscription, it simply behaves as the standard “unordered concurrent” subscription with no key serialization. This presents us with a great option for the future: making the subscription represented in the code snippet above the default, so it could just be NewSubscriptionV2 with no extra options. Virtually all subscriptions at Monzo already use deadletters, and the ones that don’t likely should. Many use “unordered concurrent”, and again, most that don’t should (it’s somewhat newer). Ordering and cheap mutual exclusion were the remaining reasons not to, and now those are solved. So we can give everyone the same subscription as the default: the most powerful one. Then if you want ordering, you set a partition key on your messages. If you don’t want it, you leave it blank.
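If that became the default, the common case could collapse to something like this (same hypothetical names as in the sketch above):

```go
// Set a partition key on your messages to get per-key ordering and mutual
// exclusion; leave it blank to get plain unordered concurrency.
sub := queue.NewSubscriptionV2("payments.standing-orders", handleStandingOrder)
```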
This would massively reduce the cognitive overhead for engineers, because we’d have chosen the most sensible default for them (unlike today, where the default actually has the most sharp edges). If someone really can’t use it, we can always generate options to remove features instead, which should be extremely rare. With this we can also vastly reduce the number of distributed locks required in our platform. This is a big deal for Monzo.
I’m personally really excited to see where this library will go. So far it has been a journey of adding features and complexity to the benefit of our engineers. And now we are on the cusp of something even more exciting, being able to hide most of that complexity again and shipping something easier, safer and more delightful than ever before.
Does building and running reliable systems and infrastructure at scale, without downtime, interest you? We are hiring for Backend Engineers and Staff Engineers.