Delivering Billions of Messages Exactly Once (segment.com)
492 points by fouadmatin on June 29, 2017 | 133 comments

I don't want to ever see the phrase "Exactly Once" without several asterisks behind it. It might be exactly once from an "overall" point of view, but the client effectively needs infinitely durable infinite memory to perform the "distributed transaction" of acting on the message and responding to the server.


- Server delivers message M

- Client processes event E entailed by message M

- Client tries to ack (A) message on server, but "packet loss"

- To make matters worse, let's say the client also immediately dies after this

How do you handle this situation? The client must transactionally/simultaneously commit both E and A/intent-to-A. Since the server never received an acknowledgment of M, it will either redeliver the message, in which case some record of E must be kept to deduplicate on, or it will wait for the client to resend A, or some mixture of both. Note: if you say "just make E idempotent", then you don't need exactly-once delivery in the first place...

I suppose you could go back to some kind of lock-step processing of messages to avoid needing to record all (E,A) that are in flight, but that would obviously kill throughput of the message queue.

Exactly Once can only ever be At Least Once with some out-of-the-box idempotency that may not be as cheap as the natural idempotency of your system.

EDIT: Recommended reading: "Life Beyond Distributed Transactions", Pat Helland - http://queue.acm.org/detail.cfm?id=3025012

Having spent 7 years of my life working with Pat Helland in implementing Exactly Once In Order messaging with SQL Server Service Broker[0] I can assure you that practical EOIO messaging is possible, exists, and works as advertised. Delivering data EOIO is not rocket science, TCP has been doing it for decades. Extending the TCP paradigms (basically retries and acks) to messaging is not hard if you buy into transacted persisted storage (= a database) for keeping undelivered messages (transmission queue) and storing received messages before application consumption (destination queue). Just ack after you commit locally.
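A minimal sketch of the "ack after you commit locally" rule, assuming a SQLite table stands in for the destination queue. The table and function names are hypothetical and are not Service Broker's API; the point is only that the ack is produced after the local commit, so a retried message is acked again without being stored twice.

```python
import sqlite3

def make_store():
    # In-memory database for illustration; a real destination queue would be durable.
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE destination_queue (msg_id INTEGER PRIMARY KEY, body TEXT)")
    return db

def receive(db, msg_id, body):
    """Store the message, commit, then return the ack.
    A redelivered message hits the primary key and is ignored, but still acked."""
    with db:  # commits on success, rolls back if an exception escapes
        db.execute(
            "INSERT OR IGNORE INTO destination_queue (msg_id, body) VALUES (?, ?)",
            (msg_id, body),
        )
    return ("ACK", msg_id)  # only produced after the commit above

db = make_store()
first = receive(db, 1, "hello")
retry = receive(db, 1, "hello")  # sender retried because the first ack was lost
stored = db.execute("SELECT COUNT(*) FROM destination_queue").fetchone()[0]
```

Here the sender may transmit the same message any number of times; the receiving side ends up with one row.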

We were doing this in 2005 at +10k msgs/sec (1k payload), durable, transacted, fully encrypted, with no two-phase commit, supporting long disconnects (I know of documented cases of conversations that resumed and continued after +40 days of partner network disconnect).

Running into resource limits (basically out of disk space) is something the database community has known how to monitor, detect and prevent for decades now.

I really don't get why so many articles, blogs and comments claim this is not working, or impossible, or even hard. My team shipped this +12 years ago; it is used by major deployments, the technology is proven, and the original protocol has changed little.

[0] https://docs.microsoft.com/en-us/sql/database-engine/configu...

TCP does deliver data more than once though. Sure, within a TCP session you are guaranteed not to get the same byte twice out of your socket; bytes are delivered exactly once, in order. Now if your application that uses TCP pulls data out of the socket and then dies, the data will need to be delivered again, and the TCP protocol is unable to help us there; it's application-level logic at that point.

So everyone is talking about the same thing here: data can get delivered twice, and the application must handle it in an idempotent way. For TCP this happens to be looking at some counter and throwing away anything that's already been seen. With a single client/server connection, maintaining sequence numbers like TCP does solves the problem, but it's harder to use this same technique in the multi-client, multi-server, "stateless" requests that are common in the web world.

EDIT: To clarify, what I mean by TCP "does deliver data more than once" is that one side of the connection can send the same data twice. It's true that it's then discarded by the other end, but this is what people are talking about when they talk about the theoretical impossibility of never sending anything twice. The rest is basically the idempotency thing of ensuring that data received twice somehow doesn't cause anything abnormal.

Not when your 'socket' is a persisted, durable, transacted medium (i.e. a database). Sure, applications can 'pull data out of the socket and then die', but this is a common scenario in databases, which is handled with transactions and post-crash recovery. The application comes back after the crash and finds the same state as before the crash (the 'socket' still has the data ready to pull off); it pulls again, processes, and then commits. This is not duplicate delivery, since we're talking about an aborted and rolled-back attempt, followed later by successful processing. Again, databases and database apps have been dealing with this kind of problem for decades and know how to handle it.

I've been living in this problem space for many years now and have seen the wheel reinvented many times. Whenever the plumbing does not guarantee EOIO but the business demands it, it gets pushed into the app layer where TCP (retries and acks) is reimplemented, with varying levels of success.

> The application comes back after the crash and finds the same state as before the crash (the 'socket' still has the data ready to pull off); it pulls again, processes, and then commits.

Isn't this the same app layer stuff that has to get reimplemented? I can see how this is often pushed back to a human (oops, my money transfer didn't go through, I'll have to redo it) but it's still something that has to be dealt with somewhere.

> it's still something that has to be dealt with somewhere

Database programmers have the means to deal with it off-the-shelf: BEGIN TRANSACTION ... COMMIT. When your queues are in the database, this becomes trivial. Even without the system I'm talking about (Service Broker), which has the queues stored in the database, most regular messaging systems do support enrolling in a distributed transaction to achieve an atomic dequeue/process sequence; it's just that many apps/deployments don't bother to do it because of the ops overhead (XA coordinator), the reduced throughput, and/or simply not understanding the consequences.
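To make the BEGIN TRANSACTION ... COMMIT point concrete, here is a small sketch that assumes the queue and the application state live in the same SQLite database. The table names and the `crash` flag are hypothetical and exist only to simulate a failure before commit; this is not how Service Broker or any particular broker is implemented.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE queue (id INTEGER PRIMARY KEY, amount INTEGER)")
db.execute("CREATE TABLE balance (total INTEGER)")
db.execute("INSERT INTO balance VALUES (0)")
db.execute("INSERT INTO queue (amount) VALUES (10)")
db.commit()

def process_one(db, crash=False):
    """Dequeue one message and apply its effect in a single transaction."""
    try:
        with db:  # commit on success, rollback if an exception escapes
            row = db.execute("SELECT id, amount FROM queue ORDER BY id LIMIT 1").fetchone()
            if row is None:
                return False
            db.execute("DELETE FROM queue WHERE id = ?", (row[0],))
            db.execute("UPDATE balance SET total = total + ?", (row[1],))
            if crash:
                raise RuntimeError("simulated crash before commit")
    except RuntimeError:
        return False
    return True

process_one(db, crash=True)  # rolled back: the message is still queued
after_crash = db.execute("SELECT total FROM balance").fetchone()[0]
process_one(db)              # succeeds: dequeue and effect commit together
final = db.execute("SELECT total FROM balance").fetchone()[0]
remaining = db.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
```

The message is read twice, but its effect on `balance` is committed once, because the dequeue and the update share one transaction.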

The point is that durable, persisted, transacted 'sockets' behave very differently from a TCP socket. It is a whole lot harder to simply lose a message in the app layer when interacting with a database.

But the application still has to deal with processing the same message twice if it dies before ACK.

He did say XA.

The transaction boundary defined in your consumer covers the interactions of that consumer with other XA aware nodes who all participate in a "distributed transaction". So you can process this message N times without committing, and thus possibly N times telling other systems to do M' side-effect of that message, but until you commit the world has not changed.


The ACK is also a transaction. Something like this:

1. Receive from server, commit state = NEW, UNACK

2. Send ACK to server, get confirmation from server, commit state = NEW, ACK

3. Start processing, commit state = PROC

4. Got results, commit state = FIN, UNACK

5. Send FIN to server, commit state = FIN, ACK

Each commit is a database transaction where you write the results of that step along with the new state. If anything fails along the way the work-in-progress is discarded along with the state change. The server has an equivalent so if it gets a duplicate ACK for the same (or earlier) state it can ignore it.

In this example, if the client crashes between 1-2, in #2 never gets confirmation, or crashes trying to commit the "NEW, ACK" state then it will retry. The server has already committed the fact that it sent the value to the client and is awaiting an ACK. If it saw the ACK and gets a duplicate it ignores it. If it never saw the first ACK then it will see the second(+) attempt and commit that it saw the ACK before sending confirmation to the client.
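The five states above can be sketched as a tiny state machine. This is only an illustration under stated assumptions: the dictionary stands in for a database row, the `fail` flag simulates a crash before commit, and the class and state names are hypothetical.

```python
STATES = ["NEW_UNACK", "NEW_ACK", "PROC", "FIN_UNACK", "FIN_ACK"]

class Client:
    def __init__(self):
        self.state = {}  # msg_id -> last committed state (stands in for a database row)

    def step(self, msg_id, fail=False):
        """Attempt the next step. On failure nothing is committed, so a retry
        repeats the same step from the same committed state."""
        current = self.state.get(msg_id)
        if current == STATES[-1]:
            return current  # already finished
        nxt = STATES[0] if current is None else STATES[STATES.index(current) + 1]
        if not fail:
            self.state[msg_id] = nxt  # commit the result and the new state together
        return self.state.get(msg_id)

class Server:
    def __init__(self):
        self.acked = set()

    def on_ack(self, msg_id):
        """Return True for the first ACK; duplicates are ignored."""
        if msg_id in self.acked:
            return False
        self.acked.add(msg_id)
        return True
```

A failed step leaves the committed state untouched, which is why the client can simply retry, and why the server has to tolerate a duplicate ACK.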

Right. The idea is that by having your database, message queue, and application all share the same transactional context, "reprocessing" the message twice doesn't matter, because the effects are only ever committed exactly once.

It's true that this doesn't work if your processing touches external systems or otherwise escapes the transaction context, but in those cases you do still get at-least-once delivery (or at-most-once, if you choose to commit the receipt before processing the message).

It really is a powerful technology and when leveraged can absolutely reduce the level of effort and cognitive burden to building correct asynchronous systems.

> Whenever the plumbing does not guarantee EOIO but the business demands it, it gets pushed into the app layer where TCP (retries and acks) is reimplemented, with varying levels of success.

Interesting! You're saying the exact opposite of what Tyler Treat is saying:

> Even with smart middleware, problems still leak out and you have to handle them at the edge—you’re now being taxed twice. This is essentially the end-to-end argument. Push responsibility to the edges, smart endpoints, dumb pipes, etc. It’s the idea that if you need business-level guarantees, build them into the business layer because the infrastructure doesn’t care about them.

(source: http://bravenewgeek.com/smart-endpoints-dumb-pipes/)

What do you think of his argument?

Right. If your definition of "delivered" is "showed a message to the user", then exactly once is a fairly trivial goal to attain. If it's defined as "data over the wire", then it's nearly impossible to avoid some form of ACK/RETRY mechanism on real networks, which means that messages will sometimes need to be 'delivered' many times to ensure a single display to the user.

Are you talking about a distributed environment, where network partitions can occur? If yes, then there's the Two Generals Problem and the "FLP result", which just prove it impossible. So I guess you're talking about a non-distributed environment.

In other words, to reliably agree on a system state (whether a message id was delivered) you need the system to be Consistent. And per the CAP theorem, it cannot be Available in the presence of Partitions.

So other people you're referring to probably talk about distributed systems.

Yes, I'm talking about distributed systems and I am aware of the CAP theorem. Hence my choice of the word 'practical'.

As I said, users had cases when the plumbing (messaging system) recovered and delivered messages after +40 days of network partitioning. Correctly written apps completed the business process associated with those messages as normal, no special case. Humans can identify and fix outages and databases can easily outlast network outages (everything is durable, transacted, with HA/DR). And many business processes make perfect sense to resume/continue after the outage, even if it lasted for days.

I'm not really versed in this topic, but it seems like using a database for a socket makes the system entirely centralized around that database. Is there something I'm missing?

ServiceBroker, at least, had the capability of (transactionally) sending messages between databases. So, if you drank the kool-aid (I did; it wasn't so bad), there needn't be "the centralized database". You can separate your databases and decompose your services, and indeed it's easier to do so correctly and with confidence because the technology eliminates a lot of hairy edge cases.

Pat had some opinions about CAP and SOA and distributed systems, see [0]. I also remember a talk given by Pat and Eric Brewer together, that went deeper into the whole CAP ideas vis-a-vis the model Pat had been advocating (see Fiefdoms and Emissaries [1]), but I can't remember when it was or find a link for it.

[0] https://blogs.msdn.microsoft.com/pathelland/2007/05/20/soa-a...

[1] http://download.microsoft.com/documents/uk/msdn/architecture...

Exactly once delivery is theoretically impossible.

Approaching exactly once delivery asymptotically is possible. Your parent poster's point is that this is one where you can get so close to exactly once in order that in practice you never violate for years and years.

My point is that I've seen people making decisions to go with 'best effort delivery' and live with the (costly) consequences because they read here and there that EOIO is impossible, so why bother trying.

Because idempotency can be cheap at the application layer, so why try to solve something we know can never truly be solved?

> Extending the TCP paradigms (basically retries and acks) to messaging is not hard

> Just ack after you commit locally.

wait ... what is this "retry" and "ack"? is this so that the sender knows not to send the message again? that sounds suspiciously familiar ...

I mean, come on, you're just describing a framework you wrote that handles "at least once delivery" plus "idempotent operations" for the programmer. That's fine. It's marketing to say it's "exactly once". You're just arguing over what to call it.

"At least once", "at most once" and "exactly once" are fairly well established terms in this area.

Idempotency is a restriction/requirement that you may put in place in a distributed system to make the above types of guarantees. Deduping of messages like the article mentions means that it is using "at least once" message delivery, with some notion of unique message IDs and/or messages to then dedup the message.

TCP is a great analogy, where the Windows of the protocol are effectively maintained by Kafka. Anyway, "exactly once" is very hard and requires putting limits into the system such that deduping or something similar is possible. But I'd agree that in all cases anything that claims "exactly once" behavior is in fact implementing that on top of protocols with "at least once" primitives.

The point of throwing a message broker at a problem is not to get away from those things; the point is to package non-idempotent operations up into idempotent containers "on the wire", so that the at-least-once semantics of the message being brokered translate to exactly-once semantics for the message-body delivered to the consumer.

Basically the Two Generals' Problem, eh?


I would say this problem is quite different from Two Generals, because the client doesn't need to know that the acknowledgment has been received, i.e., a solution doesn't require the receipt of the message to be common knowledge. It is sufficient that the server stops trying to send the message once it has received the ACK.

> Thus it quickly becomes evident that no matter how many rounds of confirmation are made, there is no way to guarantee the second requirement that each general be sure the other has agreed to the attack plan.

This isn't true. There is a point where both sides have confirmed receipt of the last new info (the original confirmation back to the initial sender), and further confirmation is just icing. What kind of general is it that says "I've received ten acknowledgements so far from the other side, but I'm still not sure!"?

Remember that it's the message itself being confirmed, not 'did the last message get through'. There is a point where both sides are aware of both the message and a confirmed confirmation of that message.

It is the acknowledgement of the receipt of the last message that is important. Wikipedia is not entirely clear on this one. The source of the problem is higher-order statements: "A knows that B knows that A knows ...".

Suppose A sends the message to B, B sends an ack to A but A's ack back to B is lost. Now both A and B know the time of the attack but B considers it possible that [A does not know that B knows]. B thinks if A really does not know that B knows then A may not attack out of the consideration of the possibility that B does not know. And this possibility may make B call off the attack even if B knows.

Each round of successful exchange increases the order of statements that are known, but there is a statement of higher order which is still not known and which can cause the attack to unravel inductively.

Look up 'Rubinstein's e-mail game' for a choice of payoffs that causes no attack to happen for any positive probability, howsoever small, of a message getting lost.

There comes a point in the chain where both sides have got the initial message, and confirmation that the other side has confirmed the message.

message 1 -> attack!

message 2 -> sure!

message 3 -> I got your sure! At this point, general A knows the message got through

message 4 -> I got your confirmation! At this point, general B knows that general A got the reply

message 5 -> I got your confirmation confirmation! At this point, general A is sure that both parties know when the attack is. General A knows that the confirmation got through, and Gen A is good to go.

message 6 -> conf^n! At this point, General B knows that the confirmation got through, and Gen B is good to go.

message 7 -> icing

Even if message 6 doesn't get through, message 2 and 4 have, so general A knows that both generals know. Message 3 tells general B that both generals know - all that's missing is the nice-to-have confirmation.

You don't need to keep on confirming the last message ad infinitum, because that doesn't add new data about the original message and who knows it. If messenger 77 were to be captured by the city, it's not like the generals would call off the attack because they weren't sure of the timing info.

Perhaps another way of putting it: this isn't a continuous string of information that gets added to. It's a set of discrete parcels of information. The additional messages only add information about the veracity of the later parcels, not the initial ones.

The point is that it's not just about knowledge - it's about agreement.

A and B must not attack alone, i.e. A must not attack if B is not going to attack, and B must not attack if A is not going to attack.

Now, message 1 (from A to B) says "attack!". So B knows that A wants B to attack.

...but A mustn't attack alone, so A won't attack unless A receives B's confirmation. So B sends message 2 ("sure!").

...but B mustn't attack unless B knows that A is going to attack, and A will only attack if A has received message 2. So A has to send another message to confirm receipt of message 2.

...and so on.

None of these confirmations are simply "nice-to-have" - if you need to be 100% sure that you're both going to attack, you need to ensure that every confirmation you send has been received by your peer.

"Dear diary. I have just sent the 537th messenger confirming that we are attacking together tomorrow morning at 9. I'm just waiting on confirmation."

You have that confirmation of agreement in the third round of messages. Receipt of the fourth message proves that both sides have knowledge and have agreed, aware of the other's intent. That's the message you have to prove has arrived for surety. Once you've done enough messages to prove message #4, you're good to go.


Question: what is the relationship (if any) between TCP and the Two Generals Problem?

The Two Generals Problem deals with communicating over an unreliable channel, as does TCP.

TCP solves the problem by retransmission, similar to approaches illustrated in https://en.wikipedia.org/wiki/Two_Generals%27_Problem#Engine...

Note that it's impossible to solve it completely, and TCP is no different. If you unplug your router, there is no hope of getting a message through. But it's highly unlikely that there is no path between you and your destination (a "partition") so TCP retransmits until it finds one. This would be similar to generals sending messengers until one gets through and a confirmation messenger comes back.

Maybe read the opening paragraph of the linked wiki article? It's within the first few sentences.

Everybody knows "exactly once" means deduplication. This is not exactly a new problem.

That said, it's still a difficult problem and I actually wish people would stop trying to roll their own schemes. For example, this scheme relies on examining a Kafka outbound topic to resolve in-doubt outbound messages. But what happens if the outbound message + commit is still "in flight" when the system recovers, so the system retransmits the in-doubt outbound message and, rather than deduping, is now generating dupes? Yes, the chances of this are minimal, which means it will happen.

Everyone might know that, but it's certainly the case that a lot of systems have _claimed_ it where they actually meant "disastrous characteristics under load and/or (partial) failure".

The protocol is in fact quite simple:

1. Sender sends message.

2. If no acknowledgement from recipient is returned to sender, resend message until an acknowledgement is received from recipient.

3. If recipient receives message, check store to see if it's been received before.

4. If it's not in the store, store it, acknowledge receipt to sender, and process it.

5. If it's already in the recipient's store, acknowledge receipt to sender, and discard message.
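The steps above can be sketched as a short simulation. It assumes an in-memory set for the recipient's store (a real one would need to be persistent) and a hypothetical `acks_lost` parameter that says how many acknowledgements the simulated network drops before one gets through.

```python
class Recipient:
    def __init__(self):
        self.store = set()    # IDs already received (step 3)
        self.processed = []   # side effects, recorded for illustration

    def receive(self, msg_id, body):
        if msg_id not in self.store:  # step 4: new message
            self.store.add(msg_id)
            self.processed.append(body)
        return "ACK"                  # steps 4 and 5 both acknowledge

def send(recipient, msg_id, body, acks_lost):
    """Resend until an ack arrives (steps 1 and 2). Returns the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        ack = recipient.receive(msg_id, body)
        if acks_lost > 0:
            acks_lost -= 1  # ack dropped; the sender sees nothing and retries
            continue
        if ack == "ACK":
            return attempts

r = Recipient()
attempts = send(r, "m1", "charge $5", acks_lost=2)
```

With two lost acks the sender transmits three times, yet the recipient processes the message once. Note that this sketch does not make "store it" and "process it" atomic, which is the part the rest of the thread argues about.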

What if the "acknowledge receipt to sender" message gets lost?

The server will keep trying to send the message. Each time, the client responds with a receipt. Eventually communication succeeds and the protocol is done. The major drawback is the requirement of persistent storage on the client.

That's not a drawback - it's an advantage. It enables store-and-forward, i.e. on- and offline use. The client remains functional without a connection to the server.

Exactly-once messaging is not a hard problem so long as you change the problem a little. (Plug warning: this is the way Urbit does EOM, or EOM* if you prefer.)

TLDR, you don't need infinitely durable infinite memory. You just need (a) a single-level store in which every event is a transaction, (b) a message protocol with true end-to-end acks, and (c) a permanent session between every pair of nodes. We don't have single-level storage hardware (although XPoint comes close) but it's easy to simulate semantically.

Think about the problem intuitively. I want to send you a stream of messages, each of which you act on exactly once.

I do it like this: I put a sequence number in each message. I keep sending you message 1 until you send me an acknowledgment that you heard message 1. I can also send messages 2, 3, or whatever (lockstep isn't needed), but you process messages in order and ignore messages you've already heard. Never ack an ack, always ack a dup.

What does implementing this design require? It requires a persistent sequence number, on the sending endpoint, for every receiving endpoint in the world. (Edit: and of course another SN on the receiving end.) Of course, for every endpoint you haven't sent to yet, the number is 0 and doesn't need to be stored. This is not a terribly onerous amount of storage.
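A rough sketch of the receiving side of such a session, under a few assumptions that are not in the description above: the ack is cumulative (it carries the highest sequence number processed so far), and `last_seq` is an ordinary attribute where a real system would persist it in the same transaction as the message's effects. It is not Urbit's implementation.

```python
class Receiver:
    def __init__(self):
        self.last_seq = 0  # highest message processed; persistent in a real system
        self.log = []

    def on_message(self, seq, body):
        """Process messages strictly in order and return the cumulative ack."""
        if seq == self.last_seq + 1:
            self.log.append(body)  # in a real system, one transaction with the next line
            self.last_seq = seq
        # seq <= last_seq: duplicate, ignored but still acked
        # seq >  last_seq + 1: arrived early, ignored until the gap is filled
        return self.last_seq       # "always ack a dup"

rx = Receiver()
incoming = [(1, "a"), (1, "a"), (3, "c"), (2, "b"), (3, "c")]
acks = [rx.on_message(seq, body) for seq, body in incoming]
```

The duplicate of message 1 and the early copy of message 3 change nothing; the sender keeps retransmitting 3 until the ack reaches 3.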

A sequence number is the piece of state generally known as a "session" in networking theory parlance. Of course a TCP connection is a session. We're simply saying that every two endpoints have a sort of implicit, persistent connection.

Moreover, every message must be a transaction. Acknowledging the message acknowledges that the application, as well as the sequence number, has been fully and persistently updated with the knowledge that the message contains. One way to think of this is that we're adding the latency of a transaction to persistent storage to our packet latency. SSDs are good here.

Furthermore, since in real life semantic messages need to be delivered in packet-sized fragments, a fragmentation model with end-to-end "piggybacked" acks is needed. There can't be a separate message-level ack (the "stack of acks" problem is the curse of the Internet stack) -- acknowledging all the fragment packets acknowledges the message.

All this and more is explained here (plug alert):


You've embedded the idempotency into the protocol, which is nice, but doesn't get around the problems of not being able to do EOM. You also have the case where you sent a message and lost network connectivity before any ack could come back, resulting in you not knowing if your message arrived 0 or 1 times, which isn't surprising since that case is a constant on a network with anything less than 100% reliability.

That's not EOM, that's just the sensible workarounds to use in light of the fact EOM doesn't exist. Obviously a lot of protocols and applications use such things since the inability to have EOM in the real world has not rendered our entire network edifice impossible or useless in real life.

If you define EOM as magic, or as a solution to the Two Generals problem, EOM is certainly impossible.

If EOM means "the programmer doesn't have to think about idempotency," EOM is what I want. Happy to call this "EOM asterisk" if you and/or the OP like.

At any point in the conversation, you don't know whether your interlocutor has yet received the last message you sent. This is because you are talking over a network, rather than over a magic bus.

However, you know that before the endpoint processes each message, it has processed each previous message once and exactly once. I think this is what the programmer wants -- asterisk or no.

Network connectivity failure is best modeled as the limit case of network latency. Of course, when you send a message over a network in the real world, you can't know whether it has been received or not until you get an acknowledgment.

(Edit: asterisks.)

Also, I would say that the worst thing about the inability to do EOM on the Internet stack is that it puts the responsibility for idempotence on the application programmer, then tests that corner case 1 in 100 times, maybe 1 in 1000.

It is not impossible to handle a system that produces rare corner cases. It's just expensive and a pain in the butt.

"Exactly once" model of message is theoretically impossible to do in distributed environment with nonzero possibility of failure. If you haven't received acknowledgement from the other side of communication in the specified amount of time you can only do one of two things:

1) do nothing, risking message loss

2) retransmit, risking duplication

But of course that's only from the messaging system's point of view. Deduplication at the receiver end can help reduce the problem, but can itself fail (there is no foolproof way of implementing that pseudocode's "has_seen(message.id)" method).

I agree. I wish more messaging platforms would recognize this and stop trying to paper-over the failure mode (Kafka just came out with "Exactly Once," which I think is a variant of the 2-Phase-Commit protocol, which still does not solve the problem).

My go-to for explaining to people is the Two Generals Problem https://en.wikipedia.org/wiki/Two_Generals%27_Problem

After watching the video from the Kafka Summit talk https://www.confluent.io/kafka-summit-nyc17/introducing-exac...

it's an essentially-useless guarantee for any sort of Kafka consumer that interacts with an external system.

Exactly-Once is not something that can be provided-for or delegated-to an arbitrary, non-integrated system. For an "integrated" system, it requires idempotent behavior, in which case it's really At-Least-Once, so...

I think "exactly once" doesn't imply "non-reprocessing" (sorry for the double negative).

Meaning, you want "exactly once" and you don't want duplicates, yes. But you allow for reprocessing, provided that you have a way for deduplicating.

You want a guarantee that if the producer (at the top of your data processing pipeline) sends a message, then this message eventually corresponds to exactly 1 record in your final storage(s).

One easy-to-understand-yet-simplistic example is: send a message to Kafka, use topic+partition+offset as the primary key, store in an RDBMS. This is widely accepted as "exactly once", but clearly you may have multiple attempts to save the message into the db, which will fail due to the primary key integrity constraint.
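A minimal sketch of that example, with SQLite standing in for the RDBMS and plain function arguments standing in for a Kafka consumer record. The table and column names are hypothetical.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    """CREATE TABLE events (
           kafka_topic TEXT,
           kafka_partition INTEGER,
           kafka_offset INTEGER,
           payload TEXT,
           PRIMARY KEY (kafka_topic, kafka_partition, kafka_offset))"""
)

def save(db, topic, partition, offset, payload):
    """Return True if the record was stored, False if it was a duplicate."""
    try:
        with db:  # commit on success, rollback on error
            db.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?)",
                (topic, partition, offset, payload),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # the primary key already exists: a redelivered record

results = [save(db, "clicks", 0, 42, "x"), save(db, "clicks", 0, 42, "x")]
count = db.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The second attempt is rejected by the primary key, so reprocessing the same offset leaves one row in the table.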

So basically what's called "at least once with deduplicating". The parent comment addresses that.

> there is no foolproof way of implementing that pseudocode's "has_seen(message.id)" method

Wait why? Just because you'd have to store the list of seen messages theoretically indefinitely?

There's also a race condition in there when you receive the duplicate before publish_and_commit is done doing its thing - assuming they're not actually serializing all messages through a single thread like the pseudocode implies.

What they've done is shift the point of failure from something less reliable (client's network) to something more reliable (their rocksdb approach) - reducing duplicates but not guaranteeing exactly once processing.

it's not so much that they are serializing all messages through a single thread, but that they are consistently routing messages (duplicates and all) into separate shards, each of which is processed by a single thread.
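A sketch of what consistent routing could look like, assuming a fixed, hypothetical shard count and a stable hash of the message ID. This is not Segment's actual code; it only shows why a message and its duplicates always meet in the same shard, so each shard can deduplicate without coordinating with the others.

```python
import hashlib

NUM_SHARDS = 4  # hypothetical shard count

def shard_for(message_id: str) -> int:
    """Stable hash, so a message and its duplicates map to the same shard."""
    digest = hashlib.sha256(message_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS

# One dedup set per shard; each set would be owned by a single worker thread.
seen = [set() for _ in range(NUM_SHARDS)]

def handle(message_id: str) -> bool:
    """Return True if the message is new to its shard, False if it is a duplicate."""
    shard = seen[shard_for(message_id)]
    if message_id in shard:
        return False
    shard.add(message_id)
    return True
```

Because only one worker touches a given shard, the check-then-add in `handle` does not race with another thread handling the same message ID.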

sure, if you assume that everything can fail, then it doesn't help to store the list of messages you've seen

but if you can persist a monotonic sequence number, that gets you pretty far. we use tcp all the time even though it has no magic answer to distributed consensus (and uses a super-weak checksum). 2pc doesn't guarantee progress and/or consistency either and it's pretty effective.

> nonzero

If you send the message with a hash of the previous state on the server (like with proof-of-work in Bitcoin), then since it is so unlikely that the hash will be the same with and without the message appended, it doesn't really matter if it is strictly nonzero, as long as it is small enough.

The nonzero part is about the possibility of network / endpoint failure. No hash is going to prevent the cleaning lady from 'freeing up' the power socket for her vacuum.

You could resubmit the transaction until you get an ack. You wouldn't submit twice since the hash of the state with the added item is very likely to differ.
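One way to read this idea is as a conditional append: the client sends the hash of the state it last saw, and the server accepts the item only if that hash is still current. The sketch below assumes SHA-256 and a hypothetical `Server` class; it illustrates the retry behaviour and is not a description of Bitcoin's protocol.

```python
import hashlib

class Server:
    def __init__(self):
        self.items = []
        self.state_hash = hashlib.sha256(b"").hexdigest()

    def append(self, prev_hash, item):
        """Accept the item only if the client saw the current state."""
        if prev_hash != self.state_hash:
            return False  # stale hash: a retry of an already-applied append lands here
        self.items.append(item)
        self.state_hash = hashlib.sha256((self.state_hash + item).encode()).hexdigest()
        return True

srv = Server()
h0 = srv.state_hash
first = srv.append(h0, "pay alice 5")
retry = srv.append(h0, "pay alice 5")  # ack was lost, client resubmits unchanged
```

The resubmission is rejected because the state hash has moved on. A rejection alone does not tell the client whether its own item or someone else's was applied, so it would still need to look that up.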

In terms of connectivity, we deal with a similar problem here at CloudWalk to process payment transactions from POS terminals, where most of them rely on GPRS connections.

Our network issues are nearly 6 times higher (~3.5%) due to GPRS, and we solved the duplication problem with an approach involving both client and server side.

Clients would always ensure that all the information sent by the server was successfully received. If something goes wrong, instead of retrying (sending the payment again), the client sends just the transaction UUID to the server, and the server might either respond with: A. the corresponding response for the transaction or B. not found.

In scenario A, the POS terminal managed to properly send all the information to the server but failed to receive the response.

In scenario B, the POS terminal didn't even manage to properly send the information to the server, so the POS can safely retry.
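The two scenarios can be sketched as follows. Everything here is hypothetical (class names, the response format, and the assumption that the server keeps responses keyed by transaction UUID); it is not CloudWalk's implementation, only the shape of the recovery logic described above.

```python
class Server:
    def __init__(self):
        self.responses = {}  # transaction UUID -> stored response

    def pay(self, tx_id, amount):
        # Assumption for this sketch: a repeated UUID returns the stored response.
        if tx_id not in self.responses:
            self.responses[tx_id] = f"approved:{amount}"
        return self.responses[tx_id]

    def lookup(self, tx_id):
        return self.responses.get(tx_id)  # None means "not found" (scenario B)

def recover(server, tx_id, amount):
    """Client-side recovery after a failed exchange: query by UUID first,
    and resend the payment only if the server never saw it."""
    response = server.lookup(tx_id)
    if response is not None:
        return response               # scenario A: request arrived, response was lost
    return server.pay(tx_id, amount)  # scenario B: safe to retry
```

For example, if `pay("u1", 100)` reached the server but the reply was lost, `recover(server, "u1", 100)` returns the stored response without charging again.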

Why not just send the data too along with the UUID? It'll save another roundtrip in case of scenario B right? Or do you have data to prove that scenario B is a lot less likely to occur, making it sensible to save bandwidth by not re-transmitting the data?

Here's a radical solution. Instead of becoming a scala pro akka stream 200k engineer with a cluster of kafka nodes that costs your company over $100,000 of engineering time, technical debt, opportunity cost, and server costs, just put it all in bigtable, with deduping by id....

Enough of resume-driven engineering; why does everyone need to reinvent the wheel?

Yup. Databases, whether relational or not, have been designed to solve all these problems in a much more "bulletproof" way than your piddly [1] several-dozen-engineer team could ever manage, no matter how genius they are.

[1] No disrespect meant - just a description of size. Source: running a piddly 2-person engineering team.

So, a combination of a best effort "at least once" messaging with deduplication near the receiving edge. Fairly standard, honestly.

There is still a potential for problems in the message delivery to the endpoints (malformed messages, Kafka errors, messages not being consumed fast enough and lost), or duplication at that level (restart a listener on the Kafka stream with the wrong message ID) as well.

This is based on my own pains with Kinesis and Lambda (which, I know, isn't Kafka).

In my experience, better to just allow raw "at least once" messaging and perform idempotent actions based off the messages. It's not always possible (and harder when it is possible), but its tradeoffs mean you're less likely to lose messages.
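
A small illustration of what "idempotent actions" buys you under at-least-once delivery. The message fields (`user`, `amount`, `new_total`) and both handler names are made up for the example.

```python
def apply_increment(state, message):
    # Not idempotent: a redelivered message is counted twice.
    state[message["user"]] = state.get(message["user"], 0) + message["amount"]


def apply_set(state, message):
    # Idempotent: applying the same message again leaves the state unchanged.
    state[message["user"]] = message["new_total"]


message = {"user": "u1", "amount": 5, "new_total": 5}

counted = {}
for _ in range(2):  # at-least-once delivery: the same message arrives twice
    apply_increment(counted, message)

assigned = {}
for _ in range(2):
    apply_set(assigned, message)

print(counted, assigned)  # {'u1': 10} {'u1': 5}
```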

This is generally better, but we're delivering these messages to integrations which don't necessarily take idempotent actions.

Kafka 0.11 (recently released) has exactly once semantics and transactional messages built-in.

- Talk from Kafka Summit: https://www.confluent.io/kafka-summit-nyc17/resource/#exactl...

- Proposal: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...

My understanding (noob here) is that it allows producers to retry without fear of duplication. You still have to consider the system feeding the producer though. In Segment's example, clients might deliver their messages more than once to the API. Kafka's mechanism wouldn't detect duplicate messages sent to the producer, just that any given message a producer wants to append to Kafka won't be duplicated.
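
A toy model of that distinction. This is not Kafka's implementation: the broker below just remembers the last sequence number per producer ID, which is roughly the idea in KIP-98, and `ToyBroker` and `append` are hypothetical names.

```python
class ToyBroker:
    """Toy log that drops producer retries using (producer_id, sequence) pairs."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer_id -> highest sequence number appended

    def append(self, producer_id, seq, message):
        # A retry re-sends the same sequence number, so it is recognised and skipped.
        if seq <= self.last_seq.get(producer_id, -1):
            return False
        self.log.append(message)
        self.last_seq[producer_id] = seq
        return True


broker = ToyBroker()

# The producer retries sequence 0 after a lost ack: the broker drops the retry.
broker.append("producer-1", 0, {"messageId": "abc"})
broker.append("producer-1", 0, {"messageId": "abc"})

# The client submitted the same event to the API twice, so the producer sends it
# as a new record with a new sequence number: the broker cannot tell it apart.
broker.append("producer-1", 1, {"messageId": "abc"})

print(len(broker.log))  # 2
```

The retry is dropped, but the upstream duplicate lands in the log, which is why a separate dedup step keyed on the message ID is still needed.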

52 requests, 5.4 MB and 8.63 seconds to load a simple blog post. With a bonus XHR request every 5 seconds.

That's not everything: This website contacted 56 IPs in 6 countries across 47 domains to perform 100 HTTP transactions. In total, 3 MB of data was transferred, which is 5 MB uncompressed. It took 4.103 seconds to load this page. 37 cookies were set, and 8 messages to the console were logged.


And each one made only once!

... and this is why friends don't let friends use Segment.

"The single requirement of all data pipelines is that they cannot lose data."

Unless the business value of data is derived after applying some summary statistics, then even sampling the data works, and you can lose events in an event stream, while not changing the insight gained. Originally Kafka was designed to be a high throughput data bus for analytical pipelines where losing messages was ok. More recently they are experimenting with exactly once delivery.

Yeah, this was a major overstatement. There are lots of data pipelines where it's ok to lose some data. Consider a sensor that sends measurements hundreds of times a second to an app that operates on a 1-second timeframe. And UDP is used all the time on the internet, yet carries no delivery guarantee.

Having built something similar with RabbitMQ in a high-volume industry, there are a lot of benefits people in this thread seem to be glossing over and are instead debating semantics. Yes, this is not "exactly once" -- there really is no such thing in a distributed system. The best you can hope for is that your edge consumers are idempotent.

There is a lot of value derived from de-duping near ingress of a heavy stream such as this. You're saving downstream consumers time (money) and potential headaches. You may be in an industry where duplicates can be handled by a legacy system, but it takes 5-10 minutes of manual checks and corrections by support staff. That was my exact use case and I can't count the number of times we were thankful our de-duping handled "most" cases.

"Exactly Once"

Over a window of time that changes depending on the amount of ingested events.

Basically, they read from a kafka stream and have a deduplication layer in rocks db that produces to another kafka stream. They process about 2.2 billion events through it per day.
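
A rough sketch of that shape, showing why the guarantee only holds over a window. Plain Python lists stand in for the Kafka topics and an `OrderedDict` stands in for RocksDB; `BoundedDedup` and `dedupe` are hypothetical names, and the oldest-first eviction is an assumption for illustration.

```python
from collections import OrderedDict


class BoundedDedup:
    """Remembers at most max_ids message IDs, evicting the oldest first."""

    def __init__(self, max_ids):
        self.max_ids = max_ids
        self.seen = OrderedDict()

    def is_new(self, message_id):
        if message_id in self.seen:
            return False
        self.seen[message_id] = True
        if len(self.seen) > self.max_ids:
            self.seen.popitem(last=False)  # evict the oldest ID
        return True


def dedupe(input_stream, store):
    """Yield only messages whose ID is not currently in the store."""
    for message in input_stream:
        if store.is_new(message["id"]):
            yield message


store = BoundedDedup(max_ids=2)
events = [{"id": "a"}, {"id": "b"}, {"id": "a"}, {"id": "c"}, {"id": "a"}]
output = [m["id"] for m in dedupe(events, store)]
print(output)  # ['a', 'b', 'c', 'a']
```

The second "a" is caught, but the third gets through because its ID was evicted once the store filled up. The more events are ingested, the shorter the window of time the store covers.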

While this will reduce duplicates and get closer to Exactly Once (helping reduce the two generals problem on incoming requests and potentially work inside their data center), they still have to face the same problem again when they push data out to their partners. Some packet loss, and they will be sending out duplicates to the partner.

Not to downplay what they have done as we are doing a similar thing near our exit nodes to do our best to prevent duplicate events making it out of our system.

To be fair, they are upfront in the beginning about not being able to adhere to an exactly-once model.

"In the past three months we’ve built an entirely new de-duplication system to get as close as possible to exactly-once delivery"

What's annoying is that they do not get precise and formal about what they want out of their new model. Also, their numbers only speak to performance, not correctness.

On the plus side, I think it's awesome to see bloom filters successfully used in production. That sort of thing is easy to implement, but not easy to get right for every use case.

So there's a lot of talk on here about the Two Generals Problem, so I thought I'd chime in with some misconceptions about how the Two Generals Problem relates to Exactly Once Messaging (EOM). WARNING: I'm going mostly on memory with this, I could be completely wrong.

EOM is NOT strictly speaking equivalent to the Two Generals Problem, or Distributed Consensus, in an unreliable network. In distributed consensus, at some given point in time, A has to know X, A has to know B knows X, A has to know B knows A knows X, ... EOM is different because the message broker is in some sense the arbiter of truth, so the consumer(s) don't need full consensus. In an unreliable network, you can have EOM. http://ilpubs.stanford.edu:8090/483/1/2000-7.pdf gives some examples of how that works.

HOWEVER, you can't have EOM when the consumers can fail. If a consumer fails there's no way, in general, to tell if the last message it was working on was completed.

There are a couple of edge cases where you can still have EOM. For instance, a system where you have a message broker A, and a bunch of consumers that read messages x from that queue, compute f(x), and insert f(x) onto message broker B, where f(x) may be computed multiple times for the same x (i.e. if f is a pure function or you don't care about the side effects). This system can implement EOM in the presence of an unreliable network and consumer failures (I think it can handle one or both of the message brokers failing too, not 100% sure) in the sense that x will never be in broker A at the same time as f(x) is in broker B, f(x) will never be in broker B more than once for the same x, and any y in B had some x that was in A such that y = f(x).

Was thinking a 'reverse bloom filter' could be cool to possibly avoid the RocksDB for situations like this- turns out it already exists: https://github.com/jmhodges/opposite_of_a_bloom_filter

I love it when that happens.

It should be noted it's impossible to have a 'reverse bloom filter' with the same properties as a regular bloom filter. That is, a constant predefined size gives you a particular false negative rate. The thing you linked to is really just a cache. It has to store an entire entry (not just set some bits based on the hash) and doesn't have a predictable false negative rate based on its size. For more info: https://cstheory.stackexchange.com/questions/6596/a-probabil...
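
To make the "really just a cache" point concrete, here is a minimal sketch in the spirit of that idea. It is not the linked library's code: the class name, the `seen_before` method and the choice of SHA-256 for slot selection are all assumptions.

```python
import hashlib


class OppositeOfBloomFilter:
    """Fixed-size cache: may forget an item it saw (false negative),
    but never claims to have seen an item it did not (no false positives)."""

    def __init__(self, size):
        self.slots = [None] * size

    def _index(self, item: bytes) -> int:
        digest = hashlib.sha256(item).digest()
        return int.from_bytes(digest[:8], "big") % len(self.slots)

    def seen_before(self, item: bytes) -> bool:
        """Record item and report whether it was already in its slot."""
        i = self._index(item)
        previous = self.slots[i]
        self.slots[i] = item  # the full entry is stored, not just a few bits
        return previous == item


cache = OppositeOfBloomFilter(size=1)  # one slot, to force a collision
first = cache.seen_before(b"a")        # never seen
second = cache.seen_before(b"a")       # still in its slot
cache.seen_before(b"b")                # overwrites the only slot
third = cache.seen_before(b"a")        # "a" was evicted: a false negative
print(first, second, third)  # False True False
```

Memory use grows with the size of the stored entries, and the false negative rate depends on the access pattern rather than on the slot count alone.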

Sounds very cool. A couple of questions I had:

1) What happens if they lose their rocksdb with all of the messageIds?

2) Is their kafka at-least-once delivery? How do they guarantee that kafka doesn't reject their write? Also, assuming they have set up their kafka for at least once delivery, doesn't that make the output topic susceptible to duplicates due to retries, etc?

3) >Instead of searching a central database for whether we’ve seen a key amongst hundreds of billions of messages, we’re able to narrow our search space by orders of magnitude simply by routing to the right partition.

Is "orders of magnitude" really correct? Aren't you really just narrowing the search space by the number of partitions in kafka? I suppose if you have a hundred partitions, that would be 2 orders of magnitude, but it makes it sound like it's much more than that.

> What happens if they lose their rocksdb with all of the messageIds?

I'm wondering the same.

I wonder how they partition by the "messageID" to ensure that the de-duplication happens on the same worker. I would imagine that this affects their ability to add more brokers in the future.

Perhaps they expect a 1:1 mapping of RocksDB, partition, and de-duplication worker.

Kafka does this as part of its design. A topic has a declared number of partitions (which can't really be changed on the fly, you choose a theoretical high number and hope it's enough), and an agreed upon hash algorithm chooses between those partitions (probably in Java, so hashCode is readily available for primitives as well as objects). Each partition is really like its own topic, so you lose in-order messaging for anything not included in your partition key.

I'm also curious about this. I'm guessing it's some function on the id that maps it to a partition. What happens when you add more consumers? Is there a way to know which partitions include messageIds that would be put into another partition's ownership by the change and then move anything from one RocksDB instance to the other?
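
A quick way to see both halves of this (stable routing, and what a partition-count change does to it). `partition_for` is a hypothetical helper and md5 is only a stand-in hash for illustration, not the one Kafka clients use.

```python
import hashlib


def partition_for(message_id: str, num_partitions: int) -> int:
    # md5 is a stand-in for illustration; it is not the hash Kafka clients use.
    digest = hashlib.md5(message_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


ids = [f"msg-{i}" for i in range(10_000)]

# The same ID always maps to the same partition, so one worker sees all its duplicates.
stable = all(partition_for(i, 8) == partition_for(i, 8) for i in ids)

# Growing from 8 to 12 partitions moves many IDs to a different partition,
# away from the local store that remembers them.
moved = sum(partition_for(i, 8) != partition_for(i, 12) for i in ids)
print(f"{moved / len(ids):.0%} of IDs changed partition")
```

With plain modulo routing, going from 8 to 12 partitions should move about two thirds of the IDs, so the per-partition dedup state would have to be rebuilt or migrated along with them.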

tl;dr: Clickbait headline. Exactly-once delivery not even close to implemented. Typical de-duping, as you've seen and read about hundreds of times already, is what they did.

"Almost Exactly Once" doesn't have quite the same ring to it, but it is actually accurate. We've already discovered better trade-offs haven't we?

If the OP doesn't mind expanding a little on this bit, I'd be grateful.

> If the dedupe worker crashes for any reason or encounters an error from Kafka, when it re-starts it will first consult the “source of truth” for whether an event was published: the output topic.

Does this mean that "on worker crash" the worker replays the entire output topic and compares it to the rocksdb dataset?

Also, how do you handle scaling up or down the number of workers/partitions?

I'm not the OP, but changing the number of Kafka partitions isn't a super graceful operation. You would be wise to add as many as you could reasonably need assuming one consumer thread per partition. But not too many because each one is at least two files on disk!

I haven't used Kafka yet, but a Kafka partition is roughly the same as a Kinesis shard, right?

Yes, but you have to set them up upfront and there is no API to split and merge partitions. If you add more partitions, it doesn't automatically re-shard for you either. I don't think it should automatically re-shard as it would cause a ton of disk and network IO, but just something to be aware of.

It's funny, at my company we implemented deduplication almost exactly the same way for our push notification sender.

The scale is smaller (about 10k rpm), but the basic idea is the same (store a message ID in a key-value store after each successful send).

I like the idea of invalidating records by overall size, we hadn't thought of that. We just use a fixed 24-hour TTL.

Would something like AWS SQS not scale for something like this? We currently push about 25k daily transactions over SQS, obviously nowhere near the scale of this, just wondering about what limitations we will bump into potentially.

The limitations are most likely on price. For the 200B messages they've already processed in the last 3 months, that would be $100,000 total on just the SQS FIFO queue, or $33,333 per month. And that's not counting data transfer.

As long as everything is in EC2, data transfer will be free. Your cost calculations are also off base. You'll need to send, receive and delete every message that you process via SQS. These can all be done in batches of 10. So it's 200B * 3/10 * .50 / million, which comes out to 60B requests, or about $30k over 3 months. Still not cheap; Kinesis is probably the better option in this case if you want an AWS managed service.
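
Working through that expression with the thread's own figures (the $0.50 per million price and the batch size of 10 are taken from the comments above, not checked against current AWS pricing): the 60 is the request count in billions, and the dollar figure is about $30,000.

```python
messages = 200e9          # messages processed over 3 months, from the thread
ops_per_message = 3       # send, receive, delete
batch_size = 10           # operations per batched request
price_per_million = 0.50  # USD per million requests, as quoted in the thread

requests = messages * ops_per_message / batch_size
total_cost = requests / 1e6 * price_per_million
monthly_cost = total_cost / 3

print(f"{requests:,.0f} requests, ${total_cost:,.0f} total, ${monthly_cost:,.0f} per month")
# 60,000,000,000 requests, $30,000 total, $10,000 per month
```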

That'd definitely be a pretty effective limitation.

SQS is not "exactly once", so might not meet their requirements.

sqs has fifo queues which claim to be exactly once

SQS queues deduplicate over a 5 minute window. This is claiming a much larger window.

Either way your listener(s) still has to have its own deduplication. Ensuring a message ends up on the queue only once, and ensuring it's processed exactly once, are two different problems that require separate handling (and, the former is what most out of the box systems claim to solve, while the latter is more important, and, frankly, completely negates the need of the former).

Actually, it can be. You just pay more.

(edit: incorrect, my bad, see thread)

This is wrong. There is a type of SQS queue that indeed does exactly-once. It costs more, and is not the default option, but it is there.

Oh, right, they added their FIFO queues. My bad; thanks for the correction. Worth noting, though, that AWS's own services can't talk to FIFO queues. If you want to wire up SNS or Lambda dead-letter queues to a FIFO queue, you are out of luck.

It's worth noting that the next major Kafka release (0.11, out soon) will include exactly once semantics! With basically no configuration and no code changes for the user. Perhaps even more noteworthy is that this feature is built on top of a new transactions feature [0]. With this release, you'll be able to atomically write to multiple topics.

[0] https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...

Isn't the new feature of Kafka about this?


It would help, but messages are sent to the API first. We aren't sending messages directly to Kafka from the Internet, of course.

So we can get duplicate API submissions regardless of whether or not we enabled transactional productions into kafka from a producer.

> [I]t’s pretty much impossible to have messages only ever be delivered once.

IIRC, it's provably impossible in a distributed system where processes might fail, i.e. all real systems.

Relevant to this topic: Description of exactly-once implementation in Google Cloud Dataflow + what "exactly once" means in context of streaming:


(Google Cloud emp speaking)

With deduplication state on the worker nodes, how does scaling up, or provisioning new machines, or moving a partition between machines work?

Qubit's strategy to do this via streaming, leveraging Google Cloud Dataflow:


What is so exciting about this? There is still the possibility of duplicates. You still have to put in the engineering effort to deal with duplicates end-to-end. If the code is there to deal with duplicates end-to-end, then does it really matter whether you have 5 duplicates or 35? Or maybe they just did it to add some cool tech to their CVs?

> There is still possibility of duplicates.


Why do I get the feeling this is repeating TCP features at the message level? There must be a protocol that can hide this exactly-once need away. TCP doesn't, generally, create downloads that are bad and fail their checksum test, hence the packets that make up the file are not duplicated.

Yes there is some duplication of TCP capability here.

The problem with relying on TCP for reliability is that its state is in memory, associated with a particular peer IP address, and acknowledgements passed back to the sender only indicate that the receiver has the data in local memory, not that the data has been processed.

A file download over TCP can fail, for example due to a network problem. Ensuring reliable delivery requires additional measures outside of TCP, such as retrying the download using a new connection.

In practice, this means that TCP is primarily useful for providing flow control and offering a streaming interface (no worry about packet sizes). Less so as a complete solution for transmission reliability.

How would you use TCP sockets to de-duplicate Kafka streams with a many-to-many communication pattern? Surely there is a valid scalability reason for why AWS IoT only provides "at least once" guarantees in their MQTT broker even when TCP is the underlying transport [1].

[1] http://docs.aws.amazon.com/iot/latest/developerguide/protoco...

This is interesting work. But I think I'll continue relying on at least once and idempotency. Exactly once is impossible anyway.

> In Python (aka pseudo-pseudocode)

This annoyed me probably more than it should have.

This isn't the solution I would architect. It is much easier to de-duplicate when processing your analytics workload later and you don't need to do so much work.

That's not true for all the hundreds of real-time integrations that Segment sends data to. Many are write-only.

Fair. For that case this solution makes more sense than the pure analytics case.

That reminds me of the safety-related protocols we have used for years in embedded electronics like railroad signaling, medical, and other areas.

Site seems to be down. Any ideas how big these HN hugs of death usually are? How big of a traffic spike brings these servers down?

I got front paged once a few years ago, it was about 15,000 page views in the course of a few hours. It's probably grown since then, but not sure by what factor.

hm seems to be up for me? you might've caught our autoscaler mid-scaling!

It's not down

Hmm weird. I get:

> This site can’t be reached

> segment.com refused to connect.

> Checking the connection


Maybe something local to me only?

Check your ad blocker/hosts file.

In here, uMatrix just blocks the site with the message:

> uMatrix has prevented the following page from loading:

> https://segment.com/blog/exactly-once-delivery/

I checked, and one of my uMatrix hosts files includes 'www.segment.com'.

Yep. Disabling AdAway seems to do the trick. Thanks for the heads up

You can also add an exception for a blocked site by marking it green and clicking on the opened padlock in the umatrix panel.

No need to nuke a city to get a fly.

It's probably an ad- or tracker-blocker. I had to disable mine to be able to load segment.com, since they're pretty much a tracking company.

Might be an adblock issue? (some blockers treat direct access to a website and loading an analytics snippet as the same)

You are probably blocking this site in your hosts file. Since it's a tracking tool.

Awesome story. What I would like to hear more about, is the people side. The teams and personalities involved with coming up with this new system and the transition.

... or they could have used BigQuery with a primary key on message ID.

BQ doesn't have primary keys. Perhaps you are thinking of the id that can be supplied with the streaming insert? This has very loose guarantees on what is de-duplicated (~5m iirc)

yea I think within the context of BigQuery the most sensible thing would be to do an aggregate per the column that would be considered a primary key. For example [0]. That said, Streaming API de-dupe window is very nice in practice.

I mentioned elsewhere on Google Cloud the most elegant way of doing this is with Google Cloud Dataflow [1]

(work at G)


