In Flink, the SQL query planner may instantiate two different consumer groups on transactions, so that topic might be read twice. These can progress at different rates, which is partly why some inconsistencies could appear.
Now, the trick: converting transactions to datastream then back to SQL will introduce a materializing barrier, and you will benefit from high temporal locality. There might be some inconsistencies left though, as the JOIN requires some shuffling and that can introduce some processing skew. For example the Netflix account will be the target of many individual accounts; the instance responsible for Netflix might be a hot spot and process things differently (probably by backpressuring a little, making the data arrive in larger micro batches).
Anyway, when processing financial data you might want to make the transaction IDs tag along through the processing, and maybe pair them back together in tumbling event-time windows at the total. As the author said: 'One thing to note is that this query does not window any of its inputs, putting it firmly on the low temporal locality side of the map where consistency is more difficult to maintain.' Also, windowing would have introduced some end-to-end latency.
This makes me think: do streaming systems introduce a new letter in the CAP trade-off?
I humbly propose: The CLAP trade-off, with L for Latency.
> converting transactions to datastream then back to SQL will introduce a materializing barrier
It seems that this would still leave the problem where each transaction causes two deletes and two inserts to `balance`, and then `total` sums those one by one? These errors are very transient but they still cause at least 3/4 outputs to be incorrect.
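To make the arithmetic concrete, here is a minimal sketch in plain Python (not Flink code) of a downstream sum that applies each delete and insert separately and emits after every one. The account names, starting balances, and the order of the four updates are assumptions chosen for illustration.

```python
# Hypothetical balances before the transaction; names and amounts are made up.
balances = {"alice": 10, "bob": 5}
expected_total = sum(balances.values())  # money is only moved, so this never changes


def updates_for_transfer(balances, src, dst, amount):
    """Yield the four changes one transfer makes to `balance` as (account, delta):
    a delete of the old row and an insert of the new row for each account."""
    for account, change in ((src, -amount), (dst, amount)):
        old = balances[account]
        new = old + change
        yield account, -old  # delete the old row
        yield account, new   # insert the new row
        balances[account] = new


# `total` consumes the updates one at a time and emits after each one.
total = expected_total
emitted = []
for _account, delta in updates_for_transfer(balances, "alice", "bob", 3):
    total += delta
    emitted.append(total)

wrong = [t for t in emitted if t != expected_total]
print(emitted, f"{len(wrong)}/{len(emitted)} outputs incorrect")
```

Only the last emitted value equals the true total; the first three are visible downstream even though they correspond to no real state of the accounts.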
> tumbling event-time windows
Will this work for the join? For a given account, the last update to each of credits and debits may have arbitrarily disparate event times and transaction ids. It doesn't seem like there is a window you could set that would be guaranteed to connect them?
SELECT ... EMIT STREAM AFTER DELAY INTERVAL '6' MINUTES ; -- to fix the output multiplication you mention
SELECT ... EMIT STREAM AFTER WATERMARK ; -- this should fix the two deletes and two inserts
I don't know if this is still in active development, though. There has been no news since the paper got published 2 years ago.
> It doesn't seem like there is window you could set that would be guaranteed to connect them?
No there isn't. But I'm quite confident having a transaction-centric approach can be obtained in SQL. Your use case is perfect for illustrating the problem, still.
I'd try something like:
CREATE VIEW credits AS
SELECT
to_account AS account,
sum(amount) AS credits,
last_value(id) AS updating_tx
FROM
transactions
GROUP BY
to_account;
And then try to join credits and debits together by updating_tx.
No idea on how to check that accounts don't go negative, though.
> And then try to join credits and debits together by updating_tx.
You can't join on updating_tx because the credits and debits per account are disjoint sets of transactions - that join will never produce output.
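A small sketch in plain Python of why that join is empty, assuming the join is per account and that no transaction moves money from an account to itself. The transactions are hypothetical.

```python
# Hypothetical transactions: (id, from_account, to_account, amount).
# Assumes no transaction has the same account on both sides.
transactions = [
    (1, "a", "b", 1),
    (2, "b", "c", 1),
    (3, "c", "a", 1),
    (4, "a", "c", 1),
]

last_credit_tx = {}  # account -> id of the latest transaction crediting it
last_debit_tx = {}   # account -> id of the latest transaction debiting it
for tx_id, src, dst, _amount in transactions:
    last_debit_tx[src] = tx_id
    last_credit_tx[dst] = tx_id

# Equivalent of joining credits and debits on (account, updating_tx).
joined = [
    account
    for account, tx_id in last_credit_tx.items()
    if last_debit_tx.get(account) == tx_id
]
print(joined)
```

For every account the latest crediting transaction and the latest debiting transaction are different transactions, so the equality never holds.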
I did try something similar with timestamps - https://github.com/jamii/streaming-consistency/blob/main/fli.... This is also wrong (because the timestamps don't have to match between credits and debits) but it at least produces output. It had a very similar error distribution to the original.
Plus the join is only one of the problems here - the sum in `total` also needs to at minimum process all the balance updates from a single transaction atomically.
You could instead put the global max seen id into every row, but then you would have to update all the rows on every transaction. Which is not great performance-wise, but would also massively exacerbate the non-atomic sum problem downstream in total.
Latency has always been part of the CAP theorem. There is a strictly stronger generalization of it that says in a consistent system the latency of a transaction is bounded by the latency of the links in the network. In the case of a partition the network latency is infinite, so too is the transaction latency, thus no availability.
This is a really interesting look at an aspect of consistency I hadn't previously given a lot of thought to, although it's implicit in a lot of discussions around data modeling with eventual consistency, and I hope it starts a longer conversation.
Eventual consistency in a distributed database is hard enough for application developers to reason about without adding streaming computation concerns.
With Riak 2.0, at Basho we were trying to address that complexity with two different approaches: offering a strong consistency model based on Multi-Paxos, and building CRDTs into Riak to give applications more robust/easier to use primitives.
Unfortunately the company folded before either effort yielded much fruit.
Did basho have any internal discussion about how to ensure that operations reading and writing from multiple CRDTs were still confluent? The only work I've seen on this is http://www.neilconway.org/docs/socc2012_bloom_lattices.pdf but that seems like a better fit for a streaming system than for a general database.
Interesting article. However, I find some claims regarding Flink to be inaccurate:
> Flink is split into two apis - the datastream api is strongly focused on high-temporal-locality problems and so can't express our running example
While the streaming API doesn't have the relational SQL syntax, it should have all the necessary building blocks to do anything that can be done with the table API.
> The file source appears to load the current contents of the file in a single batch and then ignore any future appends, so it's not usable for testing streaming behavior
> Flink also has a 5s trailing watermark, but it doesn't reject any inputs in non-windowed computations.
This is expected; that's how Flink deals with late data by default, so that it doesn't lose any. But rejecting late inputs can be done using the Datastream API, afaik. In windowed computations, allowedLateness(...) can be used to drop late records: https://ci.apache.org/projects/flink/flink-docs-release-1.12.... If the computation graph has no windows, as in the article's example, the low-level operation ProcessFunction can be used to access the watermark using timerService.currentWatermark() and to drop late events. Keep in mind, though, that this comes at the cost of parallelization, as the data stream should be keyed in order to be able to use timers (see for example: https://stackoverflow.com/a/47071833/3398493).
Also, it seems very odd to me that a watermark of 5s is used while the expected out-of-orderness is 10s. The watermark is usually set in accordance with the expected out-of-orderness, precisely to avoid consistency issues caused by late data. Why did the author choose to do that?
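As a rough illustration of the drop-late-events idea, here is a simulation in plain Python. It is not Flink code and uses none of Flink's API: the function name, the per-event watermark update, and the sample events are all assumptions made for the sketch (a real job would compare against the watermark inside a process function, and watermarks are emitted periodically rather than per event).

```python
def drop_late(events, max_out_of_orderness):
    """Split (timestamp, payload) events, given in arrival order, into kept and dropped.

    The watermark trails the largest timestamp seen so far by
    `max_out_of_orderness`; an event older than the watermark counts as late.
    """
    watermark = float("-inf")
    kept, dropped = [], []
    for timestamp, payload in events:
        if timestamp < watermark:
            dropped.append((timestamp, payload))
        else:
            kept.append((timestamp, payload))
        # The watermark never moves backwards.
        watermark = max(watermark, timestamp - max_out_of_orderness)
    return kept, dropped


# Hypothetical arrival order, timestamps in seconds.
events = [(10, "a"), (20, "b"), (12, "c"), (16, "d"), (30, "e"), (24, "f")]
kept, dropped = drop_late(events, max_out_of_orderness=5)
print(dropped)
```

With a 5s trailing watermark, events "c" and "f" arrive more than 5s behind the newest timestamp seen and are rejected, while "d" is out of order but still within bounds.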
> it should have all the necessary building blocks to do anything that can be done with the table API
That's true in that the table api itself is built on top of the datastream api. But to run the example you'd have to implement your own joins and retraction-aware aggregates and then we'd be testing the consistency properties of that implementation, not of the datastream api itself, and we might as well test a mature implementation like the table api instead.
> The file source can be used for testing streaming behavior, for example by using a directory and using a PROCESS_CONTINUOUSLY watch type.
> This is expected, that's how Flink deals with late data...
I should clarify in the article that I was surprised not because it's wrong, but because it's very different from the behavior that I'm used to.
> Also, it seems very odd to me that a watermark of 5s is used while the expected out-of-orderness is 10s. Why did the author choose to do that?
To explore how late data is treated.
There are some plausible sources of inconsistency that arise from late data handling that I didn't get to demonstrate in the article. For example, it sounds like even without allowed_lateness, if two different windowed operations had different window sizes they might drop different subsets of late data, which could cause inconsistency between their outputs.
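A toy simulation in plain Python of that scenario, not Flink code. It assumes a window drops an event when the window's end is at or before the current watermark, and that the watermark trails the largest timestamp seen by a fixed delay; the events and sizes are made up.

```python
def window_sums(events, window_size, max_out_of_orderness):
    """Sum values per tumbling event-time window, dropping events whose window
    has already closed (window end <= watermark) when they arrive."""
    watermark = float("-inf")
    sums, dropped = {}, []
    for timestamp, value in events:
        window_start = timestamp - timestamp % window_size
        if window_start + window_size <= watermark:
            dropped.append((timestamp, value))
        else:
            sums[window_start] = sums.get(window_start, 0) + value
        watermark = max(watermark, timestamp - max_out_of_orderness)
    return sums, dropped


# Hypothetical (timestamp, value) events in arrival order.
events = [(1, 1), (12, 1), (4, 1), (8, 1), (21, 1)]
small_sums, small_dropped = window_sums(events, window_size=5, max_out_of_orderness=5)
large_sums, large_dropped = window_sums(events, window_size=10, max_out_of_orderness=5)
print(sum(small_sums.values()), sum(large_sums.values()))
```

The event at time 4 arrives after the watermark has reached 7: the 5s window [0, 5) is already closed and drops it, while the 10s window [0, 10) is still open and keeps it, so the two operators disagree about how many events they have seen.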
Regarding FileProcessingMode.PROCESS_CONTINUOUSLY, what I meant is that you can watch a directory, and write batches to new files instead of appending to the same file. That way, each file will be read only once.
> If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
Thank you for this work! Interesting results.
There's an aspect of the setup which troubles me:
You're building a view over transaction data in order to arrive at current balances. You configure differential dataflow to reject timestamps arriving 5s after the watermark, and DD uses this to understand when a timestamp has been retired and all its effects may now be viewed. Which is very cool, and maintains internal consistency as compared to e.g. Flink, which is eventually consistent.
But there's a cost here: DD must be able to reject inputs which arrive outside of their temporal window (in your setup, ~1/3 of inputs). Which is understandable, except that the system feeding DD may also have hiccups, and there's no way to guarantee all data is presented in time.
Again, the broader context is building a view over transaction decisions made by another system(s). If inputs may be dropped, then the view may be inconsistent with that other system's view of the world.
It seems important to call out that, while Flink is not internally consistent as you define it, its eventually consistent answer is correct with respect to what the authoring system believes happened. The DD view is internally consistent, but may give you a different answer.
Yeah, I touch on this near the bottom (ctrl-f 'bitemporal'). I think having multiple watermarks would be a neat solution. Kinda like the way flink sends markers through to get consistent snapshots for fault tolerance.
As I understand it, it unblocks a dataflow to provide answers with respect to a user-time processed as-of a system-time, which is certainly an improvement.
But you'd also have unbounded growth of tracked timestamps unless you're _also_ able to extract promises from the user that no further records at (or below) a user-time will be submitted...
a promise they may or may not be able to make.
Yes, I think so. If you want to be able to handle out-of-order data, I don't think there is a way to garbage collect old time periods and still produce correct results unless you stop accepting new data for those time periods.
With the differential dataflow code I wrote for this post, that cutoff point for gc is also coupled to when you can first emit results, creating a sharp tradeoff between latency of downstream results and how out-of-order upstream data is allowed to be. With bitemporal timestamps or multiple watermarks those are decoupled, so you can emit provisional results and correct them later without giving up internal consistency, and also make a totally separate decision about when to garbage collect. That doesn't remove the problem entirely, but it means that you can make decisions about latency and decisions about gc separately instead of them being controlled by one variable.
With flink datastreams there are a lot of different levers to pull. The code that Vasia contributed recently waits for watermarks like differential dataflow. There is also a notion of triggers that emit early results from windowed operators, but these are not coordinated across different operators so they can result in internal inconsistency.
The flink table api, as far as I can tell, mostly just ignores event-time outside of windowed aggregates. So it doesn't have to confront these tradeoffs.
> the broader context is building a view over transaction decisions made by another system(s).
Most of the time this is fine. If upstream is:
* a transactional database, we get ordered inputs
* another system that has a notion of watermarks or liveness (eg spanner), we can use their guarantees to decide when to gc
* a bunch of phones or a sensor network (eg mobile analytics), then we pick a cutoff that balances resource usage with data quality, and it's fine that we're not fully consistent with upstream because we're never compared against it
The hard case would be when your upstream system doesn't have any timing guarantees AND you need to be fully consistent with it AND you need to handle data in event-time order. I can't think of any examples like that off the top of my head. I think the answer would probably have to be either to buy a lot of ram or to soft-gc old state - write it out to cheap slow storage and hope that really old inputs don't come in often enough to hurt performance.
---
This has been an interesting conversation and I'll probably try to write this up to clarify my thinking. I'm not on hn often though, so if you want to continue feel free to email jamie@scattered-thoughts.net
I don't really understand why the author treats the Flink datastream API as "high temporal locality" and says it "can't express our running example". Windowing operators provided by Flink are just one way of using datastream, and you can easily express any operations you want using something like KeyedProcessFunction and manually manipulating the state.
That's true in that the table api itself is built on top of the datastream api. But to run the example you'd have to implement your own joins and retraction-aware aggregates and then we'd be testing the consistency properties of that implementation, not of the datastream api itself, and we might as well test a mature implementation like the table api instead.
1. I have always thought of (eventual) consistency to mean consistency between replicas: how in-sync are the replicas in what they "store". Whereas, internal consistency as defined here seems to mean how "multiple reads" can lock into the same storage state. I believe the two concepts are orthogonal, so comparing the two concepts didn't feel natural to me.
2. If transaction ids are monotonically increasing (an if), isn't it possible for subsequent reads to lock into the maximum transaction id of the first read? For example:
select credits, max(txnid) from table;
select debits from table where txnid <= max_txnid;
> internal consistency as defined here seems to mean how "multiple reads" can lock into the same storage state
Sort of. When you make a single read of a single key at the end of a streaming topology, that's equivalent to reading all the inputs and doing the computation yourself. Internal consistency means that every path through the graph back to the inputs should be reading the same set of inputs. Things go wrong when one path has processed more inputs than another, or when a single input produces multiple internal updates and those updates aren't processed atomically.
> isn't it possible for subsequent reads to lock into the maximum transaction id of the first read
Your example would still allow credits to have read more inputs than debits, but it's on the right track.
In differential dataflow aggregates wait until they see a watermark before emitting output, so the sum in `balance` will wait until the watermark for eg time 7 before emitting the sum for time 7. That allows the output from the join to settle.
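A minimal sketch in plain Python of the "wait for the watermark, then emit" idea. This is not how differential dataflow is implemented and uses none of its API; the class and method names are hypothetical, and the numbers are made up.

```python
from collections import defaultdict


class WatermarkedSum:
    """Running sum that emits a value for a timestamp only once that timestamp is closed."""

    def __init__(self):
        self.pending = defaultdict(int)  # timestamp -> net change not yet emitted
        self.total = 0

    def update(self, timestamp, delta):
        # Buffer the change; nothing is emitted yet.
        self.pending[timestamp] += delta

    def advance_watermark(self, watermark):
        """Apply every buffered timestamp <= watermark and return [(timestamp, total)]."""
        emitted = []
        for timestamp in sorted(t for t in self.pending if t <= watermark):
            self.total += self.pending.pop(timestamp)
            emitted.append((timestamp, self.total))
        return emitted


total = WatermarkedSum()
total.update(0, 15)  # hypothetical starting balances, summed
# Two deletes and two inserts produced by a single transfer at time 7.
for delta in (-10, 7, -5, 8):
    total.update(7, delta)
outputs = total.advance_watermark(7)
print(outputs)
```

Because the four changes at time 7 are combined before anything is emitted, downstream sees one output per timestamp and never the intermediate sums.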
Of course doing that efficiently is non-trivial...
I wrote a really long reply to a comment that got deleted before I finished. In case other people have the same questions:
> What is a valid subset to consider?
For these purposes, any subset.
> What is "correct output"?
I absolutely agree that deciding whether an output is correct requires actually defining the intended semantics of the streaming program. I don't believe that exists for flink or ksqldb. (It does for differential dataflow, to some degree - https://link.springer.com/content/pdf/10.1007/978-3-662-4667...). If the outcome of this testing was that streaming systems gave a clear definition of their intended semantics which allowed for this degree of error, I think that would be excellent. That's what I was getting at towards the end of the post: "Ideally the behavior would be sufficiently well defined that it's possible to write tests that examine intermediate outputs, because those are the outputs that real deployments will experience."
But in the meantime, for the deterministic sql queries here I think "run the same computation in a consistent batch system and compare the output" is a highly reasonable set of semantics by which we can judge correctness.
> Why would the example be internally inconsistent (per the definition)? The out of date debits data is a subset of the input so far, is it not?
The debits data is using one subset and the credits data is using another. So if we ask "what subset of the inputs is balance computed from" the answer is sort of this one and sort of that one. The end result is that there is no single subset of the data that could produce those results in a consistent system.
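A brute-force check in plain Python of that claim on a tiny, hypothetical input: credits has read three transactions, debits only two, and no single subset of the inputs reproduces the resulting balances.

```python
from itertools import combinations

# Hypothetical transactions: (id, from_account, to_account, amount).
transactions = [(1, "a", "b", 1), (2, "b", "c", 1), (3, "c", "a", 1)]
accounts = ("a", "b", "c")


def credits(txs):
    return {acc: sum(amt for _, _, dst, amt in txs if dst == acc) for acc in accounts}


def debits(txs):
    return {acc: sum(amt for _, src, _, amt in txs if src == acc) for acc in accounts}


def balance(credit_txs, debit_txs):
    c, d = credits(credit_txs), debits(debit_txs)
    return {acc: c[acc] - d[acc] for acc in accounts}


# credits has read all three transactions, debits only the first two.
observed = balance(transactions, transactions[:2])

# Every result a consistent system could produce from some single subset.
consistent = [
    balance(subset, subset)
    for size in range(len(transactions) + 1)
    for subset in combinations(transactions, size)
]
print(observed, observed in consistent)
```

Every consistent result sums to zero because each transaction credits and debits the same amount, whereas the observed balances sum to 1, so no subset can match.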
> the goal is not to permit arbitrary subsets of data but to restrict the considered subsets of data to a consistent selection.
Yes. The goal is that the final output is computed from a well-defined subset of the data, not from a mishmash of different subsets. I think we're roughly in agreement on the intended behavior and just not on the language used to describe it?
> systems that implement full/immediate consistency.
I don't know what full consistency would mean for a streaming system if not internal consistency + eventual consistency. If you have both, you can send an input and then just block until you see the output for the corresponding timestamp.
Part of the trouble is that all our language for describing consistency comes from read-write transactions in databases and doesn't translate directly into streaming systems. The closest analogue is probably snapshot isolation - https://jepsen.io/consistency/models/snapshot-isolation - but you can't just port the definitions over to a system that doesn't have a concept of transactions.
The dataflow is a diamond:
[1] https://issues.apache.org/jira/browse/FLINK-15775?filter=-2