In a simple web-app-writes-to-DB scenario, it's easy to read my writes, but with an async log-processing system, how am I supposed to organize my code so I can read my writes and respond with useful information?
Maybe the solution is to eschew request-response entirely and have all requests return 200, then poll or use two-way communication?
Alternatively, I could have my log-appending operation return a value indicating the position in the totally-ordered log, which I could pass to the query interfaces as a way of indicating "don't return until you've processed at least to here." Does anyone do that?
Am I totally off base here? I'd love to hear from anyone who is using these kinds of systems today.
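The "pass the log position to the query interface" idea can be sketched in a few lines. This is a toy, in-memory illustration, not a real Kafka API; `Log` and `View` are made-up names, and a real system would block on the consumer instead of spinning:

```python
import threading

class Log:
    """Toy totally-ordered log: append returns the record's offset."""
    def __init__(self):
        self.records = []
        self.lock = threading.Lock()

    def append(self, record):
        with self.lock:
            self.records.append(record)
            return len(self.records) - 1  # position of this write in the log

class View:
    """Derived read store that tracks how far into the log it has consumed."""
    def __init__(self, log):
        self.log = log
        self.applied = -1  # offset of the last record applied
        self.state = {}

    def catch_up(self):
        for off in range(self.applied + 1, len(self.log.records)):
            key, value = self.log.records[off]
            self.state[key] = value
            self.applied = off

    def query(self, key, min_offset):
        # "don't return until you've processed at least to here"
        while self.applied < min_offset:
            self.catch_up()  # real system: block/poll the consumer instead
        return self.state.get(key)

log = Log()
view = View(log)
off = log.append(("cart", ["book"]))       # write returns its log position
print(view.query("cart", min_offset=off))  # -> ['book']
```

The write path hands back an offset, and the read path refuses to answer until the derived view has caught up to at least that offset, which gives you read-your-writes without abandoning the async log.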
As the article says: "For now, a better option is to extract the log from a database" - i.e. you use some tooling to generate a log from the DB.
Indeed, you can now see tools that go in this direction, usually by using the replication stream (e.g. https://github.com/shyiko/mysql-binlog-connector-java and https://github.com/xstevens/decoderbufs).
You actually need all of these at least somewhere in a company, and they each have their place.
The dividing line for request/response is that someone is waiting on the other end and that if the request fails you can just show them an error and continue on your way. So when a web service gets overwhelmed it usually just times out requests.
Consider an e-commerce site as an example:
1. Request/response
Displaying a product
Making a sale
2. Stream or batch
Logistics and shipping
Product catalog import
Search index updates
The latter category is asynchronous, so it can be done in a batch fashion (once an hour or day) if latency is not a concern, or in a streaming fashion if it needs to be faster.
- producing the requests to the log-oriented system,
- consuming the responses at some endpoint/topic of the dataflow,
- linking responses to requests using some id propagated along the whole dataflow,
- dealing with a cache of pending requests, asynchronous responses, and timeouts
Afterwards, either poll or communicate over sockets back to the client side. If there are workers involved on the server, add the socket id to the message envelope so it can be matched on return.
I'm already sold on this idea, and would really love to see more posts that get into the nitty-gritty details of how to integrate Kafka, how to migrate an existing infrastructure, case studies, sample code, etc. It all seems very handwavy otherwise.
Elm has this notion of signal – http://elm-lang.org/learn/What-is-FRP.elm – which is really a stream of changing values, that are used to construct varying model and its rendered view (virtual DOM and all that).
I am wondering if we can merge these two notions – signals and commit logs. Consequently, this would replace the traditional "request-response" model in REST API with nothing but signals, thus leveraging the simplicity of FRP for the whole application. Client side Elm code does a "send" on the signal that is connected to the server-side commit log, and also reads from another signal connected to another log that receives new data (added from various places).
The one thing I'm always somewhat confused by though is how a "totally ordered log" intersects with the reality of a partitioned log. The simplicity of a log seems to break down a bit when you partition.
For instance, imagine I want to implement multi-key transactions on top of a distributed datastore. With a totally ordered log this is easy. But with a partitioned log, it becomes much harder.
Alternatively, imagine I want to implement a collaborative editing app like Google Docs, or something like Slack. A natural design would be to have millions of independent logs. I can then replay logs to get current state and watch logs to keep it updated. But as far as I'm aware, partitioned logs like Kafka do not actually support millions of topics. So there's no way to replay a log for something like a single channel or document.
In Kafka the purpose of partitions is to provide computational parallelism, not to model entities in the world. So if you have 100m users you would map them onto a number of partitions based on your computational parallelism (maybe 10-100 machines/processes/threads). In other words, you would have a single topic partitioned by user id, not a topic per user.
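Concretely, that mapping is just a stable hash mod the partition count. A sketch (Kafka's default partitioner actually uses murmur2 on the message key; md5 here is only to keep it stdlib):

```python
import hashlib

NUM_PARTITIONS = 16  # sized for parallelism, not one per user

def partition_for(user_id: str) -> int:
    # Stable hash so all events for a user land in the same partition,
    # preserving per-user ordering.
    digest = hashlib.md5(user_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

assert partition_for("user-12345") == partition_for("user-12345")
assert 0 <= partition_for("user-12345") < NUM_PARTITIONS
```

The per-user ordering guarantee falls out of this: one user's events always go to one partition, and within a partition the log is totally ordered.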
If you have a centralized relational database, that maps reasonably well to a single-partition log (both in terms of scalability and guarantees).
For distributed databases you generally don't have a total order over all operations. What you usually have is (at best) a per-partition ordering, which also maps well to a partitioned log.
For applications that record events (logging or whatever) it is natural to think of each application thread or process as a kind of actor with a total order.
This is something that tends to surprise developers early on (myself included, years ago). But plenty of people still use queue solutions like RabbitMQ without thinking it all the way through.
Unfortunately, partitioning introduces a design step that makes it a little harder to make processing generic. With RabbitMQ you just post to an exchange and let queues (i.e., consumers) filter on the routing keys; if no queues have been bound, for example, messages don't go anywhere. If you want parallelism you run multiple consumers; if you don't, you run just one. With Kafka, you need to decide beforehand, and design the "topology" of your log carefully, not just for the producer, but for each consumer. When producers and consumers are different apps, this starts smelling like a violation of the principle of "separation of concerns".
I rather wish Kafka had a better routing mechanism, actually. I don't see any reason why it couldn't have routing keys, just like RabbitMQ.
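In the meantime, one workaround is to filter on the message key consumer-side. A sketch of RabbitMQ-style topic matching (`*` matches one word, `#` matches zero or more) applied after the fact; `topic_match` and the message tuples are invented for illustration, not any Kafka or RabbitMQ API:

```python
def topic_match(binding: str, routing: str) -> bool:
    """RabbitMQ-style topic match: '*' = one word, '#' = zero or more words."""
    b, r = binding.split("."), routing.split(".")

    def go(i, j):
        if i == len(b):
            return j == len(r)
        if b[i] == "#":  # '#' may absorb any number of remaining words
            return any(go(i + 1, k) for k in range(j, len(r) + 1))
        if j < len(r) and (b[i] == "*" or b[i] == r[j]):
            return go(i + 1, j + 1)
        return False

    return go(0, 0)

# A Kafka consumer could drop non-matching messages after consuming them:
messages = [("order.created.eu", b"..."), ("user.signup", b"...")]
wanted = [m for m in messages if topic_match("order.#", m[0])]
```

The cost, of course, is that every consumer still reads the whole partition and discards what it doesn't want, which is exactly the broker-side filtering the parent comment is missing.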
> if you want to build a new derived datastore, you can just start a new consumer at the beginning of the log, and churn through the history of the log, applying all the writes to your datastore.
The other cleanup policy is to just have a retention time. After X minutes/days/weeks segments of the log are simply deleted.
Is there a system designed for snapshotting the aggregate and logging the delta?
It does limit throughput for any given shard, though, and then you're left with a distributed transaction problem to solve when you need to commit changes to objects in different repos.
To throw around some terms for those interested in reading up/background:
- The separation of writes (through the log) from reads (through any of the consumers) is sometimes called CQRS (Command Query Responsibility Segregation).
- Having a centralized log as the defining store for updates/change events is sometimes called event sourcing.
- As mentioned in the article: Elasticsearch as a consumer of the log, which only gets updates through the log, is an example of an Eager Read Derivation.
All defined on Martin Fowler's site.
Glad to see this getting more attention.
I asked about using Kafka as an event source here some time ago. Includes an insightful answer from a Kafka author. http://stackoverflow.com/questions/17708489/using-kafka-as-a...
To clarify slightly: Event Sourcing isn't just emitting events for others; it means that your system re-reads its own log to derive current state from past events.
For performance reasons you try to avoid this with things like periodic "snapshots", but the capability has to be there.
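The state-from-events and snapshot-as-shortcut ideas fit in a few lines. A toy account balance, with the event shapes and snapshot format invented for illustration:

```python
def apply(balance, event):
    """Pure state transition: fold one event into the current state."""
    kind, amount = event
    return balance + amount if kind == "deposit" else balance - amount

events = [("deposit", 100), ("withdraw", 30), ("deposit", 5)]

# Current state is derived purely by re-reading the event log.
state = 0
for e in events:
    state = apply(state, e)
assert state == 75

# A periodic snapshot lets the next rebuild skip the prefix:
snapshot = {"offset": 2, "state": 70}  # state after the first two events
state2 = snapshot["state"]
for e in events[snapshot["offset"]:]:
    state2 = apply(state2, e)
assert state2 == state
```

The snapshot is just an optimization; because replaying from offset 0 and replaying from the snapshot give the same answer, the log remains the source of truth.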
"Hey, this is just CQRS by a different name!"
It works well, but it is very hard to get large teams of people to all do things this way. All it takes is for one guy to do a direct write, and his reviewer to miss it, and boom, inconsistency.
The Java library for the consumer part, still based on the Scala code, is not that great though. They're rewriting a Java-only library, which is much nicer to use, but I'm not sure when it'll be stable.
I had no idea they had such far-reaching implications and so many jobs where this is the right tool.
They look like whiteboard pictures but not quite.
If you look at some of the screenshots on the site, you might notice some similarities.
Firstly, we are not doing centralized blockchains; we're doing permissioned blockchains, which work most efficiently when actors within the system have been identified. These blockchains are simply one element of the Eris Blockchain Application platform we are developing.
Comparing a smart contract enabled blockchain to a distributed messaging broker is a difficult proposition as they are very much apples and oranges in their capabilities, usage, and goals.
That said, let me give it a go.
* Kafka and eris:db both work on a distributed model
* Kafka and eris:db both are inherently scalable and persist their data
* Kafka and eris:db both include a distributed messaging layer optimized for their system which is not http
* eris:db persists using merkle-ized encoding schemes, with all data having been digitally signed by a node using an ECC-PKI and verified by a logic gateway (known commonly as a smart contract) prior to being persisted -- eris:db further builds logs into blocks and chains those together using traditional blockchain techniques; kafka uses a relatively sophisticated, but not inherently as verifiable, "traditional" logging mechanism
* eris:db, as with any blockchain, includes a [fork choice rule](https://eng.erisindustries.com/blockchain/2015/04/30/on-bloc...) for resolving differences within the nodes on the network as to prior history as well as adding a layer of byzantine fault tolerance; from my (admittedly cursory) overview of kafka while it is distributed it is not byzantine fault tolerant
* kafka is fast and meant as a near real time message broker; eris:db is not built for speed and sacrifices some availability for consistency
Message brokers are great complements to blockchains actually as they can provide an additional speed layer for messages which blockchains are not particularly good at.