How to stream PostgreSQL CDC to Kafka and use Propel to get an instant API (propeldata.com)
57 points by acossta 72 days ago | 22 comments

All these tools built on Postgres CDC have tutorials that are based on a single table. But in real life, your data models are likely to be normalized across multiple tables with joins. Or you might have an aggregate object that contains multiple child objects from another table.

The hard part of using streaming data as the data source is that you have no mechanism to go back and get the data you missed, whether for a join across normalised tables or to fetch child objects you haven't seen yet.

I don't see how any reasonably complicated data model would work here.

One way of addressing this concern is (stateful) stream processing, for instance using Kafka Streams or Apache Flink: using a state store, you ingest the CDC streams for all the tables and incrementally update the joins between them whenever an event comes in on either input stream. I touched on this recently in this talk about streaming data contracts: https://speakerdeck.com/gunnarmorling/data-contracts-in-prac....
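To make the state-store idea concrete, here is a minimal in-memory sketch in Python (not Kafka Streams or Flink code). It assumes two hypothetical CDC streams, `orders` and `customers`, where each event carries a primary key and the row's new state (`None` for a delete). Plain dicts stand in for the state stores, and an event on either side re-emits every joined row it affects.

```python
from typing import Dict, List, Optional, Set, Tuple

Joined = Optional[dict]  # None means "no joined row (yet)" or a delete


class OrderCustomerJoin:
    """Incrementally maintains `orders JOIN customers` from two CDC streams."""

    def __init__(self) -> None:
        self.orders: Dict[int, dict] = {}      # state store: order_id -> latest row
        self.customers: Dict[int, dict] = {}   # state store: customer_id -> latest row
        self.orders_by_customer: Dict[int, Set[int]] = {}  # index for reverse lookups

    def _joined(self, order_id: int) -> Joined:
        order = self.orders.get(order_id)
        if order is None:
            return None
        customer = self.customers.get(order["customer_id"])
        if customer is None:
            return None  # inner join: wait until the customer row has been seen
        return {**order, "customer_name": customer["name"]}

    def on_order(self, order_id: int, after: Optional[dict]) -> List[Tuple[int, Joined]]:
        old = self.orders.pop(order_id, None)
        if old is not None:
            self.orders_by_customer[old["customer_id"]].discard(order_id)
        if after is not None:
            self.orders[order_id] = after
            self.orders_by_customer.setdefault(after["customer_id"], set()).add(order_id)
        return [(order_id, self._joined(order_id))]

    def on_customer(self, customer_id: int, after: Optional[dict]) -> List[Tuple[int, Joined]]:
        if after is None:
            self.customers.pop(customer_id, None)
        else:
            self.customers[customer_id] = after
        # One customer change can affect many joined rows: re-emit each of them.
        affected = sorted(self.orders_by_customer.get(customer_id, set()))
        return [(oid, self._joined(oid)) for oid in affected]
```

Note that nothing here is transaction-aware: an order can be joined against a customer version from a different source transaction, which is exactly the consistency concern raised further down the thread.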

An alternative is feeding only one CDC stream into the stream processor, typically the one for the root of your aggregate structure, and then re-selecting the entire aggregate by running a join query against the source database (I discussed this here: https://www.decodable.co/blog/taxonomy-of-data-change-events).
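A minimal sketch of that second approach, under stated assumptions: only the root table's CDC stream is consumed, and each event merely identifies which aggregate changed, triggering one join query that re-reads the whole thing. SQLite stands in for the source database, and the `orders`/`order_lines` schema and the `on_root_change` and `demo_connection` helpers are hypothetical.

```python
import sqlite3
from typing import Optional

AGGREGATE_SQL = """
SELECT o.id, o.customer, l.sku, l.qty
FROM orders o LEFT JOIN order_lines l ON l.order_id = o.id
WHERE o.id = ?
ORDER BY l.sku
"""


def on_root_change(conn: sqlite3.Connection, event: dict) -> Optional[dict]:
    """Handle a CDC event from the root table by re-selecting the whole aggregate.

    Child rows are read from the source database with a single join query
    rather than being reconstructed from their own CDC streams.
    """
    rows = conn.execute(AGGREGATE_SQL, (event["key"],)).fetchall()
    if not rows:
        return None  # root row is gone (deleted after the event was produced)
    order_id, customer = rows[0][0], rows[0][1]
    # LEFT JOIN yields one row with NULL child columns when there are no lines.
    lines = [{"sku": sku, "qty": qty} for _, _, sku, qty in rows if sku is not None]
    return {"id": order_id, "customer": customer, "lines": lines}


def demo_connection() -> sqlite3.Connection:
    """Hypothetical source schema: one order with two child lines."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE orders (id INTEGER PRIMARY KEY, customer TEXT);
        CREATE TABLE order_lines (order_id INTEGER, sku TEXT, qty INTEGER);
        INSERT INTO orders VALUES (1, 'ada');
        INSERT INTO order_lines VALUES (1, 'b', 2), (1, 'a', 1);
    """)
    return conn
```

Because the aggregate is read at query time rather than rebuilt from events, state size stays small, but intermediate states between two events can be missed.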

Both approaches have their pros and cons, e.g. in terms of required state size, (transactional) consistency, guarantees of capturing all intermediary states, etc.

Of course it's possible to solve. But I think the solutions (and the cons of those solutions) are often pretty arduous or unreasonable.

Take the example of joining multiple streams: you need all your data outside your database, so you have to stream every table you want data from. And worst of all, you don't have any transactional consistency across those joins; it's possible to join when you've only consumed half of the streaming data (e.g. one of two topics).

The point is, everything seems so simple in the example but these tools often don't scale (simply) to multiple tables in the source db.

This is exactly right! Most streaming solutions out there oversimplify real use cases where you have multiple upstream tables and need strongly consistent joins in the streamed data, such that transactional guarantees are propagated downstream. It's very hard to achieve this with classic CDC + Kafka style systems.

We provide these guarantees in the product I work on and one of our co-founders talks a bit about the benefits here: https://materialize.com/blog/operational-consistency/ .

It's often something that folks overlook when choosing a streaming system and then get bitten when they realize they can't easily join across the tables ingested from their upstream db and get correct results.

Debezium gives you all the information you need to establish those transactional guarantees (via the transaction metadata topic), so you can implement buffering logic that emits join results only once all events originating from the same transaction have been processed.
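A minimal sketch of such buffering logic. The event shapes are assumptions loosely modeled on Debezium's transaction metadata: data events carry a `transaction` block with an `id` and a `total_order`, and the transaction topic emits `BEGIN`/`END` markers where `END` includes an `event_count`. Check the Debezium documentation for your connector version before relying on the exact field names.

```python
from collections import defaultdict
from typing import Dict, List


class TransactionBuffer:
    """Holds data change events back until their whole source transaction has arrived."""

    def __init__(self) -> None:
        self.pending: Dict[str, List[dict]] = defaultdict(list)  # txn id -> buffered events
        self.expected: Dict[str, int] = {}  # txn id -> event_count from the END marker

    def on_data_event(self, event: dict) -> List[dict]:
        txn_id = event["transaction"]["id"]
        self.pending[txn_id].append(event)
        return self._maybe_flush(txn_id)

    def on_transaction_event(self, marker: dict) -> List[dict]:
        if marker["status"] != "END":
            return []  # BEGIN markers carry no event count, nothing to decide yet
        self.expected[marker["id"]] = marker["event_count"]
        return self._maybe_flush(marker["id"])

    def _maybe_flush(self, txn_id: str) -> List[dict]:
        # Markers and data events come from different topics, so either may
        # arrive first. Release the transaction only when its END marker has
        # been seen and all of its events are buffered.
        expected = self.expected.get(txn_id)
        if expected is None or len(self.pending[txn_id]) < expected:
            return []
        del self.expected[txn_id]
        events = self.pending.pop(txn_id, [])
        return sorted(events, key=lambda e: e["transaction"]["total_order"])
```

Feeding only the flushed batches into a join means the join never observes half of a source transaction.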

The key ingredient is leveled reads across topics.

A task which is transforming across multiple joined topics, or is materializing multiple topics to an external system, needs to read across those topics in a coordinated order so that _doesn't_ happen.

At minimum, this means using the wall-clock publication time of the events so that "A before B" relationships are preserved, and ideally using transaction metadata captured from the source system(s) so that transaction boundaries propagate through the data flow.
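One way to picture a leveled read, as a toy sketch: treat each topic as an iterator already ordered by publish time and always take the globally oldest next record. Python's `heapq.merge` does exactly that for finite inputs; the `(publish_time, topic, payload)` record shape and the `leveled_read` name are hypothetical.

```python
import heapq
from typing import Iterable, Iterator, Tuple

Record = Tuple[float, str, str]  # (publish_time, topic, payload), hypothetical shape


def leveled_read(*topics: Iterable[Record]) -> Iterator[Record]:
    """Interleave several topics so records come out in global publish-time order.

    Each input must already be ordered by publish time (true within one partition).
    heapq.merge only advances the topic holding the oldest pending record, so no
    topic runs ahead of the others. A live reader would additionally have to wait
    when a topic has nothing buffered yet, rather than treating it as finished;
    that waiting is not modeled here.
    """
    return heapq.merge(*topics, key=lambda record: record[0])
```

For example, merging `[(1.0, "orders", "o1"), (3.0, "orders", "o2")]` with `[(2.0, "customers", "c1")]` yields the payloads in the order `o1`, `c1`, `o2`.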

Essentially, you have to do a streaming data shuffle where you're holding back some records to let other records catch up. We've implemented this in our own platform [1] (I'm a co-founder).

You can also attenuate this further by deliberately holding back data from certain streams. This is handy in stream processing because you sometimes want to react when an event _doesn't_ happen. [2] We have an example that generates events when a bike-share bike hasn't moved from its last parked station in 48 hours, indicating it's probably broken [3]. It's accomplished by statefully joining a stream of rides with that same stream delayed by 48 hours.
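A batch-mode simulation of the delayed self-join idea, not Estuary's actual implementation: the ride stream is merged with a copy of itself shifted by 48 hours, and when the delayed copy of a ride arrives, an alert fires only if the bike has had no newer ride since. The ride tuple layout and the `stale_bikes` function are hypothetical, and times are plain hours.

```python
import heapq
from typing import Dict, List, Tuple

DELAY_HOURS = 48.0
Ride = Tuple[float, str, str]  # (ride_ended_at_hours, bike_id, station), hypothetical


def stale_bikes(rides: List[Ride]) -> List[Ride]:
    """Return (alert_time, bike_id, station) for bikes parked DELAY_HOURS without moving.

    `rides` must be sorted by time. The stream is joined with itself delayed by
    DELAY_HOURS: kind 0 = live ride, kind 1 = the same ride replayed later.
    """
    live = ((t, 0, bike, station, t) for t, bike, station in rides)
    delayed = ((t + DELAY_HOURS, 1, bike, station, t) for t, bike, station in rides)
    last_ride: Dict[str, float] = {}  # join state: latest ride time per bike
    alerts: List[Ride] = []
    # At equal timestamps the live event (kind 0) sorts first, so a ride at
    # exactly the 48h mark counts as movement.
    for at, kind, bike, station, ride_time in heapq.merge(live, delayed):
        if kind == 0:
            last_ride[bike] = ride_time
        elif last_ride.get(bike) == ride_time:
            # The delayed copy found no newer ride for this bike: it hasn't moved.
            alerts.append((at, bike, station))
    return alerts
```

In this batch form every delayed copy is processed even past the last real ride; a live system would only see it once 48 hours had actually elapsed.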

[1] https://www.estuary.dev
[2] https://docs.estuary.dev/concepts/derivations/#read-delay
[3] https://github.com/estuary/flow/blob/master/examples/citi-bi...

Yeah, the naive approach would be to join on key. Otherwise, you can join on a transaction id and key. However, it's definitely complicated to get right.

I have recently been burned by Kafka Streams specifically: it has been causing excessive rebalancing in the Kafka cluster due to its use of the incremental cooperative rebalancing protocol, and when Kafka Streams is used, the application can't switch to a different rebalancing protocol. The excessive rebalancing has resulted in severe processing slowdowns.

The problem has been resolved by falling back to the plain Kafka Consumer/Producer APIs and selecting the traditional eager protocol with a round-robin partition assignment strategy, which has reduced rebalancing to near zero.
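For reference, in the Java client this choice is made through the consumer's `partition.assignment.strategy` setting, e.g. `org.apache.kafka.clients.consumer.RoundRobinAssignor`, which uses the eager protocol. The Python sketch below only illustrates what round-robin assignment does; it is a simplification of the real assignor (it assumes every consumer subscribes to every topic), and `round_robin_assign` is a hypothetical helper.

```python
from itertools import cycle
from typing import Dict, List, Sequence, Tuple

TopicPartition = Tuple[str, int]


def round_robin_assign(
    consumers: Sequence[str], partitions: Sequence[TopicPartition]
) -> Dict[str, List[TopicPartition]]:
    """Deal sorted partitions out to sorted consumers one at a time, like cards.

    Under the eager protocol, every member gives up all of its partitions at the
    start of a rebalance and then receives a fresh assignment like this one.
    """
    members = sorted(consumers)
    if not members:
        return {}
    assignment: Dict[str, List[TopicPartition]] = {m: [] for m in members}
    next_member = cycle(members)
    for tp in sorted(partitions):
        assignment[next(next_member)].append(tp)
    return assignment
```

With consumers `c1` and `c2` and partitions 0 to 2 of one topic, `c1` gets partitions 0 and 2 and `c2` gets partition 1.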

I am starting to think that, for stateful stream processing, ksqlDB-based aggregation is the way to go for simple-to-medium complexity, while for complex scenarios it's better to aggregate the data in the source, or as close to the source as possible, rather than using Kafka Streams. In my case, the complication is that the source is not a database / data store but a bespoke in-house solution emitting CDC-style events that are ingested into the Kafka streaming app.

We built a streaming CDC pipeline in MSSQL and have aggregated joins into a flat table on the analytics database side. It's about 80 lines per table, with half of that being a bulk load from the beginning of time in case we ever need to add new columns or change schemas. We built it as a command-line argument, though I'm sure others do something similar.

The 80 lines could be trimmed down if we made a base class abstraction, but it shipped, we don't touch it, and we've moved on to other features.
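A rough sketch of that per-table pattern, with SQLite standing in for the analytics database: CDC events are mirrored into replica tables, the flat table is recomputed with a join, and the "bulk load from the beginning of time" is just a snapshot replayed through the same path. Table names, columns, and helpers are hypothetical, and the full refresh is the simplest possible choice, not necessarily what the commenter's 80 lines do.

```python
import sqlite3
from typing import Iterable, Optional, Tuple

SCHEMA = """
CREATE TABLE orders      (id INTEGER PRIMARY KEY, customer_id INTEGER, amount REAL);
CREATE TABLE customers   (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE flat_orders (order_id INTEGER PRIMARY KEY, customer_name TEXT, amount REAL);
"""
REPLICATED = {"orders", "customers"}  # allow-list: table names are interpolated into SQL


def apply_change(db: sqlite3.Connection, table: str, key: int, row: Optional[tuple]) -> None:
    """Mirror one CDC event into the analytics-side replica (row=None means delete)."""
    if table not in REPLICATED:
        raise ValueError(f"unexpected table {table!r}")
    if row is None:
        db.execute(f"DELETE FROM {table} WHERE id = ?", (key,))
    else:
        marks = ", ".join("?" * len(row))
        db.execute(f"INSERT OR REPLACE INTO {table} VALUES ({marks})", row)


def refresh_flat(db: sqlite3.Connection) -> None:
    """Recompute the denormalized table by joining the replicas (full refresh)."""
    with db:  # commits the pending replica writes together with the refresh
        db.execute("DELETE FROM flat_orders")
        db.execute(
            "INSERT INTO flat_orders "
            "SELECT o.id, c.name, o.amount "
            "FROM orders o JOIN customers c ON c.id = o.customer_id"
        )


def bulk_load(db: sqlite3.Connection, snapshot: Iterable[Tuple[str, int, tuple]]) -> None:
    """Backfill from the beginning of time by replaying a snapshot through the live path."""
    for table, key, row in snapshot:
        apply_change(db, table, key, row)
    refresh_flat(db)
```

Reusing the live path for the backfill keeps the two code paths from drifting apart when columns are added.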

One way to address that is by picking an analytical DB that works well with joins without needing to denormalize data. StarRocks claims [1] to do better than ClickHouse in that regard.

[1] https://www.starrocks.io/blog/benchmark-test

Convex provides transactionally consistent subscriptions on arbitrary data joins/fetches out-of-the-box, simplifying complex data management challenges. Their recent blog post at https://stack.convex.dev/how-convex-works explains how this works under the hood.

Solving the problem of having a transactionally consistent subscription on arbitrary data joins / fetches is hard, but it's so nice to have. If you haven't checked it out yet, Convex provides this out of the box. A blog post on how it works just landed today: https://stack.convex.dev/how-convex-works

Please don't do this to yourself.

This is how a lot of the higher performance pg event streaming libs work. Supabase realtime[0] is a good example.

[0]: https://github.com/supabase/realtime

I'm trying to understand the benefit of tackling this level of functionality at the WAL layer instead of at the LISTEN/NOTIFY or TRIGGER ON INSERT/UPDATE/DELETE layers.

NOTIFY drops events on the floor if no session is currently LISTEN-ing (due to a consumer restart, say).

You could use a trigger to write events into a log table from which they're reliably consumed and then removed, but you've now significantly increased your DB's disk activity and overhead.
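To show what the trigger-based alternative looks like, here is a small SQLite sketch (Postgres would use a PL/pgSQL trigger function instead). Every write to the hypothetical `accounts` table also writes a row to a hypothetical `change_log` table, and the consumer deletes entries only after handling them: that is the extra write and delete traffic the comment refers to.

```python
import sqlite3
from typing import Callable, Optional, Tuple

SETUP = """
CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER);
CREATE TABLE change_log (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,  -- consumption order
    op TEXT, row_id INTEGER, balance INTEGER
);
CREATE TRIGGER accounts_ins AFTER INSERT ON accounts BEGIN
    INSERT INTO change_log (op, row_id, balance) VALUES ('insert', NEW.id, NEW.balance);
END;
CREATE TRIGGER accounts_upd AFTER UPDATE ON accounts BEGIN
    INSERT INTO change_log (op, row_id, balance) VALUES ('update', NEW.id, NEW.balance);
END;
CREATE TRIGGER accounts_del AFTER DELETE ON accounts BEGIN
    INSERT INTO change_log (op, row_id, balance) VALUES ('delete', OLD.id, NULL);
END;
"""

LogRow = Tuple[int, str, int, Optional[int]]  # (seq, op, row_id, balance)


def consume(db: sqlite3.Connection, handle: Callable[[LogRow], None], batch: int = 100) -> int:
    """Hand logged changes to `handle` in order, deleting them only afterwards.

    If `handle` raises, the `with db` block rolls back and nothing is deleted,
    so the batch is retried on the next call (at-least-once delivery).
    """
    with db:
        rows = db.execute(
            "SELECT seq, op, row_id, balance FROM change_log ORDER BY seq LIMIT ?",
            (batch,),
        ).fetchall()
        for row in rows:
            handle(row)
        if rows:
            db.execute("DELETE FROM change_log WHERE seq <= ?", (rows[-1][0],))
    return len(rows)
```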

Aside from performance (which is a _big_ reason), using CDC with the DB's WAL also lets you do capture without needing to muck about in your production DB's schema. Keep in mind that the teams producing data and the teams consuming data are often different, so this coordination is hard in a larger org!

You can listen to a WAL replication stream as a dedicated node without affecting performance of the original insert/update/deletes on the writer. That's not the case with either triggers or listen/notify.

Sounds complicated

Well-written post! However, I'd like to note that we commonly hear from users that Debezium for Postgres CDC is hard to set up and manage, and less flexible around data formats (only JSON and Avro). It requires at least a few months of effort to put into production.

We at PeerDB (https://github.com/PeerDB-io/peerdb) are trying to solve this problem. We already have a connector out for Azure Event Hubs: https://blog.peerdb.io/enterprise-grade-replication-from-pos... That post describes how we are building a more production/enterprise-ready solution for Postgres CDC. Connectors for Kafka, Redpanda, and PubSub are already merged to main and we plan to GA them soon!

Debezium supports all kinds of data formats. For instance, ProtoBuf is often used these days, but it's easily customizable/configurable to have other formats, too.

> "At least a few months"

Can you back this up? This doesn't match my experience from working with users in the Debezium community at all. Don't get me wrong, Debezium certainly has a learning curve, but "a few months" is nowhere near realistic.

I think you're doing interesting stuff at PeerDB, but it would be nice if you could do without this kind of unfounded anti-Debezium FUD.

Thanks for the feedback here! My apologies if this came across as FUD. Let me clarify: Debezium is proven. Many large companies use Debezium for production/enterprise-grade CDC. So it is indeed a great piece of software!!

However, the higher capex and opex of putting Debezium into production is one of the common problems we've heard about from Postgres users.

This is indeed related to the learning curve. One issue we've heard about is the emphasis on the command-line interface (vs. a UI), which provides many configuration options but makes it complex for a first-time (average) user to work with. There is Debezium UI, but it is not the default recommendation (and seems to still be in incubation). At PeerDB, we are trying to address this with a simple (yet advanced) UI and a Postgres-compatible SQL layer (for more complex pipelines), which we believe is more intuitive than bash scripts.

Disclaimer: PeerDB is still in its early stages, and we have yet to support the full range of configuration options that Debezium offers. We may encounter the same learning-curve challenges as Debezium. However, we plan to keep usability as a top priority while building the software. So far, with a few production customers, the direction we are taking seems positive.

Regarding formats, we have a few users who wanted FlatBuffers/MsgPack (binary JSON) formats, and it wasn't trivial to set up with Debezium. These users hadn't worked with Debezium before, but after a few days of effort they felt it wasn't very easy.

Thanks again for the feedback here! And apologies if my comment came across negative. Thanks for probing me to clarify what I meant. :)

