This is really cool as a demonstration of how distributed systems can be built on top of Durable Objects. In particular I was really happy to see this:
> What about replication? What about leadership election?
> There is none, at least not in the application code. The point of using Durable Objects here is that we can offload these complexities onto the infrastructure layer, and keep our application code focused and minimal.
Yes, exactly. With Durable Objects, these common problems are offloaded to the underlying system, and you can instead focus on the higher-level distributed systems algorithms problems that are unique to the particular infrastructure you want to build.
(I'm the tech lead for Cloudflare Workers and helped design Durable Objects.)
So every topic has a (Kafka) replication factor of 1, but how do producers find their partition leader if the DO fails? From my understanding, all the DOs are effectively the same broker so they'd never lose the leader?
With regard to Kafka 0.8, I inferred it was chosen because the protocol was simpler back then; what issues did you hit with newer protocol versions?
Producers (and all clients) can only see and interact with the gateway worker. DOs aren’t accessible from the outside world, and all communication with them happens through the gateway worker. In the case of DO failure, it’s the gateway worker’s job to reconnect with the failed DO. Depending on the nature of the failure, this reconnection request might be routed to a DO instance located on a different machine from the original, failed instance (in which case a leadership change happened internally). From the client perspective, nothing changes, and it can keep using the same gateway worker connection as before.
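For a rough idea of what that routing looks like, here's a minimal gateway Worker sketch. This is not the project's actual code; the `PARTITION` binding and the path scheme are made up for illustration. The key point is that the gateway resolves a stable DO id from a name, so after a failure the next fetch through the same name transparently reaches a fresh instance, possibly on another machine.

```ts
export interface Env {
  PARTITION: DurableObjectNamespace; // hypothetical binding name
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    // Derive a stable partition key from the path, e.g. "/produce/my-topic/0" -> "my-topic/0"
    // (illustrative routing only).
    const key = url.pathname.split("/").slice(2).join("/");
    const id = env.PARTITION.idFromName(key); // the same name always maps to the same DO
    const stub = env.PARTITION.get(id);
    return stub.fetch(request); // clients never talk to the DO directly, only to this gateway
  },
};
```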
Regarding 0.8, that’s right, I wanted to start with something as simple as possible. This project wasn’t intended for serious, practical usage, so spending time on extra features added in later versions didn’t seem useful. I’d imagine that newer protocol versions could be implemented without too much difficulty, but whether this thing is fast enough to ever be practical is another larger question.
I feel like this is a dumb question, but how exactly do Durable Objects help you avoid replication and leadership election? I've read the high-level overview of Durable Objects, but sadly, the answer isn't clicking for me...
Parallel requests can be served, but they all go through a single instance, and the abstraction gives you effectively exclusive access to the data store while the stateful part of a request is being processed.
This does imply a hard limit on transactions per second on a particular object, since those transactions must be serialized in a thread. But, this is true of all databases: there is a limit on transactions per second operating on the same "chunk" of the database, since those transactions by definition have to be serialized.
In fact, you may find a traditional database behaves much worse when everyone is trying to modify the same row: if many clients do a read-modify-write at the same time, all but one of them will fail with transaction conflicts and will have to start over. If you aren't careful this can easily be O(n^2) total work done.
With Durable Objects, because your application code itself runs serialized in the object thread with zero storage latency (assuming cache), it's easier to make each operation "atomic" and avoid transaction conflicts entirely.
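As a concrete (hypothetical) illustration, here's a sketch of a read-modify-write inside a Durable Object. The class and key names are made up; the point is that because the object handles events one at a time, the get/put pair below can't interleave with another request, so no conflict handling is needed.

```ts
export class Counter {
  constructor(private state: DurableObjectState) {}

  async fetch(request: Request): Promise<Response> {
    // No other request touches this object's storage between the get and the put,
    // so this read-modify-write is effectively atomic.
    const current = (await this.state.storage.get<number>("value")) ?? 0;
    const next = current + 1;
    await this.state.storage.put("value", next);
    return new Response(String(next));
  }
}
```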
Thank you lord Kenton ^_^ I’m honestly looking forward to Cloudflare Queues, which is probably just built on top of DOs, but it’s sooo nice having it as a service.
I once made the NodeJS Kafka client KafkaJS run in a browser with a websocket-to-tcp-socket shim. If we combine the two we can now have a browser based client talk to a broker cluster running on edge workers. Just gotta integrate it with Google Sheets and wait for the VC money.
Yep, one of the challenges here is that KafkaJS requires Kafka 0.10+, which is true for several other popular clients. For this broker implementation to be somewhat practical, it would need to be extended to 0.10.0 at least. 0.8.0 was a nice, simple starting point for a “proof of concept” project like this.
The one feature I miss the most on Cloudflare Workers/Durable Object is TrueTime.
Durable Objects are fundamentally replicated state machines with a nice JavaScript API. You can build an automatically sharded (yet correct) Kafka API implementation, or even an entire Spanner-style distributed database on Workers, given the right primitives (DO + TrueTime).
DOs aren't distributed: each DO is guaranteed to be a single instance running in exactly one data center, effectively single-threaded. So TrueTime doesn't seem helpful, since Date.now() in the worker will always just be one time. TrueTime would offer you a primitive to do the work that Cloudflare gives you for free by virtue of using their infrastructure: would you rather have the primitive and do it yourself? Or would you rather just have the system do that for you so you don't need to think about coordinating a distributed system?
TrueTime is an efficient primitive to ensure external consistency across multiple consensus groups, not within one group. The current DO infrastructure does not give the TrueTime guarantees for free: you cannot do transactions across two or more durable objects, and the max throughput within a transaction domain is limited to what a single V8 isolate can handle sequentially.
DOs are arguably the wrong primitive if you're looking to do transactions across more than one of them. They're essentially partitions, which is how this project uses them. If you do transactions across more than one, you are still making blocking, synchronous requests across DO instances, which can be spread across N data centers.

You can't really implement locking for blocking writes if you want transactions at a layer above DOs: short of implementing a spin lock (very expensive with Workers), you don't have a way to wait for an ongoing transaction to complete. Which is to say, if you had a way to implement transactions with TrueTime in Workers, you could just use the standard KV store and avoid DOs entirely, no?

The great part about Workers is that you don't pay for idle time: if you're implementing your own locking, you're not able to yield the event loop back to CF. At that point, you've lost most of the benefit of being at the edge (you're making blocking cross-DC requests) and most of the cost benefits of Workers.
You can implement synchronization across DOs, but the application has to coordinate it. E.g. you could implement two-phase commit to get strongly consistent semantics (with the possibility of temporary unavailability if the network between the objects is partitioned at the wrong moment). Or if your application can tolerate eventual consistency, consider using CRDTs for partition tolerance.
These things require some care to get right, but not a huge amount of actual work, compared to the challenge of implementing lower-level consensus and replication that DO takes care of for you.
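For a sense of what that application-level coordination might look like, here's a rough two-phase commit sketch across two DO stubs. The /prepare, /commit, and /abort endpoints are invented for illustration; a real version would also need durable coordinator state and a recovery path for crashes mid-protocol.

```ts
async function twoPhaseCommit(
  a: DurableObjectStub,
  b: DurableObjectStub,
  txnId: string
): Promise<boolean> {
  // Phase 1: ask each participant to lock and validate its part of the transaction.
  const prepared = await Promise.all(
    [a, b].map((stub) =>
      stub.fetch(`https://do/prepare?txn=${txnId}`).then((r) => r.ok)
    )
  );

  if (!prepared.every(Boolean)) {
    // Abort: tell participants to release whatever they prepared.
    await Promise.all([a, b].map((stub) => stub.fetch(`https://do/abort?txn=${txnId}`)));
    return false;
  }

  // Phase 2: everyone voted yes, so commit everywhere.
  await Promise.all([a, b].map((stub) => stub.fetch(`https://do/commit?txn=${txnId}`)));
  return true;
}
```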
Yep, all the diagrams were created with excalidraw. The exported “source” for the diagrams is stored in the diagrams.excalidraw file in the root of the repository.
I thought about that a while back: how would I build an index on top of Durable Objects?
For sorted indexes like a b-tree in a database, I think you would partition into objects by value, so (extremely naive example) values starting with a-m would be in one object, and n-z in the second. You'd end up needing a metadata object to track the partitions, and some reasonably complicated ability to grow and split partitions as you add more data, but this is a relatively mature and well-researched problem space, databases do this for their indexes.
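As a toy illustration of that value-based partitioning (hard-coded ranges standing in for the metadata object; the `INDEX_PARTITION` binding name is made up):

```ts
// Naive key-range routing: in practice the ranges would live in a metadata DO
// and would split as partitions grow.
const RANGES = [
  { name: "a-m", upTo: "n" },
  { name: "n-z", upTo: "{" }, // "{" sorts just after "z" in ASCII
];

function partitionFor(key: string): string {
  const k = key.toLowerCase();
  for (const range of RANGES) {
    if (k < range.upTo) return range.name;
  }
  return RANGES[RANGES.length - 1].name;
}

function stubFor(env: { INDEX_PARTITION: DurableObjectNamespace }, key: string): DurableObjectStub {
  const id = env.INDEX_PARTITION.idFromName(partitionFor(key));
  return env.INDEX_PARTITION.get(id);
}
```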
For full text search, particularly if you want to combine terms, you might have to partition by document, though. So you'd have N durable objects which comprise the full text "index", and each would contain 1/N of the documents you're indexing, and you'd build the full text index in each of those. If you searched for docs containing the words "elasticsearch" and "please" you would have to fan out to all the partitions and then aggregate responses.
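The fan-out/aggregate step might look roughly like this; the /search endpoint, the `fts-N` naming scheme, and the binding name are assumptions, not anything from the project:

```ts
async function searchAllPartitions(
  env: { FTS_PARTITION: DurableObjectNamespace },
  numPartitions: number,
  query: string
): Promise<string[]> {
  const perPartition = await Promise.all(
    Array.from({ length: numPartitions }, (_, i) => {
      const stub = env.FTS_PARTITION.get(env.FTS_PARTITION.idFromName(`fts-${i}`));
      return stub
        .fetch(`https://do/search?q=${encodeURIComponent(query)}`)
        .then((r) => r.json() as Promise<string[]>); // each partition returns matching doc ids
    })
  );
  // Aggregate across partitions; real ranking and AND-combining would happen here.
  return perPartition.flat();
}
```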
You could go the other way, and partition by value again, but that makes ANDs (for example) more challenging, those would have to happen at response aggregation time in some way.
You'd do the stemming at index time and at search time, like Solr does.
I have no idea what the documents per partition would be; it would probably depend on the size of the documents, the number of documents, and how much you'll be searching them, since each durable object is single-threaded. Adding right truncation or left+right truncation will blow up the index size, so that would probably drive up the partition count. You might be better off doing trigrams or something like them at that point, but I'm not as familiar with those.
This is where optimizing would be hard. I don't think you can get from Durable Objects the kind of detailed CPU/disk IO stats you really need to optimize this kind of search engine data structure.
You’re better off creating Lucene segments in R2 and letting Lucene access them remotely (if Lucene could run on a Worker as WASM). Or something very like Lucene but compiled to WASM.
You’d also need to manage the Lucene segment or Solr/Elasticsearch shard metadata in Workers KV. You’d need a pool of Workers acting as coordination nodes, another pool as “data nodes / shards”, and a non-Workers pool creating and uploading Lucene segments to R2.
It shouldn’t actually be so hard to do. Cloudflare would need more granular knobs for customers to fine-tune R2 replication so it’s co-located with the Worker execution locations and therefore really fast.