- Server delivers message M
- Client processes event E entailed by message M
- Client tries to ack (A) message on server, but "packet loss"
- To make matters worse, let's say the client also immediately dies after this
How do you handle this situation?
The client must transactionally/simultaneously commit both E and A/intent-to-A. Since the server never received an acknowledgment of M, it will either redeliver the message, in which case some record of E must be kept to deduplicate against, or it will wait for the client to resend A, or some mixture of both. Note: if you say "just make E idempotent", then you don't need exactly-once delivery in the first place...
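A minimal sketch of that in Python with sqlite3 follows. It assumes the client's own state lives in a local database. The table names (`events`, `processed`) and the functions are hypothetical. The point is only that the effect of E and the "intent to ack" row commit or roll back together. A redelivered M is then detected, and the client just re-sends the ack.

```python
import sqlite3

def open_db(path=":memory:"):
    db = sqlite3.connect(path)
    # Hypothetical schema: the effects of E, plus one dedup row per message ID
    # that doubles as the durable "intent to ack" (A).
    db.execute("CREATE TABLE IF NOT EXISTS events (msg_id TEXT, payload TEXT)")
    db.execute("CREATE TABLE IF NOT EXISTS processed ("
               "msg_id TEXT PRIMARY KEY, acked INTEGER NOT NULL DEFAULT 0)")
    return db

def handle(db, msg_id, payload):
    """Apply E for message M and record intent-to-ack in ONE transaction.
    Returns True if E was applied, False if M was a redelivered duplicate."""
    with db:  # commits on normal exit, rolls back if an exception escapes
        cur = db.execute("INSERT OR IGNORE INTO processed (msg_id) VALUES (?)", (msg_id,))
        if cur.rowcount == 0:
            return False  # seen before: skip E, just (re)send the ack
        db.execute("INSERT INTO events (msg_id, payload) VALUES (?, ?)", (msg_id, payload))
        return True

def pending_acks(db):
    """Messages whose ack may not have reached the server; resend after a crash."""
    return [row[0] for row in db.execute("SELECT msg_id FROM processed WHERE acked = 0")]

def mark_acked(db, msg_id):
    with db:
        db.execute("UPDATE processed SET acked = 1 WHERE msg_id = ?", (msg_id,))
```

Note the price the comment above describes: the `processed` rows are exactly the "record of E" that has to be kept (and eventually pruned) for deduplication.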
I suppose you could go back to some kind of lock-step processing of messages to avoid needing to record all (E,A) that are in flight, but that would obviously kill throughput of the message queue.
Exactly Once can only ever be At Least Once with some out-of-the-box idempotency that may not be as cheap as the natural idempotency of your system.
EDIT: Recommended reading: "Life Beyond Distributed Transactions", Pat Helland - http://queue.acm.org/detail.cfm?id=3025012
We were doing this in 2005 at 10k+ msgs/sec (1k payload): durable, transacted, fully encrypted, with no two-phase commit, and supporting long disconnects (I know of documented cases of conversations that resumed and continued after 40+ days of partner network disconnect).
Running into resource limits (basically, running out of disk space) is something the database community has known how to monitor, detect and prevent for decades now.
I really don't get why so many articles, blogs and comments claim this doesn't work, or is impossible, or even hard. My team shipped this 12+ years ago; it is used by major deployments, the technology is proven, and the original protocol has changed little.
So everyone is talking about the same thing here: data can get delivered twice, and the application must handle it in an idempotent way. For TCP this amounts to looking at some counter and throwing away anything that's already been seen. With a single client/server connection, maintaining sequence numbers like TCP does solves the problem, but it's harder to use the same technique for the multi-client, multi-server, "stateless" requests that are common in the web world.
EDIT: To clarify, what I mean by TCP "does deliver data more than once" is that one side of the connection can send the same data twice. It's true that the duplicate is then discarded by the other end, but this is what people are talking about when they talk about the theoretical impossibility of never sending anything twice. The rest is basically the idempotency question of ensuring that data received twice doesn't cause anything abnormal.
I've been living in this problem space for many years now and have seen the wheel reinvented many times. Whenever the plumbing does not guarantee EOIO (exactly-once, in-order delivery) but the business demands it, it gets pushed into the app layer, where TCP (retries and acks) is reimplemented with varying degrees of success.
Isn't this the same app layer stuff that has to get reimplemented? I can see how this is often pushed back to a human (oops, my money transfer didn't go through, I'll have to redo it) but it's still something that has to be dealt with somewhere.
Database programmers have the means to deal with it off the shelf: BEGIN TRANSACTION ... COMMIT. When your queues are in the database, this becomes trivial. Even without the system I'm talking about (Service Broker), which stores its queues in the database, most regular messaging systems do support enlisting in a distributed transaction to achieve an atomic dequeue/process sequence. It's just that many apps/deployments don't bother, because of the ops overhead (XA coordinator), reduced throughput, and/or simply not understanding the consequences.
Point is that durable, persisted, transacted 'sockets' behave very differently from a TCP socket. It is a whole lot harder to simply lose a message in the app layer when interacting with a database.
The transaction boundary defined in your consumer covers the interactions of that consumer with other XA-aware nodes, all of which participate in a "distributed transaction". So you can process this message N times without committing, and thus possibly tell other systems N times to perform side effect M' of that message, but until you commit, the world has not changed.
1. Receive from server, commit state = NEW, UNACK
2. Send ACK to server, get confirmation from server, commit state = NEW, ACK
3. Start processing, commit state = PROC
4. Got results, commit state = FIN, UNACK
5. Send FIN to server, commit state = FIN, ACK
Each commit is a database transaction where you write the results of that step along with the new state. If anything fails along the way, the work in progress is discarded along with the state change. The server keeps an equivalent record, so if it gets a duplicate ACK for the same (or an earlier) state, it can ignore it.
In this example, if the client crashes between steps 1 and 2, never gets confirmation in step 2, or crashes trying to commit the "NEW, ACK" state, then it will retry. The server has already committed the fact that it sent the value to the client and is awaiting an ACK. If it saw the ACK and gets a duplicate, it ignores it. If it never saw the first ACK, it will see the second (or later) attempt and commit that it saw the ACK before sending confirmation to the client.
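Below is a rough Python model of steps 1-5, built on a few assumptions:

- The client's database is a dict, and each assignment to it stands for one committed transaction holding (state, result).
- The server's table is a dict too.
- "Ignores it" is read as "leaves its state unchanged but still returns a confirmation", so a client that lost the first confirmation can move on.

`State`, `Server` and `step` are made-up names.

```python
from enum import IntEnum

class State(IntEnum):
    # Ordered so a later state compares greater than an earlier one.
    NEW_UNACK = 1
    NEW_ACK = 2
    PROC = 3
    FIN_UNACK = 4
    FIN_ACK = 5

class Server:
    """Server side; `progress` stands in for a durable table."""
    def __init__(self):
        self.progress = {}

    def send(self, msg_id):
        self.progress.setdefault(msg_id, State.NEW_UNACK)  # sent, awaiting ACK

    def report(self, msg_id, state):
        # A duplicate for the same or an earlier state leaves the record alone,
        # but is still confirmed so a client that lost the reply can advance.
        if state > self.progress[msg_id]:
            self.progress[msg_id] = state  # commit before confirming
        return self.progress[msg_id]

def step(client_db, server, msg_id, process):
    """Advance one message by one step, resuming from the last committed state.
    Each assignment to client_db[msg_id] models one committed transaction."""
    state, result = client_db.get(msg_id, (None, None))
    if state is None:
        client_db[msg_id] = (State.NEW_UNACK, None)                 # 1
    elif state == State.NEW_UNACK:
        if server.report(msg_id, State.NEW_ACK) >= State.NEW_ACK:
            client_db[msg_id] = (State.NEW_ACK, None)               # 2
    elif state == State.NEW_ACK:
        client_db[msg_id] = (State.PROC, None)                      # 3
    elif state == State.PROC:
        # Results and new state go in the same commit; a crash inside
        # process() commits nothing, so the step is simply redone.
        client_db[msg_id] = (State.FIN_UNACK, process(msg_id))      # 4
    elif state == State.FIN_UNACK:
        if server.report(msg_id, State.FIN_ACK) >= State.FIN_ACK:
            client_db[msg_id] = (State.FIN_ACK, result)             # 5
    return client_db[msg_id][0]
```

After a crash, the client just calls `step` again. It picks up from whatever state was last committed.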
It's true that this doesn't work if your processing touches external systems or otherwise escapes the transaction context, but in those cases you do still get at-least-once delivery (or at-most-once, if you choose to commit the receipt before processing the message).
It really is a powerful technology and when leveraged can absolutely reduce the level of effort and cognitive burden to building correct asynchronous systems.
Interesting! You're saying the exact opposite of what Tyler Treat is saying:
> Even with smart middleware, problems still leak out and you have to handle them at the edge—you’re now being taxed twice. This is essentially the end-to-end argument. Push responsibility to the edges, smart endpoints, dumb pipes, etc. It’s the idea that if you need business-level guarantees, build them into the business layer because the infrastructure doesn’t care about them.
What do you think of his argument?
In other words, to reliably agree on a system state (whether a message ID was delivered) you need the system to be Consistent. And per the CAP theorem, it cannot also be Available in the presence of Partitions.
So the other people you're referring to are probably talking about distributed systems.
As I said, users had cases where the plumbing (messaging system) recovered and delivered messages after 40+ days of network partitioning. Correctly written apps completed the business process associated with those messages as normal, with no special case. Humans can identify and fix outages, and databases can easily outlast network outages (everything is durable, transacted, with HA/DR). And many business processes make perfect sense to resume/continue after the outage, even if it lasted for days.
Approaching exactly-once delivery asymptotically is possible. Your parent poster's point is that this is a case where you can get so close to exactly-once, in-order delivery that in practice you never violate it for years and years.
> Just ack after you commit locally.
wait ... what is this "retry" and "ack"? is this so that the sender knows not to send the message again? that sounds suspiciously familiar ...
I mean, come on, you're just describing a framework you wrote that handles "at least once delivery" plus "idempotent operations" for the programmer. That's fine. It's marketing to say it's "exactly once". You're just arguing over what to call it.
Idempotency is a restriction/requirement that you may put in place in a distributed system to make the above types of guarantees. Deduping of messages, as the article mentions, means that it is using "at least once" message delivery, with some notion of unique message IDs and/or message content used to dedup the messages.
TCP is a great analogy, where the windows of the protocol are effectively maintained by Kafka. Anyway, "exactly once" is very hard and requires putting limits into the system such that deduping or something similar is possible. But I'd agree that in all cases, anything that claims "exactly once" behavior is in fact implementing it on top of protocols with "at least once" primitives.
This isn't true. There is a point where both sides have confirmed receipt of the last new info (the original confirmation back to the initial sender), and further confirmation is just icing. What kind of general is it that says "I've received ten acknowledgements so far from the other side, but I'm still not sure!"?
Remember that it's the message itself being confirmed, not 'did the last message get through'. There is a point where both sides are aware of both the message and a confirmed confirmation of that message.
Suppose A sends the message to B, B sends an ack to A but A's ack back to B is lost. Now both A and B know the time of the attack but B considers it possible that [A does not know that B knows]. B thinks if A really does not know that B knows then A may not attack out of the consideration of the possibility that B does not know. And this possibility may make B call off the attack even if B knows.
Each round of successful exchange increases the order of statements that are known, but there is always a statement of higher order that is still not known and that can cause the attack to unravel inductively.
Look up 'Rubinstein's e-mail game' for a choice of payoffs that causes no attack to happen for any positive probability, howsoever small, of a message getting lost.
message 1 -> attack!
message 2 -> sure!
message 3 -> I got your sure! At this point, general A knows the message got through
message 4 -> I got your confirmation! At this point, general B knows that general A got the reply
message 5 -> I got your confirmation confirmation! At this point, general A is sure that both parties know when the attack is. General A knows that the confirmation got through, and Gen A is good to go.
message 6 -> conf^n! At this point, General B knows that the confirmation got through, and Gen B is good to go.
message 7 -> icing
Even if message 6 doesn't get through, message 2 and 4 have, so general A knows that both generals know. Message 3 tells general B that both generals know - all that's missing is the nice-to-have confirmation.
You don't need to keep on confirming the last message ad infinitum, because that doesn't add new data about the original message and who knows it. If messenger 77 were to be captured by the city, it's not like the generals would call off the attack because they weren't sure of the timing info.
Perhaps another way of putting it: this isn't a continuous string of information that gets added to. It's a set of discrete parcels of information. The additional messages only add information about the veracity of the later parcels, not the initial ones.
A and B must not attack alone, i.e. A must not attack if B is not going to attack, and B must not attack if A is not going to attack.
Now, message 1 (from A to B) says "attack!". So B knows that A wants B to attack.
...but A mustn't attack alone, so A won't attack unless A receives B's confirmation. So B sends message 2 ("sure!").
...but B mustn't attack unless B knows that A is going to attack, and A will only attack if A has received message 2. So A has to send another message to confirm receipt of message 2.
...and so on.
None of these confirmations are simply "nice-to-have" - if you need to be 100% sure that you're both going to attack, you need to ensure that every confirmation you send has been received by your peer.
You have that confirmation of agreement in the third round of messages. Receipt of the fourth message proves that both sides have knowledge and have agreed, aware of the other's intent. That's the message you have to prove has arrived for surety. Once you've done enough messages to prove message #4, you're good to go.
TCP solves the problem by retransmission, similar to approaches illustrated in https://en.wikipedia.org/wiki/Two_Generals%27_Problem#Engine...
Note that it's impossible to solve it completely, and TCP is no different. If you unplug your router, there is no hope of getting a message through. But it's highly unlikely that there is no path between you and your destination (a "partition") so TCP retransmits until it finds one. This would be similar to generals sending messengers until one gets through and a confirmation messenger comes back.
That said, it's still a difficult problem, and I actually wish people would stop trying to roll their own schemes. For example, this scheme relies on examining a Kafka outbound topic to resolve in-doubt outbound messages. But what happens if the outbound message + commit is still "in flight" when the system recovers, so the system retransmits the in-doubt outbound message and, rather than deduping, is now generating dupes? Yes, the chances of this are minimal, which means it will happen.
1. Sender sends message.
2. If no acknowledgement from recipient is returned to sender, resend message until an acknowledgement is received from recipient.
3. If recipient receives message, check store to see if it's been received before.
4. If it's not in the store, store it, acknowledge receipt to sender, and process it.
5. If it's already in the recipient's store, acknowledge receipt to sender, and discard message.
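The five steps above can be simulated in a few lines of Python. The lossy channel (a seeded random drop in each direction) and the names `Recipient` and `send_reliably` are illustrative assumptions, not a real library. One deliberate difference from the list is that the recipient stores and processes before returning the ack. That way an acknowledged message is never left unprocessed.

```python
import random

class Recipient:
    def __init__(self):
        self.seen = set()       # the store of message IDs already received (steps 3-5)
        self.processed = []     # side effects, for inspection

    def receive(self, msg_id, body):
        if msg_id not in self.seen:     # step 4: new message -> store and process
            self.seen.add(msg_id)
            self.processed.append(body)
        # Steps 4 and 5: acknowledge either way; a duplicate is simply discarded.
        return msg_id

def send_reliably(recipient, msg_id, body, loss_rate, rng, max_attempts=1000):
    """Steps 1-2: resend until an acknowledgement makes it back.
    Returns the number of attempts it took."""
    for attempt in range(1, max_attempts + 1):
        if rng.random() < loss_rate:
            continue                    # message lost on the way there
        ack = recipient.receive(msg_id, body)
        if rng.random() < loss_rate:
            continue                    # ack lost on the way back: will resend a dup
        if ack == msg_id:
            return attempt
    raise TimeoutError(f"no ack for {msg_id} after {max_attempts} attempts")
```

The `max_attempts` cap is there because, as with the generals, nothing bounds how long a partition lasts. The sender has to give up or keep waiting at some point.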
TLDR, you don't need infinitely durable infinite memory. You just need (a) a single-level store in which every event is a transaction, (b) a message protocol with true end-to-end acks, and (c) a permanent session between every pair of nodes. We don't have single-level storage hardware (although XPoint comes close) but it's easy to simulate semantically.
Think about the problem intuitively. I want to send you a stream of messages, each of which you act on exactly once.
I do it like this: I put a sequence number in each message. I keep sending you message 1 until you send me an acknowledgment that you heard message 1. I can also send messages 2, 3, or whatever (lockstep isn't needed), but you process messages in order and ignore messages you've already heard. Never ack an ack, always ack a dup.
What does implementing this design require? It requires a persistent sequence number, on the sending endpoint, for every receiving endpoint in the world. (Edit: and of course another SN on the receiving end.) Of course, for every endpoint you haven't sent to yet, the number is 0 and doesn't need to be stored. This is not a terribly onerous amount of storage.
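Here is a sketch of that design in Python, under two assumptions. The ack is cumulative (the highest sequence number processed in order). Out-of-order messages are dropped and left for the sender to resend rather than buffered. The per-peer counters are in-memory dicts here; a real version would persist them together with the application state, as the comment describes. `Endpoint` and its methods are hypothetical names.

```python
from collections import defaultdict

class Endpoint:
    """One node. A real system would persist these per-peer counters in the same
    transaction as the application state; here they are plain dicts."""
    def __init__(self, name):
        self.name = name
        self.next_out = defaultdict(int)   # peer -> last SN assigned (0 = never sent)
        self.unacked = defaultdict(dict)   # peer -> {sn: payload} awaiting ack
        self.last_in = defaultdict(int)    # peer -> highest SN processed in order
        self.log = []                      # payloads acted on, in order

    def send(self, peer, payload):
        self.next_out[peer] += 1
        sn = self.next_out[peer]
        self.unacked[peer][sn] = payload
        return (self.name, sn, payload)

    def retransmit(self, peer):
        # Everything unacknowledged may be resent at once; no lockstep needed.
        return [(self.name, sn, p) for sn, p in sorted(self.unacked[peer].items())]

    def on_message(self, msg):
        src, sn, payload = msg
        if sn == self.last_in[src] + 1:    # next in order: act on it exactly once
            self.log.append(payload)
            self.last_in[src] = sn
        # sn <= last_in: a dup, ignored (but still acked below).
        # sn > last_in + 1: out of order, dropped; the sender will resend it.
        return (self.name, self.last_in[src])  # always ack, even a dup

    def on_ack(self, ack):
        # Never ack an ack: this returns nothing to send back.
        peer, upto = ack
        for sn in [s for s in self.unacked[peer] if s <= upto]:
            del self.unacked[peer][sn]
```

Buffering out-of-order messages instead of dropping them would save retransmissions at the cost of receiver memory. Either way, each payload reaches `log` once, in order.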
A sequence number is the piece of state generally known as a "session" in networking theory parlance. Of course a TCP connection is a session. We're simply saying that every two endpoints have a sort of implicit, persistent connection.
Moreover, every message must be a transaction. Acknowledging the message acknowledges that the application, as well as the sequence number, has been fully and persistently updated with the knowledge that the message contains. One way to think of this is that we're adding the latency of a transaction to persistent storage to our packet latency. SSDs are good here.
Furthermore, since in real life semantic messages need to be delivered in packet-sized fragments, a fragmentation model with end-to-end "piggybacked" acks is needed. There can't be a separate message-level ack (the "stack of acks" problem is the curse of the Internet stack) -- acknowledging all the fragment packets acknowledges the message.
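Below is a simplified Python sketch of the fragment-level ack idea. The sender treats a message as acknowledged once every fragment has been acked, and there is no separate message-level ack. It does not model piggybacking acks on reverse traffic or the persistence discussed above. `fragment`, `Reassembler` and `OutgoingMessage` are hypothetical names, and the tuple packet format is an assumption.

```python
def fragment(msg_sn, data, mtu):
    """Split one message into (msg_sn, index, count, chunk) packets of <= mtu bytes."""
    chunks = [data[i:i + mtu] for i in range(0, len(data), mtu)] or [b""]
    return [(msg_sn, i, len(chunks), c) for i, c in enumerate(chunks)]

class Reassembler:
    def __init__(self):
        self.partial = {}   # msg_sn -> {index: chunk}
        self.done = {}      # msg_sn -> complete message, handed over once

    def on_packet(self, pkt):
        msg_sn, idx, count, chunk = pkt
        if msg_sn not in self.done:           # late dups of finished messages are ignored
            parts = self.partial.setdefault(msg_sn, {})
            parts[idx] = chunk
            if len(parts) == count:           # last missing fragment completes the message
                self.done[msg_sn] = b"".join(parts[i] for i in range(count))
                del self.partial[msg_sn]
        return (msg_sn, idx)                  # per-fragment ack; no message-level ack

class OutgoingMessage:
    def __init__(self, packets):
        self.pending = {(p[0], p[1]) for p in packets}

    def on_ack(self, ack):
        self.pending.discard(ack)

    @property
    def acknowledged(self):
        # Acking every fragment packet is what acknowledges the message.
        return not self.pending
```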
All this and more is explained here (plug alert):
That's not EOM; those are just the sensible workarounds to use in light of the fact that EOM doesn't exist. Obviously a lot of protocols and applications use such things, since the inability to have EOM in the real world has not rendered our entire network edifice impossible or useless in real life.
If EOM means "the programmer doesn't have to think about idempotency," EOM is what I want. Happy to call this "EOM asterisk" if you and/or the OP like.
At any point in the conversation, you don't know whether your interlocutor has yet received the last message you sent. This is because you are talking over a network, rather than over a magic bus.
However, you know that before the endpoint processes each message, it has processed each previous message once and exactly once. I think this is what the programmer wants -- asterisk or no.
Network connectivity failure is best modeled as the limit case of network latency. Of course, when you send a message over a network in the real world, you can't know whether it has been received or not until you get an acknowledgment.
It is not impossible to handle a system that produces rare corner cases. It's just expensive and a pain in the butt.