Bottled Water: Stream real-time PostgreSQL change events to Kafka (confluent.io)
30 points by linkmotif on Apr 23, 2015 | 8 comments

Cool. Since logical decoding got in I'd hoped somebody would write something roughly like this... I also didn't know about decoderbufs.

> Fortunately, you don’t have to install the plugin on your leader database — you can just use a follower (replica, hot standby) for that purpose. That way you can be sure that the plugin cannot corrupt your data, crash the main database instance or affect its performance.

That bit unfortunately isn't true yet. While we've worked hard to make it relatively easy to add that capability, it isn't there yet.

I do wonder why you chose to have a separate client that pushes the data to Kafka, instead of doing that in a background worker. That way it'd integrate into the database's lifetime, i.e. it'd automatically get stopped/started when the database stops/starts, it'd automatically (if you want) get restarted if it crashes, etc.

Oh, sorry to hear that it won't work in a follower. I guess I should have tested that before boldly claiming it :p

"Bottled Water writes the initial snapshot to Kafka by turning every single row in the database into a message, keyed by primary key, and sending them all to the Kafka brokers. When the snapshot is done, every row that is inserted, updated or deleted similarly turns into a message."

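To make the quoted description concrete, here is a minimal Python sketch of turning rows into keyed messages. It is not Bottled Water's actual code: the function names are made up, JSON stands in for whatever serialization the real tool uses, and representing a delete as a key with a `None` value is an assumption borrowed from Kafka's log-compaction tombstone convention.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Optional, Sequence, Tuple

# (key, value) pair; a value of None marks a delete (assumed convention).
Message = Tuple[bytes, Optional[bytes]]


def message_key(table: str, pk_cols: Sequence[str], row: Dict[str, Any]) -> bytes:
    """Build a stable key from the table name and the row's primary key columns."""
    pk = {col: row[col] for col in pk_cols}
    return json.dumps({"table": table, "pk": pk}, sort_keys=True).encode()


def snapshot_messages(
    table: str, pk_cols: Sequence[str], rows: Iterable[Dict[str, Any]]
) -> Iterator[Message]:
    """Initial snapshot: one message per existing row."""
    for row in rows:
        yield message_key(table, pk_cols, row), json.dumps(row, sort_keys=True).encode()


def change_message(
    table: str, pk_cols: Sequence[str], op: str, row: Dict[str, Any]
) -> Message:
    """After the snapshot: one message per inserted, updated or deleted row."""
    key = message_key(table, pk_cols, row)
    if op == "delete":
        return key, None
    if op in ("insert", "update"):
        return key, json.dumps(row, sort_keys=True).encode()
    raise ValueError(f"unknown operation: {op}")
```

Because the key depends only on the table and primary key, a later update or delete of a row lands on the same key as its snapshot message.
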
Hm, another thing came to mind: what's the reason for the license choice? I guess you chose Apache because of Kafka? Maybe I'm too much of a Postgres nerd, but I always like PG extensions to be under the Postgres license ;)

(not that Apache 2 is that restrictive...)

Apache 2.0 is just my default license to use for stuff. Would there be any conflict with the PG license?

It's much simpler (being a slightly modified 3-clause BSD license), including not having a patent clause, which has its pros and cons...

Getting consistent change streams out of a database — for the purposes of indexing, for example — is something we've been thinking a lot about lately.

Our use case is microservices. Each of our microservices typically has its own data store (although some of them use a generic "document store" if they are simple enough, as do front ends), but we continually need to keep microservices in sync by exchanging data.

For example, we have a role-based permission system, and if a setting is modified (a member is added to a group, say), then a bunch of related systems need to copy the changes in order to optimize lookups that would otherwise swamp the central security microservice.

And indexing. We have a bunch of different specialized search engines that all the microservices need to feed into.

So far it's been very ad-hoc using RabbitMQ (which turns out to be a terrible idea), and we're rearchitecting. The best pattern we've found so far is that every microservice manages its own transaction log, in a schema that mirrors the exact schema used by APIs, and that it provides a standardized API for cursor-based change streaming.

We briefly considered a central transaction log microservice, or a queue, but then you get into problems with atomicity and consistency. Without something like 2-phase commit you can't guarantee that your local database write was also written on the remote end. The only surefire way, in the microservice world, is to use a local transaction and commit your change as well as the transaction log entry.
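
A minimal sketch of the pattern from the last two paragraphs, using Python's built-in `sqlite3` as a stand-in for a service's local store. The `posts` and `change_log` tables, the `save_post` and `changes_since` names, and the integer `seq` cursor are all hypothetical. The only point is that the data write and the log entry commit or roll back together, and that consumers page through the log by cursor.

```python
import json
import sqlite3
from typing import Any, Dict, List, Tuple


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Create the (hypothetical) data table and its change log side by side."""
    conn = sqlite3.connect(path)
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY,
            title TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS change_log (
            seq INTEGER PRIMARY KEY AUTOINCREMENT,
            object_id TEXT NOT NULL,
            payload TEXT NOT NULL
        );
        """
    )
    return conn


def save_post(conn: sqlite3.Connection, post_id: int, title: str) -> int:
    """Write the post and its log entry in one local transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute(
            "INSERT OR REPLACE INTO posts (id, title) VALUES (?, ?)",
            (post_id, title),
        )
        cur = conn.execute(
            "INSERT INTO change_log (object_id, payload) VALUES (?, ?)",
            (f"posts/{post_id}", json.dumps({"id": post_id, "title": title})),
        )
        return cur.lastrowid


def changes_since(
    conn: sqlite3.Connection, cursor: int, limit: int = 100
) -> Tuple[List[Dict[str, Any]], int]:
    """Return log entries after `cursor`, plus the cursor to pass next time."""
    rows = conn.execute(
        "SELECT seq, object_id, payload FROM change_log "
        "WHERE seq > ? ORDER BY seq LIMIT ?",
        (cursor, limit),
    ).fetchall()
    entries = [
        {"seq": seq, "object_id": oid, "payload": json.loads(payload)}
        for seq, oid, payload in rows
    ]
    next_cursor = rows[-1][0] if rows else cursor
    return entries, next_cursor
```

A consumer would start with `changes_since(conn, 0)` and keep feeding the returned cursor back in; a real service would expose the same thing over its API rather than a shared connection.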

Logical decoding is very nifty, and I've been considering it. The challenge here is that the PostgreSQL change stream is relational. In order to untangle it, you need the application (the same one producing the data) to listen on the queue and denormalize the stream into something that can be consumed by other apps.

I'm not sure I see that working. We keep our database schemas strictly normalized, and untangling a change stream is going to require a bunch of queries to figure out the current versions of tuples before they can be serialized into something like JSON for shipping to an indexer or to another app. Even if you could do that, you run into a problem when a receiver needs more data than is actually in the changed object, which requires a roundtrip back to whatever authority has the data. For example, a user's photo changed, so we need to dig out the user, too.

Rather than trying to figure out the relational dependencies and shipping complete updates around, I suspect the only reasonable thing you can do is note the ID of things that change, and ask whoever is listening to fetch the whole object from its owning microservice. You could use logical decoding for this, because mapping tables and primary keys to unique object identifiers is trivial. And you could have a caching layer to ensure that fetches of recently modified objects are cheap.
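
Roughly, in Python, that idea could look like the sketch below. Everything here is hypothetical: the `table/pk` identifier format, the `ChangeCache` class, and the `fetch` callable standing in for a call to the owning microservice. A change notification only carries the ID and evicts any stale copy; the first listener to ask pays for the fetch and the rest hit the cache.

```python
from typing import Any, Callable, Dict, Tuple


def object_id(table: str, pk: Tuple[Any, ...]) -> str:
    """Map a table name and primary key values to one object identifier."""
    return f"{table}/" + "/".join(str(part) for part in pk)


class ChangeCache:
    """Caches whole objects, keyed by object identifier."""

    def __init__(self, fetch: Callable[[str], Dict[str, Any]]) -> None:
        self._fetch = fetch  # assumed to call the owning microservice
        self._cache: Dict[str, Dict[str, Any]] = {}

    def on_change(self, table: str, pk: Tuple[Any, ...]) -> str:
        """Handle a change event: drop the stale copy and return the ID to publish."""
        oid = object_id(table, pk)
        self._cache.pop(oid, None)
        return oid

    def get(self, oid: str) -> Dict[str, Any]:
        """Return the current object, fetching it at most once per change."""
        if oid not in self._cache:
            self._cache[oid] = self._fetch(oid)
        return self._cache[oid]
```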

A final problem we're facing is just knowing when microservices have been fully updated as a result of a single update in one app. For example, you have something like: User saves blog post, blog post microservice writes it to CMS microservice, CMS microservice emits change event, change event handling causes reindex of ElasticSearch. Now the user reloads their blog: how can we ensure that the user sees what they expect? The only foolproof way is to use some kind of "watermark" (time-based or a git-style commit hash), and cascade it throughout the system, so that the UI gets some kind of identifier that can be used to query the system to ask if it's reached that watermark yet. It's tricky.
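
A toy Python sketch of the watermark idea, under simplifying assumptions: the watermark is a plain increasing integer (rather than a timestamp or commit hash), each stage applies changes in order, and `WatermarkIssuer`, `Stage` and `lagging` are made-up names. The write returns a watermark, every downstream stage records the highest one it has applied, and the UI asks which stages are still behind.

```python
import itertools
from typing import Iterable, List


class WatermarkIssuer:
    """Hands out increasing watermarks, one per user-visible write."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)

    def issue(self) -> int:
        return next(self._counter)


class Stage:
    """One hop in the pipeline, e.g. the CMS or the search indexer."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.applied = 0  # highest watermark applied so far

    def apply(self, watermark: int) -> None:
        # Assumes in-order processing, so a higher watermark covers lower ones.
        self.applied = max(self.applied, watermark)

    def has_reached(self, watermark: int) -> bool:
        return self.applied >= watermark


def lagging(stages: Iterable[Stage], watermark: int) -> List[str]:
    """Names of the stages that have not yet applied the given watermark."""
    return [s.name for s in stages if not s.has_reached(watermark)]
```

The UI would hold on to the watermark from its own write and only trust a read once `lagging(...)` comes back empty for the stages that read depends on.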

Anyone else done something like this?

I think whether reassembling coarser-grained changes is realistic depends quite heavily on the schema and the existing usage. There are plenty of schemas, even pretty normalized ones, where that's pretty simple for the frequently changing objects. In others it's pretty complicated and an approach like you describe, where the stream is just used to trigger a request to retrieve a resource, is more realistic. Especially if a somewhat efficient API to get data from the database already exists.

In other cases, infrastructure for transforming data from normalized to denormalized form already exists, e.g. as part of the process of transforming the OLTP data into a data warehouse. In those cases it's not necessarily hard to add different transformations.
