Hacker News new | past | comments | ask | show | jobs | submit login
Waltz: A Distributed Write-Ahead Log (wepay.com)
172 points by riccomini 13 days ago | hide | past | web | favorite | 44 comments

This design seems to be an example of a deterministic database system. There's an excellent review of deterministic databases here: http://www.cs.umd.edu/~abadi/papers/abadi-cacm2018.pdf

The core concept of all deterministic databases is simple: if your database is deterministic, multiple geographically-distributed replicas can execute the same transaction log independently, and they will all reach the same state. The problem of implementing a distributed database is reduced to implementing a distributed log.

However, there is one oddity of the Waltz design: the central locking mechanism. Other deterministic databases don't have anything like this, because it's not necessary. You can keep track of locks locally on each replica, and you can rely on every replica to reach the same conclusions about which transactions succeeded and failed, because they are deterministic.

Can anyone clarify why they are managing centralized locks?

Essentially if each instance can keep track of validating transactions locally, the whole thing can be easily implemented with Bookkeeper and distributed log API [0]. Bookeeper guarantees single writer (the master node which is appending to log) for each ledger. All it needs to be done is to put concurrency control metadata (like version, timestamp, ...) inside the message so each node can validate stuff locally.

[0] https://bookkeeper.apache.org/docs/4.9.2/api/distributedlog-...

Daniel Abadi also has an excellent paper on column databases: http://db.csail.mit.edu/pubs/abadi-column-stores.pdf He's a great person at giving high level overviews of database systems before digging into the implementations.

The blog post didn't fully specify, but the source code suggests that the locks are tracked per-partition rather than managed centrally.

I have not gone into this at all, but my initial thought is that as long as every log entry also have a unique id of the last log entry at the time of it's writing, you can always piece together a deterministic log without any locking or voting. Just share everything you know.

This design can be extended to support interactive transactions.

Here's how:

1. Assign each transaction a log position. For strict serializability instead of just serializability, make this the highest log position ever acknowledged successfully to a client. This can be batched for throughput.

2. Have each client record their read set and write set, which includes the objects/ranges the transaction read and the writes / operations to be performed.

3. Have clients persist this R/W set into the log, or send directly to the lock server if it is also the master assigning log positions. Again, use batching for throughput.

4. Have your lock server either as a part of the master processing assigning log positions, or have it tail the log separately. The lock server will receive batches of transactions, take locks in the log's defined order, then commit another entry to the log with the commit/abort decision for each.

5. Respond to the client with the commit / abort decision.

To make this easier to program you'll probably want to include a read-your-writes cache on the client.

You can also scale out this log by having clients commit their operations to multiple logs. The only thing that needs to be serialized is the locking and commit / abort decision making. These other logs can be sharded by key range or just chosen randomly, as long as the commit / abort decision log includes which other log the data itself was committed to.

FoundationDB works roughly like this. The terminology is different (resolver instead of lock server, R/W set is conflict ranges, log position is version number, etc) but this is basically how it works.

Persisting executed transactions to a log with their read/write sets, and then determining if they actually commit also starts to sound like http://web.eecs.umich.edu/~michjc/eecs584/Papers/cidr11_hyde... to me

Yes, it is more similar to Hyder than FoundationDB if you persist the R/W sets to the log itself. FoundationDB gets around this by only keeping the lock table in memory. When any process in the transaction pipeline fails, the master (which holds the version number) advances the version far enough into the future that any in-progress transaction would fail because it is too old.

I have this nasty feeling that for the scale mentioned the whole logic can be safely run on a single high performance server with the decent database. Those little servers around can either run on the same server or placed on separate server/s around the main one. All nice and simple and no distributed transaction problems.

I think Waltz is designed for availability, rather than performance.

For a company like Wepay, maybe a couple minutes of downtime is incredibly expensive and having a single server just isn't going to cut it.

Maybe it is "incredibly expensive", can't argue with this. Have hot standby database servers then located elsewhere. Still way simpler

>Have hot standby database servers then located elsewhere

They explained why this wasn't ideal for them in the article.

Last I heard from some friends in YC when considering a position there, Wepay was handling less than 10^6 payments per day. Is that still the case and is something with such low requirements a good replacement for Kafka in the wild?

Your comment made me realize that I had confused WePay with WeChat Pay, which has a slightly different scale. The "We$VERB" field is getting a bit crowded.

Looks like this also uses Zookeeper. Does anyone know of a simple streaming log system / database? Like, SQLite3 for streaming? I'm using this for personal projects more and more, and the solutions I see in this space are always big, distributed and hard to setup and keep running.

I've been using a simple file format that just writes each message out sequentially in a simple format like: [event-id][event-type][event-size][event-bytes]. And there's a small TCP server that speaks Redis protocol to support remote access. But it's not really production code, rather something I hacked together over a couple weeks in the evening.


This is a project of mine I use for some side projects for exactly that reason: I want streaming logs & pubsub semantics. It's basically Kafka xtra-lite. It doesn't have a super simple TCP protocol (though I wouldn't say no to a PR adding one!), it's dual-available over gRPC and HTTP (JSON API).

Disclaimer: It's definitely one of my low-activity side projects and subject to change at any time.

If you're already using the Redis protocol then Redis v5 has Streams as a first-class data structure.

It's fast, has consumer groups (like Kafka), and also supports individual message acknowledgement.

Is Redis Streams backed by disk? How does it perform when the size of the stream exceeds available RAM?

Also, what's Redis' multi-node failover/high availability story these days (with streams)? Last I heard, it wasn't that great [1], but it's been a while.

[1] https://aphyr.com/posts/283-jepsen-redis

Redis is an in-memory data store that has different options for persistence (snapshots + oplog) but it's not designed to persist every operation immediately. All data structures are covered including Streams.

Redis keeps the entire working set in RAM so it'll start dropping writes or freeze if you run out of memory. This is where the simplicity and speed comes from and is a fundamental limitation.

There's a simple replica system that works well but failover switching is the problem and requires a separate process or running the Redis Sentinel. There's also Redis Cluster but that just shards the keyspace and doesn't offer any scalability with a single key or stream, and is still hard to manage with the same failover issues.

The OP asked for a single-node option so I suggested Redis, if you need a serious messaging cluster then I recommend Kafka or Pulsar instead.

Thanks. That confirms pretty much the picture I had in my head of how Redis works these days.

It's frustrating that there's no obvious middle ground this and Kafka and Pulsar, both of which are memory-hungry JVM apps with multiple external dependencies. Both require ZooKeeper; Pulsar also requires BookKeeper. None of these components are operationally simple.

I'm a fan of NATS itself, but NATS Streaming's clustering design leaves a lot to be desired. In particular, it punts on failover/HA, asking you to instead run an SQL database or shared file system that provides this. (An obvious low-maintenance option here would be CockroachDB, but NATS doesn't support it.)

There are other options. RabbitMQ [1] is the usual although clustering has always been fragile.

If you dont care about open-source then there are plenty of other options like AMPS [2] or Solace [3]. The latter has a free edition.

1. https://www.rabbitmq.com/

2. http://www.crankuptheamps.com/

3. https://solace.com/

I wouldn't put RabbitMQ in this category — it is a classical message broker, not a log. Once you've consumed a message, it's gone, unless you have set it up so ACKed messages are funneled into another queue, but that stuff is finicky and doesn't patch over the fact that underneath it's designed for mutable, ephemeral queues. In particular, you can't peek back into the queue to find older items. You have zero visibility into the contents of the queuem, and you certainly can't treat it as a dependable database.

And, as you say, fragile. I've run RMQ in production for years and I would be very happy if I could throw it out. It's the least well-behaved component in any stack I've used it in. Even Elasticsearch (shudder) is better at not losing data. Not just the clustering, either. Even for a persistent queue, RMQ will start to chug RAM for any message that is delivered but not yet ACKed, for example, making it dangerous for apps that want to batch large groups of messages for efficiency. (It seems to me that it was not designed for that at all, but for one-by-one consumption, which is of course much slower.)

I'm looking for a mature distributed log that is clustered and lightweight. Kafka except, say, written in Go.

Try out Solace then. It's fast and supports HA for the free edition with throughput limits.

Note that NATS on it's own is just a pub/sub system and doesn't have any queuing or persistence. There is no "log". You need the NATS Streaming server for that which implements persistence while communicating over NATS.

I was looking into https://github.com/liftbridge-io/liftbridge. I believe he was a maintainer of NATS streaming and wanted to make something lightweight and Kafka-esque.

Liftbridge looks cool but right now you need use their protobuf defs and cook your own client lib if you want bindings outside of go.

Liftbridge is an experiment and has no real production users. NATS Streaming is already fine for lightweight single-host usage. If you really need to scale to multiple servers then I recommend skipping NATS Streaming and going straight to Kafka or Apache Pulsar.

Have any more lighter-weight recommendations that fit the queuing and persistence category?

RabbitMQ is another option. Clustering is fragile, but it's fine if you just need a single-node: https://www.rabbitmq.com/

There are also proprietary systems like Solace: https://solace.com/

Other than that, there are embedded queuing/log libraries or you can just dump the messages into a key/value store or write them out to a file yourself.

Maybe the pub-sub pattern of https://zeromq.org/ could work for that

Kafka is moving towards eliminating the ZK dependency, while will make it easier to run as a single node if you don't care about HA.

Hey, is there a tracker somewhere for this? I've always been into trying Kafka, but I heard horror stories about ZK so having it removed would be nice.

Just curious, have you looked into Redis streams?

Redis streams?

Here's a dumb question about log-structured systems like this: does this system work nicely with backfills? Suppose you start logging events with Waltz and you want to migrate an existing system's data into the same log. Or something goes wrong and oncall needs to manually insert old events. Does Waltz have capabilities to backfill events into the historical log or reassign transaction IDs?

This might not be needed if this is strictly used for FIFO event consumption, but I guess I was thinking of trying to make a system like this support time-sliced queries.

These are good reasons to use a bitemporal database [1].

A log gives you "transaction time" but you need to create an efficient representation of "valid time" for backfilling and corrections.

[1] https://en.wikipedia.org/wiki/Temporal_database

Disclosure: I work on a database for Kafka that provides point-in-time bitemporal Datalog queries https://github.com/juxt/crux

Yeah, I'm currently getting bitten by various workloads that query by `created_at` instead of a canonical timestamp :) Thanks, I'll take a look!

For the "valid time" primitive I was thinking of implementing something like a hybrid logical clock that CockroachDB has (but with looser guarantees, mostly just need uniqueness and monotonicity). A sequential ID would provide a slightly nicer interface for pagination but has all the problems that I previously mentioned.

What if I have multiple entries to update in one message(ensure atomicity)? Or write based on multiple reads? One lock id cannot guard them all...

> This works well, but the drawback is that a service has to write to two separate storage systems, a database and Kafka. We still need check-and-repair.

what about kafka connect ?

How does this compare to Kafka?

The first paragraph on that page covers that at a high level.

> Waltz is similar to existing log systems like Kafka in that it accepts/persists/propagates transaction data produced/consumed by many services. However, unlike other systems, Waltz provides a machinery that facilitates a serializable consistency in distributed applications. It detects conflicting transactions before they are committed to the log.

> serializable consistency in distributed applications.

Kakfa supports transactions not sure about serializable though.

Applications are open for YC Winter 2020

Guidelines | FAQ | Support | API | Security | Lists | Bookmarklet | Legal | Apply to YC | Contact