Exactly-once Semantics: How Kafka Does it (confluent.io)
259 points by listentojohan on June 30, 2017 | 39 comments



Providing an API for building applications that use transactions, plus help with idempotent producing, is really exciting. This will ease a lot of the pain associated with building stream processing systems. Doing it with very little performance overhead is amazing.

I do feel that calling it "Exactly-once delivery with kafka" is slightly misleading, as this requires the applications to be written in a certain way. The title makes it sound too general and borders on claiming something that is close to impossible. I don't want to be too critical here, as the author was very honest about what this means in the blog post. Regardless of the title, this is an amazing feature.


> calling it "Exactly-once delivery with kafka" is slightly misleading

Indeed. Idempotent operations are the mainstay of managing "at least once" message delivery. If you had a true "exactly once" system, idempotency would be unnecessary.

> Regardless of the title this is an amazing feature.

I feel it's a good iterative step forward, but nothing near the "revolutionary" that's being boasted.


Not "close" to impossible -- actually impossible.


Also known as the Two Generals Problem


The reasons this is "currently" impossible are:
- We cannot build communication infrastructures that are 100% reliable.
- We build apps that actually crash.

I would not venture as far as to say this is impossible. If one day we are able to leverage quantum entanglement in telecommunications networks, this would certainly be very much possible.


You've been downvoted, but the trick would be to convert asynchronous messaging to synchronous processing that proceeds at a predictable tick. That would make it possible to bypass the FLP result. I'm willing to imagine quantum entanglement could pull this off if we were able to entangle actual transistors. Dunno how plausible that is, but points for creativity if nothing else.


The machine that reads the quantum entangled particles will still occasionally break or have coding errors.


I don't really know a thing about that sort of network, but if you can assume a reliable network, the machine reading the packets going down shouldn't matter. The sender won't get an "ACK", will know the other machine is at fault and not the network, and can resend.

Given a "perfect network" I think exactly-once-delivery is possible, right?


What if the sender fails before it can fully process the ACK? When it comes online again it won't know that the last message sent was delivered.


Transactions.


> We build apps that actually crash.

And disks that fail and memory that bitflips and communication methods that are susceptible to random noise and machines dependent on failable power sources.


These are some pretty huge improvements to what was previously the weakest link in Kafka's API: the Producer.

However, it's important to note that this can only provide you with exactly-once semantics provided that the state/result/output of your consumer is itself stored in Kafka (as is the case with Kafka Streams).

Once you have a consumer that, for example, makes non-idempotent updates to a database, there's the potential for duplication: if the consumer exits after updating the database, but before committing the Kafka offsets. Alternatively, it can lead to "message loss" if you use transactions on the database, and the application exits after the offsets were committed, but before the database transaction was committed.

The traditional solution to this problem is to provide your own offset storage for this consumer within the database itself, and update these offsets in the same transaction as the database modifications. However, I'm not certain that, combined with the new Producer API, this would provide exactly-once semantics.
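A minimal sketch of the "offsets in the same transaction" approach described above, using Python's built-in sqlite3 in place of a real database. The table names, the `apply_message` helper, and the counter workload are all hypothetical, and it assumes a single consumer per partition. It only illustrates that a non-idempotent update and the offset advance either both commit or both roll back, so a redelivered message can be detected and skipped.

```python
import sqlite3


def open_store(path=":memory:"):
    # Hypothetical schema: one table for application state, one for offsets.
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS counters "
        "(name TEXT PRIMARY KEY, value INTEGER NOT NULL)"
    )
    db.execute(
        "CREATE TABLE IF NOT EXISTS offsets "
        "(part INTEGER PRIMARY KEY, next_offset INTEGER NOT NULL)"
    )
    db.commit()
    return db


def next_offset(db, part):
    # The position to resume from after a restart lives in the database,
    # not in Kafka's own offset storage.
    row = db.execute(
        "SELECT next_offset FROM offsets WHERE part = ?", (part,)
    ).fetchone()
    return row[0] if row else 0


def apply_message(db, part, offset, name, delta):
    """Apply a non-idempotent increment and advance the offset atomically."""
    if offset < next_offset(db, part):
        return False  # already applied before a crash or redelivery; skip it
    with db:  # commits on success, rolls back if an exception is raised
        db.execute(
            "INSERT OR IGNORE INTO counters (name, value) VALUES (?, 0)",
            (name,),
        )
        db.execute(
            "UPDATE counters SET value = value + ? WHERE name = ?",
            (delta, name),
        )
        db.execute(
            "INSERT OR REPLACE INTO offsets (part, next_offset) VALUES (?, ?)",
            (part, offset + 1),
        )
    return True
```

On restart, a consumer built this way would seek to `next_offset(db, part)` rather than to the offset committed in Kafka. Whether this composes into end-to-end exactly-once still depends on there being no duplicates in the log itself.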

Even if it doesn't, it's still a huge improvement, and significantly reduces the situations under which duplicate messages can be received.


To clarify: the transaction feature is a transaction over updates to Kafka and provides transparent, correct processing in Kafka for Kafka Streams or anything else only maintaining state/output in Kafka. The trick you described where you either transactionally or idempotently store your offsets and state updates in an external store when consuming in a general app still works, but obviously for that to work it requires there are no duplicates in the log itself, which the idempotence feature in the producer described here enables.


There are lots of words in this article (and design doc: http://goo.gl/EsdTXo ) but it's missing some very simple key points:

1. State diagram: What are the messages exchanged between Producer and Consumer (and how many round trips to confirm a message)

2. At what point does each side consider the message to have been delivered?

3. Has this been tested empirically? (i.e., set up a producer and consumer, then partition/kill each side randomly to see if messages get lost)

The one thing I don't understand is the following. The two parties communicate by message passing. At some point the message will transition to a new state (i.e., delivered). That transition cannot happen on both sides at the same time. So how do you handle a failure to send the last message? Do you stage messages until after the timeout period has passed?


Points 1 & 2. There is no direct communication between a producer and consumer. The producer writes to the broker, the consumer reads from the broker. There is a detailed flow diagram for the producer side operations in the article, and this deck has more of the details of how both the producer and consumer work: https://www.slideshare.net/apurva2/introducing-exactly-once-...

3. Yes, it has been tested empirically. Quoting from the article:

> We wrote over 15,000 LOC of tests, including distributed tests running with real failures under load and ran them every night for several weeks looking for problems. This uncovered all manner of issues, ranging from basic coding mistakes to esoteric NTP synchronization issues in our test harness. A subset of these were distributed chaos tests, where we bring up a full Kafka cluster with multiple transactional clients, produce message transactionally, read these messages concurrently, and hard kill clients and servers during the process to ensure that data is neither lost nor duplicated.


The blog assumes you already know a bit about the Kafka architecture & is more of a marketing announcement than documentation.

This has been under development in the open for a long time; you can track the feature's history here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...


This is pretty cool stuff. I have been working with Apache Flink where the computational model is exactly once (which is still hugely useful) but it still comes with caveats about duplicate messages downstream, which makes writing to downstream db or having multiple flink processes tricky. Often times this is done by windowing data and producing complete snapshots of that window (i.e. you need to keep all the state for a given window) such that the message downstream is idempotent and then aggregating windows together (which can be non trivial for certain aggregations).

Once flink (and other systems) support this, I think it will really be a game changer. It will allow for doing things like sending an increment downstream rather than needing to keep all that state. Instead of mostly seeing stream processing as a thing we do for analytic use cases, it can really become the backbone for streaming applications.

Event Sourcing/CQRS has been an idea for a while, but in practice it is difficult to do because of the inherent difficulty in dealing with message semantics and consistency (see https://data-artisans.com/blog/drivetribe-cqrs-apache-flink for a good write up about such an app). The ability to independently optimize for both reads and writes while also not having to make all messages be idempotent, in my mind, will make this feasible for a broad range of teams and won't require a huge amount of work in thinking up clever message formats or working through as many failure scenarios.

That being said, I fully expect this to bite hard when the caveats aren't understood (such as when interacting with external databases), and there are still other hard problems, like creating a consistent downstream view of an app in terms of business events rather than hooking into a database transaction log.

Still though, I think these are the sort of solutions needed to make distributed systems easier, even though there are lots of caveats :)


Boy would I love to see a Jepsen test of this one.


It's not Jepsen, but we actually do a fair amount of system and integration testing, some of which does things like kill nodes (randomly, the controller, etc) and validates data is delivered correctly. There is some ongoing work to add other fault injection tests: https://issues.apache.org/jira/browse/KAFKA-5476

This has already caught quite a few bugs, including one that led to a change to the replication protocol which is included in this 0.11.0.0 release. See the community KIP here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+...

One cool thing that happened recently with these tests is that they were modified to make the client implementation pluggable: https://github.com/apache/kafka/pull/2048 Confluent uses this functionality to test all of its clients (librdkafka, confluent-kafka-python, confluent-kafka-go, confluent-kafka-dotnet) in addition to the Java clients. This not only makes us confident of these clients from their first release, but has also found dozens of bugs in both the clients and the broker implementation itself. Getting automated testing across many clients has really stepped up the quality and robustness of both existing and new features.

If you're interested, the tests themselves are here: https://github.com/apache/kafka/tree/trunk/tests


I should also mention that system/integration tests of the new functionality were a requirement for releasing it. For example, here's one of the tests: https://github.com/apache/kafka/blob/trunk/tests/kafkatest/t...


For what it's worth, the Jira for KIP-101 was created in January 2014. That has been a known potential Kafka data loss scenario for quite a while, just took some time (and evidently the findings of these new stress tests) to be prioritized as a serious problem that needed to be fixed.


So it's exactly once semantics, not exactly once delivery. Adding a dedup is not exactly once delivery, it's being idempotent; it's exactly once commit, and we've had this for years. Having clients request committed messages and keep track of their progress is not exactly once delivery, it's exactly once request. Exactly once delivery has messages pushed to clients. As blogs have noted, this is not possible.


https://techcrunch.com/2017/06/30/confluent-achieves-holy-gr...

There is a crazy claim of 'solving the unsolved problem for so many years' from Confluent CTO Neha Narkhede, quoted below from the above article.

“It’s kind of insane. It’s been an open problem for so many years and Kafka has solved it — but how do we know it actually works?” she asked echoing the doubts of the community.

This is solved only in the consume-transform-produce scenario, addressing Kafka Streams' exactly-once requirements. This is a nice feature for Kafka Streams, but it is better to avoid tall claims like the above.

Actually, the blog post also started with major claims, but later paragraphs made clear how it is achieved through de-duplicating messages in the broker, and what the limitations are on the consumer side. The title of the blog post and the comments in the TechCrunch article do not seem to be in good spirit.

The idempotent producer does not give any API to solve exactly-once for a producer fetching messages from target systems and sending them to topics.

I like kafka and the design choices taken to keep things simple for users but all these tall claims should have been avoided.


Does this just mean (in TL;DR form) that Kafka now has producers generate an ID for each message they send, and Kafka deduplicates it now instead of requiring deduplication on the end consumer?


That is part of it but there is also a general purpose transaction feature that lets you link together updates, state journalling and consumption all in a single transaction. This enables correct stream processing on top of Kafka and is arguably the more technically sophisticated aspect.
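As a rough illustration of why linking consumption and output in one transaction matters, here is a toy in-memory model. It is not the Kafka client API; `ToyLog`, `process`, and the `crash_at` parameter are all hypothetical names invented for this sketch. The only point it makes is that when output records and the consumed position commit together, a crash mid-batch leaves neither behind, so reprocessing does not duplicate output.

```python
class ToyLog:
    """In-memory stand-in for a cluster; not the real broker protocol."""

    def __init__(self):
        self.output = []           # committed output records
        self.committed_offset = 0  # next input offset to consume

    def commit(self, records, new_offset):
        # Output and input position become visible together, or not at all.
        self.output.extend(records)
        self.committed_offset = new_offset


def process(log, source, transform, batch_size=2, crash_at=None):
    """Consume-transform-produce loop with one 'transaction' per batch."""
    while log.committed_offset < len(source):
        start = log.committed_offset
        batch = source[start:start + batch_size]
        pending = []  # uncommitted output, lost if we crash
        for i, msg in enumerate(batch):
            if crash_at is not None and start + i == crash_at:
                raise RuntimeError("simulated crash")
            pending.append(transform(msg))
        log.commit(pending, start + len(batch))


if __name__ == "__main__":
    log = ToyLog()
    source = [1, 2, 3, 4, 5]
    try:
        process(log, source, lambda x: x * 2, crash_at=3)
    except RuntimeError:
        pass  # restart from the last committed offset
    process(log, source, lambda x: x * 2)
    print(log.output)
```

In this model the restart resumes from offset 2, the last committed position, and the half-finished batch is simply redone. A real system has to make that commit atomic across replicated partitions, which is the hard part this sketch skips.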


This comment in the TechCrunch article is misleading and you may want to clarify it. I agree that this is a nice feature for Kafka Streams or applications having a consume-transform-produce loop. I, as a user/dev, like Kafka because of the design choices taken to keep things simple and effective for users.

https://techcrunch.com/2017/06/30/confluent-achieves-holy-gr...

“It’s kind of insane. It’s been an open problem for so many years and Kafka has solved it — but how do we know it actually works?” she asked echoing the doubts of the community.


From the OP source -

" How does this feature work? Under the covers it works in a way similar to TCP; each batch of messages sent to Kafka will contain a sequence number which the broker will use to dedupe any duplicate send. "


This is basically it. I'm always amazed at how much reimplementation of TCP we see at a high level in distributed systems. Backpressure, message ordering, retries, etc. all work pretty well in TCP.


Yes the idempotence part of this feature set is very similar to TCP (the transactional consumption and updates obviously aren't). But this isn't a reimplementation at all. TCP provides deduplication within the context of a connection tied to a process. If that connection is lost or the process dies then duplicates may occur. The feature in Kafka is much stronger as the "connection" is persistent and replicated with the log so effectively the "connection" fails over if the server dies.


Probably these new layers of abstraction are needed for some use cases, or for specialized and more performant networks. Otherwise, simply using TCP would be enough.

But indeed, it is interesting to see the computing cycle reimplementations spinning over and over again.


Yeah, my point wasn't "TCP does this, that's all we need". It was just an observation about how we apply principles at a higher level.


Re: TCP, it only maintains sequencing guarantees over the lifetime of a single connection. Obviously this is too weak of a guarantee for Kafka as leaders can change in a cluster. We've built idempotence in a way that makes the sequence number and producer ID part of the Kafka log. So it can provide idempotence even if brokers fail and new connections are established between the producer and broker
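A minimal sketch of the idea in the comment above, assuming a simplified model where each record carries a producer ID and a sequence number. `ToyPartition` and its return values are hypothetical, and the real broker tracks sequences per batch with more bookkeeping than this. What it shows is that because the dedup state can be rebuilt from the log, a retry is still recognized after a leader change.

```python
class ToyPartition:
    """Toy partition that dedupes on (producer_id, sequence number)."""

    def __init__(self):
        self.log = []       # appended (producer_id, seq, payload) records
        self.last_seq = {}  # producer_id -> highest sequence appended

    def append(self, producer_id, seq, payload):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return "duplicate"     # retry of something already in the log
        if seq != last + 1:
            return "out_of_order"  # gap: an earlier send is missing
        self.log.append((producer_id, seq, payload))
        self.last_seq[producer_id] = seq
        return "appended"

    @classmethod
    def recover(cls, log):
        # A new leader rebuilds its dedup state from the replicated log,
        # so the state outlives any single connection or broker.
        partition = cls()
        for producer_id, seq, payload in log:
            partition.append(producer_id, seq, payload)
        return partition


if __name__ == "__main__":
    leader = ToyPartition()
    leader.append("producer-a", 0, "x")
    leader.append("producer-a", 1, "y")
    # The leader dies before the ack for seq 1 arrives; the producer retries
    # against a new leader that has only the replicated log to go on.
    new_leader = ToyPartition.recover(leader.log)
    print(new_leader.append("producer-a", 1, "y"))
```

A TCP connection would have lost its sequence state at the point where the leader died; here the retry is rejected as a duplicate because the state travels with the log.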


Dear hacker news:

1. Please read the section entitled "Is this Magical Pixie Dust I can sprinkle on my app?" before making angry comments. Answer: for general consumer apps just consuming messages, no. However, Kafka's design, which lets the consumer control its position in the log, combined with this feature, which eliminates duplicates in the log, makes building end-to-end exactly-once messaging using the consumer quite approachable. For stream processing using Kafka's Streams API (https://www.confluent.io/blog/introducing-kafka-streams-stre...), where you are taking input streams, maintaining state, and producing output streams, the answer is that it actually kind of is like magic pixie dust: you change a single config and get exactly-once processing semantics on arbitrary code. Obviously you still need to get the resulting data out of Kafka, but when combined with any off-the-shelf Kafka connector which maintains exactly-once you can get this for free. So for that style of app design you actually can get correct results end-to-end without needing to do any of the hard stuff.

2. Someone is going to come and say "FLP means that exactly once messaging is impossible!" or something else from a half-understood tidbit of distributed systems theory they picked up on a blog. Let me preempt that. FLP is about the impossibility of consensus in a fully asynchronous setting (e.g. no timeout-based failure detection). Of course as you know the vast majority of the systems you use in AWS or your own datacenter depend in deep ways on consensus. Kafka itself is a CP log, about as close of a direct analog to consensus as you could ask for. Obviously Kafka and all these systems are "impossible" in the same sense that if you can make the network or other latency issue bad enough you can make the system unavailable for writes. This feature doesn't change that at all, it just piggybacks on the existing consensus Kafka does. It doesn't violate any theorems in distributed systems theory: Kafka and any consensus-based system can't work in a fully asynchronous setting, Kafka was a CP system in the CAP sense prior to this feature and this feature doesn't change that guarantee.

For those who want a deeper dive into how it all works there is a longer write up on the design here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E... The use for stream processing is described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A...


Note that you can in fact solve the exactly-once delivery problem using the model of FLP. It is much easier than consensus, because once message receipt is acknowledged it does not have to become common knowledge.

The only serious objection to the idea of exactly-once delivery is that in some models of computation, you can't even solve it on a single node. It may be impossible to process a message and record the fact that this processing has been done in a single atomic step, making any node failure in between those steps unrecoverable without reprocessing. This is exactly the objection that was raised in the referenced article "You Cannot Have Exactly-Once Delivery". Though I don't think it is a particularly illuminating observation, especially since it doesn't have anything to do with the distributed nature of the system.


Is this similar to MQTT qos 2 ?


Sounds great, but I'll wait until Aphyr has done his worst to prove that it doesn't work.


Or that Aphyr can do his best to help us improve what's built on a strong foundation of sound design, has gone through years of thinking and review by the broader Kafka community, and has been through months of testing.


Aphyr has a distinguished track record of breaking things that have gone through "years of thinking and review" and "months of testing". I'll trust his analysis over anyone else.


Personally, I would welcome Aphyr trying to break the Kafka EOS guarantees. Like all software, there will be bugs, and the exposure will only make it stronger and more viable.

I write as one of the implementors of exactly once guarantees in Kafka.



