- Server delivers message M
- Client processes event E entailed by message M
- Client tries to ack (A) the message on the server, but "packet loss"
- To make matters worse, let's say the client also immediately dies after this
How do you handle this situation?
The client must transactionally/simultaneously commit both E and A/intent-to-A. Since the server never received an acknowledgment of M, it will either redeliver the message, in which case some record of E must be kept to deduplicate on, or it will wait for the client to resend A, or some mixture of both. Note: if you say "just make E idempotent", then you don't need exactly-once delivery in the first place...
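To illustrate the "commit E together with intent-to-A" idea, here is a minimal sketch (hypothetical schema and names, with sqlite standing in for the client's durable store): applying E and recording M-as-seen happen in one local transaction, so a redelivered M is detected and dropped.

```python
import sqlite3

# Hypothetical sketch: the effect of event E and the "seen message" record
# are committed in one local transaction, so a redelivery of M after a lost
# ack (or a client crash) is deduplicated rather than applied twice.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (msg_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE events (msg_id TEXT, payload TEXT)")

def handle(msg_id, payload):
    """Apply E and record M as seen, atomically. Returns False on a dup."""
    try:
        with conn:  # one transaction: both rows commit, or neither does
            conn.execute("INSERT INTO processed VALUES (?)", (msg_id,))
            conn.execute("INSERT INTO events VALUES (?, ?)", (msg_id, payload))
    except sqlite3.IntegrityError:
        return False  # M was redelivered; E already happened exactly once
    return True

assert handle("m1", "debit $5") is True
assert handle("m1", "debit $5") is False  # redelivery is a no-op
```

The ack A can then be (re)sent any number of times after the commit, since the `processed` row survives a crash.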
I suppose you could go back to some kind of lock-step processing of messages to avoid needing to record all (E,A) that are in flight, but that would obviously kill throughput of the message queue.
Exactly Once can only ever be At Least Once with some out-of-the-box idempotency that may not be as cheap as the natural idempotency of your system.
EDIT: Recommended reading: "Life Beyond Distributed Transactions", Pat Helland - http://queue.acm.org/detail.cfm?id=3025012
We've been doing this since 2005 at 10k+ msgs/sec (1k payload), durable, transacted, fully encrypted, with no two-phase commit, supporting long disconnects (I know of documented cases of conversations that resumed and continued after 40+ days of partner network disconnect).
Running into resource limits (basically out of disk space) is something the database community has known how to monitor, detect and prevent for decades now.
I really don't get why so many articles, blogs and comments claim this doesn't work, or is impossible, or even hard. My team shipped this 12+ years ago; it is used in major deployments, the technology is proven, and little has changed in the original protocol.
So everyone is talking about the same thing here: data can get delivered twice, and the application must handle it in an idempotent way. For TCP this happens to be looking at some counter and throwing away anything that's already been seen. With a single client/server connection, maintaining sequence numbers like TCP does solves the problem, but it's harder to use this same technique in the multi-client, multi-server, "stateless" requests that are common in the web world.
EDIT: To clarify, what I mean by TCP "does deliver data more than once" is that one side of the connection can send the same data twice. It's true that it's then discarded by the other end, but this is what people are talking about when they talk about the theoretical impossibility of never sending anything twice. The rest is basically the idempotency thing of ensuring that data received twice somehow doesn't cause anything abnormal.
I've been living in this problem space for many years now and seen the wheel reinvented many times. Whenever the plumbing does not guarantee EOIO but the business demands it, it gets pushed into the app layer, where TCP (retries and acks) is reimplemented with varying levels of success.
Isn't this the same app layer stuff that has to get reimplemented? I can see how this is often pushed back to a human (oops, my money transfer didn't go through, I'll have to redo it) but it's still something that has to be dealt with somewhere.
Database programmers have the means to deal with it off-the-shelf: BEGIN TRANSACTION ... COMMIT. When your queues are in the database, this becomes trivial. Even without the system I'm talking about (Service Broker), which has the queues stored in the database, most regular messaging systems do support enrolling in a distributed transaction to achieve an atomic dequeue/process sequence; it's just that many apps/deployments don't bother because of the ops overhead (XA coordinator), reduced throughput, and/or simply not understanding the consequences.
Point is that durable, persisted, transacted 'sockets' behave very differently from a TCP socket. It's a whole lot harder to simply lose a message in the app layer when interacting with a database.
The transaction boundary defined in your consumer covers the interactions of that consumer with other XA aware nodes who all participate in a "distributed transaction". So you can process this message N times without committing, and thus possibly N times telling other systems to do M' side-effect of that message, but until you commit the world has not changed.
1. Receive from server, commit state = NEW, UNACK
2. Send ACK to server, get confirmation from server, commit state = NEW, ACK
3. Start processing, commit state = PROC
4. Got results, commit state = FIN, UNACK
5. Send FIN to server, commit state = FIN, ACK
Each commit is a database transaction where you write the results of that step along with the new state. If anything fails along the way the work-in-progress is discarded along with the state change. The server has an equivalent so if it gets a duplicate ACK for the same (or earlier) state it can ignore it.
In this example, if the client crashes between 1-2, or in #2 never gets confirmation, or crashes trying to commit the "NEW, ACK" state, then it will retry. The server has already committed the fact that it sent the value to the client and is awaiting an ACK. If it saw the ACK and gets a duplicate, it ignores it. If it never saw the first ACK, then it will see the second(+) attempt and commit that it saw the ACK before sending confirmation to the client.
It's true that this doesn't work if your processing touches external systems or otherwise escapes the transaction context, but in those cases you do still get at-least-once delivery (or at-most-once, if you choose to commit the receipt before processing the message).
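The five-step state machine above can be sketched roughly like this (all names hypothetical; a dict stands in for the durable state table). Each transition is one committed transaction, so a crash or a lost confirmation just leaves the message at its last committed state, and retrying is safe:

```python
# Hypothetical sketch of the client-side state machine. Each step commits
# the new state atomically; on crash/restart, work resumes from the stored
# state, and a failed send (send_ok=False) leaves the state unchanged.
class Client:
    def __init__(self):
        self.state = {}  # msg_id -> state; stands in for a durable table

    def commit(self, msg_id, state):
        self.state[msg_id] = state  # in real life: a DB transaction

    def step(self, msg_id, send_ok=True):
        cur = self.state.get(msg_id)
        if cur is None:
            self.commit(msg_id, "NEW_UNACK")    # 1. received from server
        elif cur == "NEW_UNACK" and send_ok:
            self.commit(msg_id, "NEW_ACK")      # 2. ACK confirmed by server
        elif cur == "NEW_ACK":
            self.commit(msg_id, "PROC")         # 3. processing started
        elif cur == "PROC":
            self.commit(msg_id, "FIN_UNACK")    # 4. results committed
        elif cur == "FIN_UNACK" and send_ok:
            self.commit(msg_id, "FIN_ACK")      # 5. FIN confirmed
        return self.state[msg_id]

c = Client()
# A lost ACK (send_ok=False) leaves the state unchanged; the retry is safe.
c.step("m1"); c.step("m1", send_ok=False); c.step("m1")
assert c.state["m1"] == "NEW_ACK"
```

The server side mirrors this, which is why a duplicate ACK for the same (or an earlier) state can simply be ignored.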
It really is a powerful technology and when leveraged can absolutely reduce the level of effort and cognitive burden to building correct asynchronous systems.
Interesting! You're saying the exact opposite of what Tyler Treat is saying:
> Even with smart middleware, problems still leak out and you have to handle them at the edge—you’re now being taxed twice. This is essentially the end-to-end argument. Push responsibility to the edges, smart endpoints, dumb pipes, etc. It’s the idea that if you need business-level guarantees, build them into the business layer because the infrastructure doesn’t care about them.
What do you think of his argument?
In other words, to reliably agree on a system state (whether message id was delivered) you need the system to be Consistent. And per CAP theorem, it cannot be Available in presence of Partitions.
So other people you're referring to probably talk about distributed systems.
As I said, users had cases where the plumbing (messaging system) recovered and delivered messages after 40+ days of network partitioning. Correctly written apps completed the business process associated with those messages as normal, no special case. Humans can identify and fix outages, and databases can easily outlast network outages (everything is durable, transacted, with HA/DR). And many business processes make perfect sense to resume/continue after the outage, even if it lasted for days.
Approaching exactly-once delivery asymptotically is possible. Your parent poster's point is that this is one of those cases where you can get so close to exactly-once that in practice you never violate it for years and years.
> Just ack after you commit locally.
wait ... what is this "retry" and "ack"? is this so that the sender knows not to send the message again? that sounds suspiciously familiar ...
I mean, come on, you're just describing a framework you wrote that handles "at least once delivery" plus "idempotent operations" for the programmer. That's fine. It's marketing to say it's "exactly once". You're just arguing over what to call it.
Idempotency is a restriction/requirement that you may put in place in a distributed system to make the above types of guarantees. Deduping of messages like the article mentions means that it is using "at least once" message delivery, with some notion of unique message IDs and/or messages to then dedup the message.
TCP is a great analogy, where the Windows of the protocol are effectively maintained by Kafka. Anyway, "exactly once" is very hard and requires putting limits into the system such that deduping or something similar is possible. But I'd agree that in all cases anything that claims "exactly once" behavior is in fact implementing that on top of protocols with "at least once" primitives.
This isn't true. There is a point where both sides have confirmed receipt of the last new info (the original confirmation back to the initial sender), and further confirmation is just icing. What kind of general is it that says "I've received ten acknowledgements so far from the other side, but I'm still not sure!"?
Remember that it's the message itself being confirmed, not 'did the last message get through'. There is a point where both sides are aware of both the message and a confirmed confirmation of that message.
Suppose A sends the message to B, B sends an ack to A but A's ack back to B is lost. Now both A and B know the time of the attack but B considers it possible that [A does not know that B knows]. B thinks if A really does not know that B knows then A may not attack out of the consideration of the possibility that B does not know. And this possibility may make B call off the attack even if B knows.
Each round of successful exchange increases the order of statements that are known, but there is always a statement of higher order which is still not known and which can cause the attack to unravel inductively.
Look up 'Rubinstein's e-mail game' for a choice of payoffs that causes no attack to happen for any positive probability, howsoever small, of a message getting lost.
message 1 -> attack!
message 2 -> sure!
message 3 -> I got your sure! At this point, general A knows the message got through
message 4 -> I got your confirmation! At this point, general B knows that general A got the reply
message 5 -> I got your confirmation confirmation! At this point, general A is sure that both parties know when the attack is. General A knows that the confirmation got through, and Gen A is good to go.
message 6 -> conf^n! At this point, General B knows that the confirmation got through, and Gen B is good to go.
message 7 => icing
Even if message 6 doesn't get through, message 2 and 4 have, so general A knows that both generals know. Message 3 tells general B that both generals know - all that's missing is the nice-to-have confirmation.
You don't need to keep on confirming the last message ad infinitum, because that doesn't add new data about the original message and who knows it. If messenger 77 were to be captured by the city, it's not like the generals would call off the attack because they weren't sure of the timing info.
Perhaps another way of putting it: this isn't a continuous string of information that gets added to. It's a set of discrete parcels of information. The additional messages only add information about the veracity of the later parcels, not the initial ones.
A and B must not attack alone, i.e. A must not attack if B is not going to attack, and B must not attack if A is not going to attack.
Now, message 1 (from A to B) says "attack!". So B knows that A wants B to attack.
...but A mustn't attack alone, so A won't attack unless A receives B's confirmation. So B sends message 2 ("sure!").
...but B mustn't attack unless B knows that A is going to attack, and A will only attack if A has received message 2. So A has to send another message to confirm receipt of message 2.
...and so on.
None of these confirmations are simply "nice-to-have" - if you need to be 100% sure that you're both going to attack, you need to ensure that every confirmation you send has been received by your peer.
You have that confirmation of agreement in the third round of messages. Receipt of the fourth message proves that both sides have knowledge and have agreed, aware of the other's intent. That's the message you have to prove has arrived for surety. Once you've done enough messages to prove message #4, you're good to go.
TCP solves the problem by retransmission, similar to approaches illustrated in https://en.wikipedia.org/wiki/Two_Generals%27_Problem#Engine...
Note that it's impossible to solve it completely, and TCP is no different. If you unplug your router, there is no hope of getting a message through. But it's highly unlikely that there is no path between you and your destination (a "partition") so TCP retransmits until it finds one. This would be similar to generals sending messengers until one gets through and a confirmation messenger comes back.
That said it's still a difficult problem and I actually wish people would stop trying to roll their own schemes. For example, this scheme relies on examining a Kafka outbound topic to resolve in-doubt outbound messages. But what happens if the outbound message + commit is still "in flight" when the system recovers so the system retransmits the in-doubt outbound message and so rather than deduping it now is generating dupes? Yes, the chances of this are minimal which means it will happen.
1. Sender sends message.
2. If no acknowledgement from recipient is returned to sender, resend message until an acknowledgement is received from recipient.
3. If recipient receives message, check store to see if it's been received before.
4. If it's not in the store, store it, acknowledge receipt to sender, and process it.
5. If it's already in the recipient's store, acknowledge receipt to sender, and discard message.
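The five steps above can be sketched as follows (hypothetical names; a lossy channel is simulated with random drops). Note how a lost ack in step 2 produces a duplicate delivery, which steps 3-5 absorb:

```python
import random

# Sketch of the retry + receiver-side dedup protocol above. The channel
# drops requests and acks at random; the sender retries until it sees an
# ack, and the recipient processes each message id at most once.
random.seed(7)

class Recipient:
    def __init__(self):
        self.store = set()   # ids already received (steps 3-5)
        self.processed = []  # side effects happen at most once per id

    def receive(self, msg_id, payload):
        if msg_id not in self.store:  # step 4: first time seen
            self.store.add(msg_id)
            self.processed.append(payload)
        return "ack"                  # step 5: ack dups, never re-process

def send(recipient, msg_id, payload, loss=0.5):
    while True:                       # step 2: resend until acked
        if random.random() < loss:
            continue                  # request lost in flight
        ack = recipient.receive(msg_id, payload)
        if random.random() < loss:
            continue                  # ack lost: a duplicate delivery follows
        return ack

r = Recipient()
send(r, "m1", "launch")
send(r, "m1", "launch")   # an outright duplicate send is also harmless
assert r.processed == ["launch"]
```

This is the standard "at-least-once delivery plus dedup" construction; the catch, as discussed elsewhere in the thread, is that the recipient's store must be durable and must retain ids long enough.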
TLDR, you don't need infinitely durable infinite memory. You just need (a) a single-level store in which every event is a transaction, (b) a message protocol with true end-to-end acks, and (c) a permanent session between every pair of nodes. We don't have single-level storage hardware (although XPoint comes close) but it's easy to simulate semantically.
Think about the problem intuitively. I want to send you a stream of messages, each of which you act on exactly once.
I do it like this: I put a sequence number in each message. I keep sending you message 1 until you send me an acknowledgment that you heard message 1. I can also send messages 2, 3, or whatever (lockstep isn't needed), but you process messages in order and ignore messages you've already heard. Never ack an ack, always ack a dup.
What does implementing this design require? It requires a persistent sequence number, on the sending endpoint, for every receiving endpoint in the world. (Edit: and of course another SN on the receiving end.) Of course, for every endpoint you haven't sent to yet, the number is 0 and doesn't need to be stored. This is not a terribly onerous amount of storage.
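A sketch of the receiving side of that design (hypothetical names; instance attributes stand in for the persistent per-peer session state). Out-of-order messages are buffered, duplicates are acked and ignored, and delivery to the application is strictly in sequence order:

```python
# Sketch of the per-pair sequence-number session described above.
# next_seq stands in for the persistent counter on the receiving end;
# "never ack an ack, always ack a dup" is the dedup rule.
class Receiver:
    def __init__(self):
        self.next_seq = 1   # persistent in a real system
        self.pending = {}   # buffered out-of-order messages
        self.log = []       # messages delivered to the application

    def on_message(self, seq, payload):
        if seq < self.next_seq:
            return "ack"    # dup: always ack a dup, never re-process
        self.pending[seq] = payload
        while self.next_seq in self.pending:  # deliver in order
            self.log.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return "ack"

r = Receiver()
r.on_message(2, "b")   # arrives early: buffered, not yet delivered
r.on_message(1, "a")   # releases 1 and then 2, in order
r.on_message(1, "a")   # duplicate: acked, ignored
assert r.log == ["a", "b"]
assert r.next_seq == 3
```

In the full design, appending to `log` and advancing `next_seq` would be one transaction, which is the "every message is a transaction" point made below.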
A sequence number is the piece of state generally known as a "session" in networking theory parlance. Of course a TCP connection is a session. We're simply saying that every two endpoints have a sort of implicit, persistent connection.
Moreover, every message must be a transaction. Acknowledging the message acknowledges that the application, as well as the sequence number, has been fully and persistently updated with the knowledge that the message contains. One way to think of this is that we're adding the latency of a transaction to persistent storage to our packet latency. SSDs are good here.
Furthermore, since in real life semantic messages need to be delivered in packet-sized fragments, a fragmentation model with end-to-end "piggybacked" acks is needed. There can't be a separate message-level ack (the "stack of acks" problem is the curse of the Internet stack) -- acknowledging all the fragment packets acknowledges the message.
All this and more is explained here (plug alert):
That's not EOM, that's just the sensible workarounds to use in light of the fact EOM doesn't exist. Obviously a lot of protocols and applications use such things since the inability to have EOM in the real world has not rendered our entire network edifice impossible or useless in real life.
If EOM means "the programmer doesn't have to think about idempotency," EOM is what I want. Happy to call this "EOM asterisk" if you and/or the OP like.
At any point in the conversation, you don't know whether your interlocutor has yet received the last message you sent. This is because you are talking over a network, rather than over a magic bus.
However, you know that before the endpoint processes each message, it has processed each previous message once and exactly once. I think this is what the programmer wants -- asterisk or no.
Network connectivity failure is best modeled as the limit case of network latency. Of course, when you send a message over a network in the real world, you can't know whether it has been received or not until you get an acknowledgment.
It is not impossible to handle a system that produces rare corner cases. It's just expensive and a pain in the butt.
1) do nothing, risking message loss
2) retransmit, risking duplication
But of course that's only from the messaging system's point of view. Deduplication at the receiver end can help reduce the problem, but it can itself fail (there is no foolproof way of implementing that pseudocode's "has_seen(message.id)" method).
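For illustration, here is a naive `has_seen` built on a bounded store (hypothetical; an LRU-style dict stands in for whatever backs it), showing exactly how it fails: once an id is evicted to bound memory, a sufficiently late duplicate passes as new.

```python
from collections import OrderedDict

# Naive bounded dedup store, demonstrating why has_seen() itself can fail:
# once capacity forces eviction, a late-enough duplicate looks "unseen".
class DedupStore:
    def __init__(self, capacity):
        self.capacity = capacity
        self.ids = OrderedDict()

    def has_seen(self, msg_id):
        if msg_id in self.ids:
            return True
        self.ids[msg_id] = True
        if len(self.ids) > self.capacity:
            self.ids.popitem(last=False)  # evict the oldest id
        return False

store = DedupStore(capacity=2)
assert store.has_seen("m1") is False
assert store.has_seen("m1") is True          # prompt duplicate is caught
store.has_seen("m2"); store.has_seen("m3")   # m1 gets evicted
assert store.has_seen("m1") is False         # late duplicate slips through
```

Unbounded retention fixes this failure mode but trades it for unbounded storage, which is the tension the thread keeps circling.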
My go-to for explaining to people is the Two Generals Problem https://en.wikipedia.org/wiki/Two_Generals%27_Problem
it's an essentially-useless guarantee for any sort of Kafka consumer that interacts with an external system.
Exactly-Once is not something that can be provided-for or delegated-to an arbitrary, non-integrated system. For an "integrated" system, it requires idempotent behavior, in which case it's really At-Least-Once, so...
Meaning, you want "exactly once" and you don't want duplicates, yes. But you allow for reprocessing, provided that you have a way for deduplicating.
You want a guarantee that if the producer (at the top of your data processing pipeline) sends a message, then this message eventually corresponds to exactly 1 record in your final storage(s).
One easy-to-understand-yet-simplistic example is: send a message to kafka, use topic+partition+offset as the primary key, store in an RDBMS. This is widely accepted as "exactly once", but clearly you may have multiple attempts to save the message into the db, which will fail due to the primary key integrity constraint.
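A sketch of that pattern (sqlite standing in for the RDBMS; names hypothetical): because a Kafka record's topic+partition+offset is stable across retries, a second save attempt violates the primary key and can be treated as already-stored.

```python
import sqlite3

# Sketch: topic+partition+offset as the primary key. A retried save of the
# same Kafka record violates the constraint and is safely ignored.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE records (
    topic TEXT, partition INTEGER, offset INTEGER, payload TEXT,
    PRIMARY KEY (topic, partition, offset))""")

def save(topic, partition, offset, payload):
    try:
        with conn:
            conn.execute("INSERT INTO records VALUES (?, ?, ?, ?)",
                         (topic, partition, offset, payload))
        return "stored"
    except sqlite3.IntegrityError:
        return "duplicate"  # a retry of an earlier attempt: ignore it

assert save("events", 0, 42, "signup") == "stored"
assert save("events", 0, 42, "signup") == "duplicate"  # retried attempt
assert conn.execute("SELECT COUNT(*) FROM records").fetchone()[0] == 1
```

So the final storage sees exactly one record per message, even though the save itself may be attempted many times, which is the point being made.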
Wait why? Just because you'd have to store the list of seen messages theoretically indefinitely?
What they've done is shift the point of failure from something less reliable (client's network) to something more reliable (their rocksdb approach) - reducing duplicates but not guaranteeing exactly once processing.
but if you can persist a monotonic sequence number, that gets you pretty far. We use TCP all the time even though it has no magic answer to distributed consensus (and uses a super-weak checksum). 2PC doesn't guarantee progress and/or consistency either, and it's still pretty effective.
If you send the message with a hash of the previous state on the server (like proof-of-work in Bitcoin), it is so unlikely that the hash will be the same with and without the message appended that it doesn't really matter if the failure probability is strictly nonzero, as long as it is small enough.
Our network issues are nearly 6 times higher (~3.5%) due to GPRS, and we solved the duplication problem with an approach involving both client and server side.
Clients would always ensure that all the information sent by the server was successfully received. If something goes wrong, instead of retrying (sending the payment again), the client sends just the transaction UUID to the server, and the server might either respond with: A. the corresponding response for the transaction or B. not found.
In the scenario A, the POS terminal managed to properly send all the information to the server but failed to receive the response.
In the scenario B, the POS terminal didn't even manage to properly send the information to the server, so the POS can safely retry.
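The exchange above can be sketched as follows (hypothetical names): instead of resending the payment on a failure, the client first asks the server what it knows about the transaction UUID, and only retries the payment itself in scenario B.

```python
import uuid

# Sketch of the POS recovery protocol described above. The server keeps a
# durable record of responses keyed by transaction UUID, so a client that
# lost the response can recover it without re-submitting the payment.
class Server:
    def __init__(self):
        self.responses = {}  # txn_id -> response; durable in real life

    def pay(self, txn_id, amount):
        if txn_id not in self.responses:  # charge at most once per UUID
            self.responses[txn_id] = f"approved:{amount}"
        return self.responses[txn_id]

    def lookup(self, txn_id):
        # Scenario A: the server saw it; scenario B: it never arrived.
        return self.responses.get(txn_id, "not found")

server = Server()
txn = str(uuid.uuid4())

server.pay(txn, 100)                         # request arrived, response lost
assert server.lookup(txn) == "approved:100"  # A: recover cached response

other = str(uuid.uuid4())                    # request itself never arrived
assert server.lookup(other) == "not found"   # B: safe to retry the payment
```

The client generating the UUID before the first attempt is what makes the lookup meaningful: the id survives the failed exchange on the client side.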
Enough of resume-driven-engineering; why does everyone need to reinvent the wheel?
 No disrespect meant - just a description of size. Source: running a piddly 2-person engineering team.
There is still a potential for problems in the message delivery to the endpoints (malformed messages, Kafka errors, messages not being consumed fast enough and lost), or duplication at that level (restart a listener on the Kafka stream with the wrong message ID) as well.
This is based on my own pains with Kinesis and Lambda (which, I know, isn't Kafka).
In my experience, it's better to just allow raw "at least once" messaging and perform idempotent actions based off the messages. It's not always possible (and harder when it is possible), but its tradeoffs mean you're less likely to lose messages.
- Talk from Kafka Summit: https://www.confluent.io/kafka-summit-nyc17/resource/#exactl...
- Proposal: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...
If the business value of the data is derived only after applying some summary statistics, then even sampling the data works, and you can lose events in an event stream without changing the insight gained. Originally Kafka was designed to be a high-throughput data bus for an analytical pipeline where losing messages was ok. More recently they are experimenting with exactly-once delivery.
There is a lot of value derived from de-duping near ingress of a heavy stream such as this. You're saving downstream consumers time (money) and potential headaches. You may be in an industry where duplicates can be handled by a legacy system, but it takes 5-10 minutes of manual checks and corrections by support staff. That was my exact use case and I can't count the number of times we were thankful our de-duping handled "most" cases.
Over a window of time that changes depending on the amount of ingested events.
Basically, they read from a kafka stream and have a deduplication layer in rocks db that produces to another kafka stream. They process about 2.2 billion events through it per day.
While this will reduce duplicates and get closer to Exactly Once (helping reduce the two generals problem on incoming requests and potentially work inside their data center), they still have to face the same problem again when they push data out to their partners. Some packet loss, and they will be sending out duplicates to the partner.
Not to downplay what they have done as we are doing a similar thing near our exit nodes to do our best to prevent duplicate events making it out of our system.
"In the past three months we’ve built an entirely new de-duplication system to get as close as possible to exactly-once delivery"
What's annoying is that they do not get precise and formal about what they want out of their new model. Also, their numbers only speak to performance, not correctness.
On the plus side, I think it's awesome to see bloom filters successfully used in production. That sort of thing is easy to implement, but not easy to get right for every use case.
EOM is NOT strictly speaking equivalent to the Two Generals Problem, or Distributed Consensus, in an unreliable network. In distributed consensus, at some given point in time, A has to know X, A has to know B knows X, A has to know B knows A knows X, ... It has to do with the fact that the message broker is in some sense the arbitrator of truth, so the consumer(s) don't need full consensus. In an unreliable network, you can have EOM. http://ilpubs.stanford.edu:8090/483/1/2000-7.pdf gives some examples of how that works.
HOWEVER, you can't have EOM when the consumers can fail. If a consumer fails there's no way, in general, to tell if the last message it was working on was completed.
There are a couple of edge cases where you can still have EOM. For instance, a system where you have a message broker A, and a bunch of consumers that read messages x from that queue, compute f(x), and insert f(x) onto message broker B, where f(x) may be computed multiple times for the same x (i.e. if f is a pure function or you don't care about the side effects). This system can implement EOM in the presence of an unreliable network and consumer failures (I think it can handle one or both of the message brokers failing too, not 100% sure) in the sense that x will never be in broker A at the same time as f(x) is in broker B, f(x) will never be in broker B more than once for the same x, and any y in B had some x that was in A such that y = f(x).
I love it when that happens.
1) What happens if they lose their rocksdb with all of the messageIds?
2) Is their kafka at-least-once delivery? How do they guarantee that kafka doesn't reject their write? Also, assuming they have set up their kafka for at-least-once delivery, doesn't that make the output topic susceptible to duplicates due to retries, etc.?
3) >Instead of searching a central database for whether we’ve seen a key amongst hundreds of billions of messages, we’re able to narrow our search space by orders of magnitude simply by routing to the right partition.
Is "orders of magnitude" really correct? Aren't you really just narrowing the search space by the number of partitions in kafka? I suppose if you have a hundred partitions, that would be 2 orders of magnitude, but it makes it sound like it's much more than that.
I'm wondering the same.
Perhaps they expect a 1:1 mapping of RocksDB, partition, and de-duplication worker.
> If the dedupe worker crashes for any reason or encounters an error from Kafka, when it re-starts it will first consult the “source of truth” for whether an event was published: the output topic.
Does this mean that "on worker crash" the worker replays the entire output topic and compare it to the rocksdb dataset?
Also, how do you handle scaling up or down the number of workers/partitions?
The scale is smaller (about 10k rpm), but the basic idea is the same (store a message ID in a key-value store after each successful send).
I like the idea of invalidating records by overall size, we hadn't thought of that. We just use a fixed 24-hour TTL.
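That pattern can be sketched as follows (hypothetical names; a dict with timestamps stands in for the key-value store, and the clock is injectable so the TTL is testable): record each message ID after a successful send, and drop duplicates seen inside the TTL window.

```python
import time

# Sketch: store a message ID after each successful send, with a fixed TTL.
# Duplicates inside the window are dropped; after expiry the store forgets,
# which is exactly the tradeoff a TTL-based dedup accepts.
class TTLDedup:
    def __init__(self, ttl_seconds, clock=time.time):
        self.ttl = ttl_seconds
        self.clock = clock   # injectable for testing
        self.seen = {}       # msg_id -> time recorded

    def should_send(self, msg_id):
        now = self.clock()
        ts = self.seen.get(msg_id)
        if ts is not None and now - ts < self.ttl:
            return False     # duplicate within the TTL window
        self.seen[msg_id] = now  # record (or refresh) after sending
        return True

fake_now = [0.0]
d = TTLDedup(ttl_seconds=24 * 3600, clock=lambda: fake_now[0])
assert d.should_send("m1") is True
assert d.should_send("m1") is False   # duplicate inside the 24h window
fake_now[0] += 25 * 3600              # a day later
assert d.should_send("m1") is True    # entry expired; the store forgets
```

Invalidating by overall size instead of (or in addition to) a TTL just swaps the time bound for a space bound; either way a sufficiently late or sufficiently distant duplicate can get through.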
Either way your listener(s) still has to have its own deduplication. Ensuring a message ends up on the queue only once, and ensuring it's processed exactly once, are two different problems that require separate handling (and, the former is what most out of the box systems claim to solve, while the latter is more important, and, frankly, completely negates the need of the former).
So we can get duplicate API submissions regardless of whether or not we enabled transactional productions into kafka from a producer.
IIRC, it's provably impossible in a distributed system where processes might fail, i.e. all real systems.
(Google Cloud emp speaking)
The problem with relying on TCP for reliability is that its state is in memory, associated with a particular peer IP address, and acknowledgements passed back to the sender only indicate that the receiver has the data in local memory, not that the data has been processed.
A file download over TCP can fail, for example due to a network problem. Ensuring reliable delivery requires additional measures outside of TCP, such as retrying the download using a new connection.
In practice, this means that TCP is primarily useful for providing flow control and offering a streaming interface (no worry about packet sizes). Less so as a complete solution for transmission reliability.
> In Python (aka pseudo-pseudocode)
This annoyed me probably more than it should have.
segment.com refused to connect.
Checking the connection
Maybe something local to me only?
In here, uMatrix just blocks the site with the message:
> uMatrix has prevented the following page from loading:
I checked, and one of my uMatrix hosts files includes 'www.segment.com'.
No need to nuke a city to get a fly.
I mentioned elsewhere that on Google Cloud the most elegant way of doing this is with Google Cloud Dataflow
(work at G)