
Linearizable Distributed Map On Kafka

Sancar Koyunlu, Senior Software Engineer @Upstash

Implementing a distributed system is hard. There are whole families of algorithms just to make all nodes come to the same conclusion. They are called consensus algorithms, Raft and Paxos being the best-known ones. In this blog post, we will see if we can offload the consensus to a third party and get away with a much simpler implementation. We will implement a linearizable KV store on top of Kafka, mentioning potential pitfalls and referencing the existing literature along the way.

Why do we need a KV store?

While developing our Schema Registry, we needed a KV store to hold the schemas. The requirements were:

  1. It should be fault-tolerant. If a node dies, the schema registry should continue to work.
  2. It shouldn't require us to maintain another service, creating another failure point.
  3. It should be strongly consistent or even linearizable if possible.
  4. It doesn't need to be performant. The schema registry is something rarely updated and rarely read (because clients cache the schemas locally).

Because of item 2, we decided to use Kafka itself as our storage, but treat it as a replicated KV store. That raises a couple of questions. First, is a log like Kafka enough for this? And second, how do we make it consistent enough to back a Schema Registry? While searching for answers, we learned a lot. Even though we made different decisions and tackled many more problems than mentioned here during our implementation of the Schema Registry, we would like to show how one can implement a linearizable KV store.

So the question is can we turn Kafka into a replicated linearizable KV store?

Reference

We will follow Designing Data-Intensive Applications by Martin Kleppmann heavily in this blog. From here on, when I say "the book", I refer to this one. All of the mentions are from Chapter 9, Consistency and Consensus, section "Total Order Broadcast".

Can we convert Kafka to a replicated linearizable KV store?

The book mentions that linearizable storage and total order broadcast have a strong connection: one can be implemented on top of the other *1. That maps directly onto what we are trying to achieve here:

  • linearizable storage -> KV store
  • total order broadcast -> Kafka

Let's double-check the definition of total order broadcast first, to see if we have the right tool.

Total order broadcast requires two things *1:

  1. Reliable delivery: No messages are lost. If a message is delivered to one node, the other nodes should also get it.
  2. Totally ordered delivery: Messages are delivered to every node in the same order.

So the answer seems to be yes. We can convert Kafka into a replicated linearizable KV store. But how?
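To make the mapping concrete, here is a rough sketch of how a total order broadcast primitive can be built on a single-partition Kafka topic. The class and method names (TotalOrderBroadcast, offer, pollAndDeliver) are our own illustration, not Kafka API or the exact code from the repo; a single partition is assumed so that all messages share one total order.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;

// A minimal sketch of total order broadcast on top of a single-partition Kafka topic.
class TotalOrderBroadcast {
	private final KafkaProducer<String, String> producer;
	private final KafkaConsumer<String, String> consumer;
	private final String topic;

	TotalOrderBroadcast(KafkaProducer<String, String> producer,
	                    KafkaConsumer<String, String> consumer,
	                    String topic) {
		this.producer = producer;
		this.consumer = consumer;
		this.topic = topic;
		this.consumer.subscribe(List.of(topic));
	}

	// Reliable delivery: the send is acknowledged by Kafka before we return.
	// The returned offset is the position of the message in the total order.
	long offer(String key, String value) throws Exception {
		RecordMetadata metadata =
				producer.send(new ProducerRecord<>(topic, key, value)).get();
		return metadata.offset();
	}

	// Totally ordered delivery: every node consumes the same partition
	// and therefore sees the messages in the same order.
	void pollAndDeliver(BiConsumer<Long, ConsumerRecord<String, String>> handler) {
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
		records.forEach(record -> handler.accept(record.offset(), record));
	}
}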

How to turn Kafka into a replicated linearizable KV store?

Linearizability, in the simplest terms, means making a system appear as if there is only a single copy of the data. In other words, a map.putIfAbsent method should behave as if it were operating on a basic in-memory hash map. And if you put something into the map, all subsequent readers should be able to see it, just as with a normal hash map. Let's look closely at how we can achieve linearizable writes and reads separately.
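As a quick reminder, this is the contract we want to preserve across nodes, exactly as a local map gives it (the key and values below are just example data):

// Expected behaviour, exactly as a local HashMap would give it:
Map<String, String> map = new HashMap<>();
String first = map.putIfAbsent("user-1", "alice"); // null: the key was absent, "alice" is stored
String second = map.putIfAbsent("user-1", "bob");  // "alice": the key was present, "bob" is NOT stored
String read = map.get("user-1");                   // "alice": every subsequent reader sees the first write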

Linearizable Writes

The book mentions that you can implement a linearizable compare-and-set this way. The data structure it uses is a simple one: a register that is initially null and can only be set once. After it is set, new writes fail. The book's solution is as follows:

  1. Append a message to the log as a write attempt, meaning that I want to write something here.
  2. Read the log and wait for the message you appended to be delivered back to you.
  3. If the first message you see regarding this register is your message, then you are successful and you can return to the client. If the first message belongs to someone else, you have failed.
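In code, that single-shot register looks roughly like the sketch below. It reuses the same hypothetical helpers (totalOrderBroadcastOffer, registerToWaitFirstMessage, VersionedValue) that appear in the full putIfAbsent example later in the post.

// Sketch of the book's write-once register on top of the log.
CompletableFuture<VersionedValue> first = new CompletableFuture<>();
// Be notified when the log delivers the first message about this register.
registerToWaitFirstMessage(first, key, 1);
 
// 1. Append our write attempt to the log.
totalOrderBroadcastOffer(key, value, 1);
 
// 2. Wait until a first message for this register comes back from the log.
VersionedValue winner = first.join();
 
// 3. If that first message is ours, the write succeeded; otherwise it failed.
boolean success = value.equals(winner.value);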

We need to improve on this idea: we need to be able to set a key more than once, and the algorithm above fails in that case.

To improve on this, we can store a version alongside each appended message. The version starts at 0 and increases with every write. Let's see how a write takes place:

  1. Read the last known version you have for the key. Let's say it was 3.
  2. Set the version for your write attempt to one plus the last known version (4).
  3. Put your write attempt into the log.
  4. Read the log and wait for the message you appended to be delivered back to you.
  5. If the first message you see with version 4 is your message, then you have successfully set the value. If you see a different message with version 4, you have failed to set it.

Depending on the API you expose, you can retry this whole operation to attempt the next write, or return a failure to the client immediately.

To implement map.putIfAbsent, returning after a failure is enough. map.put with this algorithm is almost the same; we can return to the client after the 3rd step.

Let's see how this translates into code. Some locking is omitted for brevity. We will give an example for putIfAbsent only; the rest can be found here. The code is explained in the comments below.

// Relevant class fields //
HashMap<String, VersionedValue> data = new HashMap<>();
TotalOrderBroadcast totalOrderBroadcast;
 
/* PutIfAbsent */
 
// Ignore this first call for now; we will get into
// the details of it in the next section.
// Just be aware that it needs to be there.
linearizableRead();
 
// We read the local value first and if it is not null,
// we return it according to the putIfAbsent contract.
VersionedValue existingVal = data.get(key);
if (existingVal != null && existingVal.value != null) {
	return existingVal.value;
}
 
// Decide what the next version is by incrementing the last known one.
int nextVersion;
if (existingVal == null) {
	nextVersion = 1;
} else {
	nextVersion = existingVal.version + 1;
}
 
CompletableFuture<VersionedValue> f = new CompletableFuture<>();
registerToWaitFirstMessage(f, key, nextVersion);
 
// Broadcast the write attempt to the Kafka log with the value and version.
totalOrderBroadcastOffer(key, value, nextVersion);
 
// Here we wait for the first message with this version to arrive.
VersionedValue firstMessageBack = f.join();
// If the first message we get back is ours,
// then putIfAbsent is successful.
if (value.equals(firstMessageBack.value)) {
	return null;
}
// Otherwise, there was another set/remove, and putIfAbsent failed.
return firstMessageBack.value;
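The small helpers used above are not shown in the snippet. A possible shape for them (our own illustration; the code in the repo may differ) is:

// A value together with the version it was written at.
class VersionedValue {
	final int version;
	final String value;
 
	VersionedValue(int version, String value) {
		this.version = version;
		this.value = value;
	}
}
 
// Futures waiting for the first message of a given (key, version) pair.
// The polling thread completes and removes them when that message arrives.
ConcurrentHashMap<String, List<CompletableFuture<VersionedValue>>> waitMap =
		new ConcurrentHashMap<>();
 
void registerToWaitFirstMessage(CompletableFuture<VersionedValue> f,
                                String key, int version) {
	waitMap.computeIfAbsent(key + version, k -> new CopyOnWriteArrayList<>()).add(f);
}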

There is a continuous polling thread that reads the log, populates the data map we used above, and notifies the future f when the first message arrives. For each polled message, it does the following:

// Decide on the existing version.
VersionedValue old = data.get(key);
int existingVersion = 0;
if (old != null) {
	existingVersion = old.version;
}
 
var newValue = new VersionedValue(version, value);
// If the version is bigger, we update the key.
// This means only the first update for
// this version is effective.
// The next ones will be ignored since they are not bigger (but equal).
if (version > existingVersion) {
	data.put(key, newValue);
}
 
// Wake up everyone waiting for the first update to this key and version.
var futures = waitMap.remove(key + version);
if (futures != null) {
	futures.forEach(f -> f.complete(newValue));
}
 
// Relevant for the linearizable reads.
// We will get back to this.
lastUpdateBarrier.setIfBigger(message.offset());
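For context, the loop around this per-message handler could look roughly like the following; consumer, decode and applyMessage are hypothetical names for the Kafka consumer, the deserialization of the record value into a (value, version) pair, and the handler logic shown above.

// Sketch of the polling thread feeding the per-message handler above.
while (running) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
	for (ConsumerRecord<String, String> message : records) {
		String key = message.key();
		// The record value is assumed to carry both the value and its version.
		VersionedValue decoded = decode(message.value());
		applyMessage(message, key, decoded.value, decoded.version);
	}
}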

Linearizable Reads

When serving reads, if we do not take any precautions and serve whatever we have locally, clients can read stale data. This is because we do not wait for any of the other nodes to consume the log. Moreover, we can't, because we don't know how many readers there are.

Figure 1

In Figure 1 above, the user puts a value to node 1. When it reads it back from node 1, it can see the update, because we waited for the message to come back to node 1 on write. But when it asks node 2, it cannot see its own update, because node 2 hasn't consumed the latest message from the log yet.

To make sure the reads are linearizable, we have a couple of options.

  1. Upon a read request, append a dummy message to the log, and read the log until you see the dummy message. Only then serve the request. This will be our choice.
  2. If you can read the current position of the log in a linearizable way, you can wait until that position and only then serve the request. This is not possible in our case because of the Kafka API.
  3. Read from a replica that is synchronously updated on writes, meaning that every write goes both to the log and to this replica. If we follow this path, we need to designate a node to do the writes and read from it. This requires more coordination and is therefore too complicated for our taste.

The code for linearizable reads is simpler:

/* linearizableRead */
 
// This is a dummy key that we will put into the same log.
String jsonKey = toJson(new Records.WaitKey("read"));
 
// Broadcast the dummy message and remember its offset in the log.
long offset = totalOrderBroadcastOffer(jsonKey, null, 0);
// Wait for the message we sent to get back.
// lastUpdateBarrier is updated on the polling thread,
// and this call waits for the polling thread to reach that offset.
lastUpdateBarrier.await(offset);
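The lastUpdateBarrier above is essentially a monotonically increasing offset that readers can wait on. A minimal sketch of such a barrier (OffsetBarrier is our own name for it; the repo's implementation may differ):

// Tracks the highest log offset applied by the polling thread and
// lets readers block until a given offset has been applied.
class OffsetBarrier {
	private long lastOffset = -1;
 
	synchronized void setIfBigger(long offset) {
		if (offset > lastOffset) {
			lastOffset = offset;
			notifyAll();
		}
	}
 
	synchronized void await(long offset) throws InterruptedException {
		while (lastOffset < offset) {
			wait();
		}
	}
}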

Any read operation should call linearizableRead first and then do the local operation. Some examples:

public String get(Object key) {
	linearizableRead();
 
	VersionedValue existingVal = data.get(key);
	if (existingVal == null) {
		return null;
	}
	return existingVal.value;
}
 
public int size() {
	linearizableRead();
	return data.size();
}

CAP Theorem

Before we conclude, we cannot skip mentioning the CAP theorem in a distributed-systems blog. The CAP (Consistency, Availability, Partition tolerance) theorem states that if partitions are possible in a system, the system can be either Consistent or Available, not both at the same time. We have developed a linearizable map, meaning a Consistent system. The downside is that if we cannot reach Kafka because of a problem, the system cannot respond anymore; it has to wait until communication is healthy again.

Miscellaneous

We have solved linearizable reads and writes, but there are still some problems left to solve.

Infinite log.

A newly starting node must read the whole log before it can start serving. If you deploy this map and let it run for days, the log will get long, and the startup time of new nodes will become infeasible. To solve this, the Log Compaction feature of Kafka comes to the rescue.

Log Compaction is a way to retain only the latest value of each key in a log. The older ones will eventually be cleaned up by Kafka.

We still need to be careful here. Our algorithm relies on the fact that there can be multiple versions of the same key in the log. If the first write attempt of a version gets compacted, different nodes can decide on a different final value for the same key. To mitigate this, we will use the Kafka config min.compaction.lag.ms. With it, we can configure the minimum time a message remains uncompacted in the log. We can keep it long enough for any concurrent writes (within the same version) to settle. Even a value like 10 minutes should be more than enough, but let's make it 1 hour to be on the safe side.
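For reference, the backing topic could be created as a compacted, single-partition topic with a one-hour compaction lag along these lines, using Kafka's AdminClient; the topic name, partition/replication counts, and bootstrap address are placeholders.

// Sketch: creating the log as a compacted topic with a 1-hour minimum compaction lag.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
 
try (AdminClient admin = AdminClient.create(props)) {
	NewTopic topic = new NewTopic("kv-store-log", 1, (short) 3)
			.configs(Map.of(
					"cleanup.policy", "compact",
					"min.compaction.lag.ms", String.valueOf(60 * 60 * 1000)));
	admin.createTopics(List.of(topic)).all().get();
}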

Conclusion

We have implemented a distributed KV store on top of Kafka. You can see the full source code, implementing the basic java.util.concurrent.ConcurrentMap interface, here.

Here are some ideas for you, our readers, to tackle yourselves; we might also write a blog about them.

  • Can we implement other Java data structures like Queue, List, Lock, AtomicInteger, etc. using similar logic?
  • What would it look like if we aimed for a weaker consistency guarantee, like Sequential Consistency or Eventual Consistency?

Stay tuned and don't forget to give feedback at our repo here for potential bugs or improvements.