I have one question regarding the determination of the latest version of a set of peer data when overlapping transactions occurred.
Suppose initially s1/x=0 at server s1 and s2/y=0 at server s2. Client1 updates s1/x=10 and s2/y=10 at transaction v1, and at the same time client2 updates s1/x=20 and s2/y=20 at v2. Suppose the clients contact the servers in different order and the update messages arrive at the servers in reverse order, such that s1's pending queue is [x.v1=10, x.v2=20] and s2's pending queue is [y.v1=20, y.v2=10].
After client1 and client2 send the make-good command, s1's good queue is [x.v1=10, x.v2=20] and s2's good queue is [y.v2=20, y.v1=10]. When a third client3 tries to read the latest value of x or y, what is the latest value of its peer data? It looks like depending which data client3 starts with, it would get a different version of the peer data? Like starts with s1/x got s1/x.v2 == 20 and s2/y.v2 == 20, while starts with s2/y.v1 == 10 and s1/x.v1 == 10.
Am I missing something or this is the semantic in determining the latest values of peer data?
> I guess one of the key insights is that each data has a canonical server owner which enforces the consistency of the writes of the data at a single place.
Well, this is the way I presented it, because it's easiest to understand. But, if you want a replicated system that provides HA, there are alternatives (http://www.bailis.org/blog/non-blocking-transactional-atomic...).
> When a third client3 tries to read the latest value of x or y, what is the latest value of its peer data? It looks like depending which data client3 starts with, it would get a different version of the peer data? ... Am I missing something or this is the semantic in determining the latest values of peer data?
Good question! This ultimately comes down to how you want to handle concurrent writes. Many distributed databases use a "last writer wins" strategy when reconciling concurrent updates (that is, the correct behavior is specified to be that the database serves the highest-timestamped version of a given data item). Now, in your example, the clients both started their writes at the same (real-world) time and used this time as a basis for their timestamp, so the "last" write is undefined. We need a way to break the "last writer wins" tie. In practice, this can be something like the client ID appended to the last few bits of the timestamp or even a hash of the value written--as long as the tie-breaker is deterministic (that is, different replicas don't decide different "winners"), it doesn't really matter which is chosen.
In practice, to avoid storing every value ever written, you'd want to provide some kind of "merge" function for multiple writes, and, in our implementation and in often practice, this is last writer wins (plus some deterministic tie-breaker for identically timestamped but distinct writes).
The only problem that bogs me a bit in this algorithm is that transaction time sequence is not guarantied and preserved. Some clients may see x=y=10 and others x=y=20 during the transaction period. This happen if one client starts writing x=y=10 with s1 and the other y=x=20 with s2. Clients requesting x and y starting with s1 will get x=y=10 and those requesting x and y starting with s2 will get x=y=20. When the transactions completes, the values finally stored in x and y as good may be 10 or 20.
So the algorithm ensures consistency, which is the most important and useful property, but with many concurrent write transactions, the ending DB content may be a bit somehow unpredictable.
This can be a problem when one needs to do operations like x-=1 and y-=1 on the database as for seats reservation in a plane for a travel with two flights for instance. How would this be done ?
In the example configuration in the post (i.e., two servers with no replication), during the period between the write start and end of phase two of writes, reads can return either value written (x=y=0 or x=y=1). If we want to enforce the property that once one read returns the second write, all subsequent reads (that begin after this read) will return the second write (or a later write), which is called linearizability (http://cs.brown.edu/~mph/HerlihyW90/p463-herlihy.pdf), then whenever servers serve from 'pending', they should move the writes to 'good'. This is safe because, if a client reads from 'pending', it means that the write must be in 'good' elsewhere and therefore is stable.
As for how to order the writes, real-time clocks can provide a fairly useful timestamp mechanism. Databases like Cassandra use this real-time ordering for timestamping, which appears to work well in practice. Alternatively, you could use a distributed sequence number generator to totally order transactions. But real-time should be fine.
As you point out, this atomicity property doesn't address (all) application-level integrity constraints; it doesn't handle isolation between transactions. As I discussed in the post and in another comment (https://news.ycombinator.com/item?id=5784030), if your application-level integrity constraints are such that your updates are not commutative (that is, concurrent updates should not be allowed), then you'll need to block in order to guarantee database integrity is not violated. This is separate from atomicity, but it is important to remember. In your example above, two writers might both simultaneously reserve the last seat on a plane unless they synchronize.
Effectively, with non-commutative updates, you need greater isolation than is provided by the algorithm in the post (which effectively provides Read Committed isolation). Achieving greater isolation is possible but you'll lose the non-blocking property (again, due to the requirement to avoid concurrent updates via higher isolation like serializability rather than due to atomicity). But, for many applications like 2i and the multi-puts I mentioned at Facebook and Twitter, updates are commutative.
The paper describes this scheme as READ Committed which doesn't make generally make sense except in the context of a database with secondary indexes.
In the initial example, I represented 'good' as a set for ease of understanding, but, in practice, unless a client specifically requests an older version of a data item, the system serves the latest value written to 'good'. That is, the system does not expose a read() that returns multiple values. Rather, clients can read_good(key) or read_by_version(key, timestamp), both of which return a single version/write.
This is different from deciding which "transaction you will accept as valid and which you will reject." Many database systems perform in-place updates, but they must either either 1.) choose a winner across multiple writes (as I described below, distributed databases often employ what's called "last writer wins") or 2.) abort multiple writes. However, a large class of database systems (e.g., Oracle, Postgres) employ what's called multi-version concurrency control, whereby the database stores multiple versions of each data item. The system has a total commit order on transactions which determines what version a transaction should read() from the database. But, say, in Oracle, if:
1.) I start a transaction
2.) You start a transaction
3.) You modify variable X
4.) You commit
5.) I read X
Under what's known as Snapshot Isolation, I will read X as of the start of my transaction (i.e., I will not read your write to X even though it's "present" in the database). This is often accomplished via MVCC techniques.
> The paper describes this scheme as READ Committed which doesn't make generally make sense except in the context of a database with secondary indexes.
I tend to disagree. This is probably another conversation, but databases rarely guarantee serializable isolation (see http://www.bailis.org/blog/when-is-acid-acid-rarely/#acidtab...), and Read Committed is a fairly commonly deployed model. It's true that serializability is often required for correct operation. But, perhaps interestingly, many databases like Oracle 11g and SAP HANA do not provide it as an option (largely due to poor performance and deadlock avoidance), and, anecdotally, models like Read Committed are 2-3x faster than serializability.
I'm not entirely sure what you mean by applicability to secondary indexing (rather, I think there are other use cases, though I'm excited about 2i applications). However, I'm genuinely curious if I'm missing something.
The paper admits the algorithm does not guarantee termination but I would have liked to see more details on the failure scenarios regardless (minor details in footnote 3). It's not clear to me what writers see (if anything) when a write fails.
The paper does talk about how non-overlapping transactions won't block each other (which is nice but not a solution) and how one could add the ability to abort and trigger a cleanup by the use of a good (user supplied) failure detection module. But having a reliable node failure monitor that can react fast enough to ensure availability is really the hard part.
Would love to see more on aborting transactions next.
Yep, I left this out to avoid confusion at first. There are some details in the "What just happened?", but the basic idea is that any aborted write will be stuck in "pending." Same for failed writes; writers won't see these. The algorithm presented actually guarantees "Read Committed" ACID isolation.
> But having a reliable node failure monitor that can react fast enough to ensure availability is really the hard part.
Well, you'll remain available for reads and writes, but the size of "pending" might grow. You essentially need asynchronous distributed garbage collection, which will stall in the presence of partitions and may require the failure detectors I mentioned.
> The paper does talk about how non-overlapping transactions won't block each other (which is nice but not a solution)
I don't see how this isn't a solution for transactions that desire last-writer-wins semantics. If, as in the examples I listed, writes commute, then a blocked write shouldn't stall others. If you want to prevent Lost Update or Write Skew anomalies (i.e., concurrent update), then you'll have to give up availability and/or block.
Doesn't writes that commute mean that there was no contention to begin with? Ideally, if I have a balance of $100 in my account and try to spend $60 in two different transactions, one should come back as failed before the purchases are complete.
> If you want to prevent Lost Update or Write Skew anomalies (i.e., concurrent update), then you'll have to give up availability and/or block.
There is a difference between giving up read and write availability. My ideal database should be read-available at all times, but guarantee that writes are atomic and durable (and give up availability for this guarantee).
On the whole, this looks pretty neat. I like the idea of the client being responsible for the writes being committed on the servers. The client is then free to choose how to implement the IO, but ultimately, if a single client experiences a failure and a single write doesn't go through, it is usually a better outcome than a write going through and then replication between two servers breaking.
What are your thoughts on quorum-based voting in distributed systems? E.g.: your protocol but with the requirement that a write is considered stable if only most (vs all) of the servers involved have it marked as "good".
> Doesn't writes that commute mean that there was no contention to begin with? Ideally, if I have a balance of $100 in my account and try to spend $60 in two different transactions, one should come back as failed before the purchases are complete.
Whether or not conflicting writes are a problem or not depends on the application semantics. For example, if I'm, just adding items to a set, then my updates commute. But if I have a constraint that says that elements in the set need to be unique, then my updates don't (logically) commute any more. Ultimately, this is application-specific. The "CALM Principle" (http://www.bloom-lang.net/calm/, http://vimeo.com/album/2258285/video/53904989) captures this notion of "logical monotonicity" resulting in safe operation despite concurrency. Most of the applications I mentioned, like 2i updates and the social graph example, commute.
For non-commutative operations (and there are plenty), you'll need a stronger model like serializability or sequential consistency that necessarily blocks to prevent concurrent update (or otherwise aborts concurrent updates).
> My ideal database should be read-available at all times, but guarantee that writes are atomic and durable (and give up availability for this guarantee).
This is definitely one point in the spectrum; the question is whether you want to give up availability and write performance. But if you look at workloads like those in the Spanner paper, this is reasonable for many applications.
> What are your thoughts on quorum-based voting in distributed systems? E.g.: your protocol but with the requirement that a write is considered stable if only most (vs all) of the servers involved have it marked as "good".
There's a difference between quorums over replicas over the same data item and quorums over different data items. Using quorums over replicas would help ensure that the replicas provided properties like linearizability or register semantics like Dynamo or other systems that provide an option for "strong consistency." But it's not clear to me that quorums over replicas for different data items would provide the same atomicity property--does that make sense?
I intentionally left many of the issues of replication to the footnotes in the post--mostly for readability and clarity--but I believe the technique is applicable to both linearizable/"CP" and "eventually consistent"/"HA"/"AP" systems.
The problem to me seems to be that the interesting problems always come across the case where updates are not commutative. For example, real-time sensor reading where frequent updates completely override previous state are very easy to get right if you don't need the latest data at all times. You simply get recent or slightly stale data, and the service stops responding if all sensor data is old. There are many solutions to this case, and some are less complex than your solution.
However, when dealing with a read-update-write type transaction where the values you write depend on the values you read will indeed require stronger guarantees. Here is where a lot of systems get into trouble. They seem to either implement the fuck it mode or attempt to do some kind of distributed locking which usually takes a huge performance hit even if the network is fine.
> This is definitely one point in the spectrum; the question is whether you want to give up availability and write performance. But if you look at workloads like those in the Spanner paper, this is reasonable for many applications.
Yes, the idea is that the system becomes read-only but the data remains consistent and online. In most workloads that I've seen this is the desired behavior. For some reason I haven't seen this behavior implemented yet, though it's possible I just haven't looked at the right data store.
> There's a difference between quorums over replicas over the same data item and quorums over different data items. Using quorums over replicas would help ensure that the replicas provided properties like linearizability or register semantics like Dynamo or other systems that provide an option for "strong consistency." But it's not clear to me that quorums over replicas for different data items would provide the same atomicity property--does that make sense?
Unfortunately, you lost me here. I am thinking of the quorum over each register as being the pass-fail for whether a transaction can go on. Basically, client A connects to servers X and Y and says "set t = 1; set s = 2; ... commit;". During the ..., client B connects to server Y and Z and says "set u = 9; set t = 3;". Here, client B should fail since client A has not committed. This is determined by the fact that the majority of the servers (Y and Z) cannot agree that t is available for writing. In this case, client B will receive a "success" from Z and a "fail" from Y, which will prompt it to roll back the transaction and start over.
In other words, move the responsibility to coordinate a successful write from the datastore nodes to the single point: the client.
> The problem to me seems to be that the interesting problems always come across the case where updates are not commutative.
I agree that many application-level integrity constraints can't be satisfied with commutative updates. However, I've been surprised how frequently they can work for many web applications and how frequently "fuck-it" integrity constraint maintenance is employed given i.) faster database operation and 2.) asynchronous compensation mechanisms (e.g., bank overdraw fees) in the event of constraint violation. But your point is well-taken!
> Here, client B should fail since client A has not committed.
I think I understand your example, though I'm not clear as to whether s and t are stored on both X and Y or separately as s on X and t on Y. But, in general, writes in 'pending' should not block the insertion of other writes into 'pending' (in 2PC parlance, prepared but non-committed transactions should not block other transactions from preparing). This is fundamental to the non-blocking property of the algorithm here (i.e., http://www.bailis.org/blog/non-blocking-transactional-atomic...). So if client A hasn't committed, it shouldn't stop client B from committing. If client A and client B's writes don't commute, then the clients should use a stronger protocol/isolation level than the effective Read Committed that the NBTA algorithm here provides (doable but requires blocking). Does that make sense?
I suppose this method is somewhat similar in philosophy to Software Transactional Memory, but is distributed. The hard part here seems to be things like replication and garbage collection. What if client A fails before it commits? How long should the cluster lock the values of s and t before client B is able to modify them? I suppose some type of heartbeat from client to server would be good. Since there would be a strong guarantee that if a transaction fails, just retry it and nothing bad happens, then client A can come back online and try again once connectivity is restored.
* x = 0 and y = 0, starting out in the "good" state with metadata cleared;
* client A reads x with no metadata, then for whatever reason blocks or is delayed;
* client B writes x = 1 and y = 1, completes the transaction, and the metadata is cleared;
* client A reads y = 1.
This shouldn't be an issue unless you aggressively clear metadata and have a long-running client.
Having a heartbeat between the client and the server could also help prevent the write from getting purged from "pending" prematurely.
However, Datomic's design is based on a single, centralized transactor that pushes out all the new transaction information to an index that is distributed to all its clients (peers), whereas NBTA would enable distributed transactors.
See "The Datomic Data Model"
From Datomic's FAQ http://www.datomic.com/faq.html ...
How does Datomic provide ACID guarantees?
The transactor serializes all transactions, and each transaction runs against a stable view of the database, and succeed or fail in their entirety. Transactions are not acknowledged until after they are logged in storage. Atomic read/modify/write operations can be performed by database functions, which run on the transactor within the transaction. Note that Datomic can provide ACID guarantees without utilizing read-transactions, nor read locks, due to the presentation to the query engine(s) of the database as an immutable value.
Rich Hickey on Datomic's design...
Intro to Datomic (12 min)
The Design of Datomic (60 min)
The Datomic Architecture and Data Model (46 min)
Does the data remain stable, or must some additional work be performed to correct the inconsistent state?
If you want client writes that reached all servers to become visible, then the servers will have to perform the move from 'pending' to 'good' on their own (by communicating asynchronously). The notification of write stability is idempotent, so it doesn't hurt if both clients and servers perform this notification.
FWIW, in our implementation, servers perform the second step instead of clients (which can be made more efficient via batching).