Hacker News new | comments | ask | show | jobs | submit login
KSQL: Open Source Streaming SQL for Apache Kafka (confluent.io)
278 points by uptown on Aug 28, 2017 | hide | past | web | favorite | 81 comments

I've had the question for a while so I'll ask it here, maybe someone can help me.

Suppose you modeled your domain with events and your stack is build on top of it. As stuff happens in your application, events are generated and appended to the stream. The stream is consumed by any number of consumers and awesome stuff is produced with it. The stream is persisted and you have all events starting from day 1.

Over time, things have changed and you have evolved your events to include some fields and deprecate others. You could do this without any downtime whatsoever by changing your events in a way that is backward compatible way.

What is the good approach to what I'd call a `replay`?

When you want to replay all events, the version of your apps that will consume the events may not know about the fields that were in the event for day one.

As always in these types of the scenarios, the answer is: it depends. It depends on the amount of data you have. It depends upon how big the diversion from the original schema is. Etcetera.

My personal philosophy is to always leave event data at rest alone: data is immutable, you don't convert it, and you treat it like a historical artifact. You version each event, but never convert it into a new version in the actual event store. Any version upgrades that should be applied are done when the event is read; this requires automated procedures to convery any event version N to another version N + 1, but having these kind of procedures in place is good practice anyway. Some might argue that doing this every time an event is read is a waste of CPU cycles, but in my experience this far outweights possible downsides of losing the actual event stored at that time in the past, and this type of data is accessed far less frequently than new event data.

I suppose you can always trade those CPU cycles off against storage and cache the N+1 version (in a separate Kafka topic or elsewhere), so now reading the latest-version data is fast, yet you still retain the original data intact, at the expense of more storage. This does complicate the storage though, as you now have multiple days a sources, but nothing that can't be solved.

Bingo. If you equate this technique to a DB migration, you could have "up" and "down" directions for the translations from version N <=> N+1.

Then if you have 90% confidence you'll only ever need to replay the upgraded stream, you can upgrade it and destroy the previous version.

If at some point (the remaining 10%) you need to rescue the old stream, you can run the "down" direction and rehydrate the old version of the stream.

It sounds good in theory. In practice, I haven't heard much around running backwards migrations on a data warehouse / massive collection of events but I'm sure some out there already do it.

I suppose one needs to take care that migrations are never lossy so that the full information for upgrading or downgrading a version is available.

Yeah, that's the challenge. For instance, how do you handle when a column was one data type but then down the road was changed to another type when the two aren't cross compatible or could potentially break?

You could retain this info in a meta field of flexible type. For a DB, it could a JSON type. For messages, it could be an extra _meta field on the message that the systems themselves ignore.

> this requires automated procedures to convery any event version N to another version N + 1, but having these kind of procedures in place is good practice anyway

Note that this "good practice" already has a name, it is usually called "migrations".

Migrations may be as simple as SQL Update / ALTER TABLE statements. But they may also be transformation of JSON/XML/... structures, or any other complex calculation.

It may not be the best term, as "migrations" usually imply that the result is written back to the data store. But apart from that, I don't see any problem using this term here.

Ok this makes sense.

It matches what some others have been saying as well. Thanks

There's a whole world out there about this kind of stuff. Take a look at CQRS and some of the posts by Greg Young; they're highly informative and one of the first people to really capture this way of dealing with data properly.

I wonder what happened to Greg Young. I think he had a book in the pipeline (which I was looking forward to), but as far as I can glean from social media, some burnout related stuff happened.

Here's a pretty good, even if a bit too verbose, explanation of various issues and solutions related to event versioning: https://leanpub.com/esversioning/read.

The text is written by Greg Young - the lead on the EventStore [1] project.

[1] https://geteventstore.com/

thank you

I encountered that problem. The ad hoc fix, was to have a version field in each event and functions that translate the old event into new event(s). The code that processes the events only processes events of the current version. If your old events had been denormalized this might result into repetition of events when splitted.

To add to that, you can treat it like you would schema migration on databases: implement v1 to v2, v2 to v3, etc... and replay the migrations in order to migrate from whatever version of the event is to the latest version. This allows keeping event migration code as immutable as the event versions it migrates between.

Ok, thanks for the pointer

I've often wondered the same.

In data warehousing, particularly the Kimball methodology, if descriptive attributes are missing from dimensions, for example, it is common to represent them using some standard value, like, "Not Applicable" or "Unknown" for string values. For integers, one might use -1 or similar. For dates it might be a specially chosen token date which means "Unknown Date" or "Missing Date".

It doesn't solve the problem of truly unknown missing information, but it at least gives a standard practice for labeling it consistently. Think of trying to do analytics on something where the value is unknown?? Not too easy, but at least it is all in one bucket.

Certainly, if past values can be derived, even if they were not created at the time the data was originally created, that is one way of "deriving" the past when it was missing. But, otherwise, I don't think there is any other way to make up for the lack of data/information.

The best way I've seen it done, is to version all of your schemas and have a database that signals all the transformations needed to be done for any given schema version. That way, when your reading a particular event, you can query for all the operations needed to be done on such event and perform them.

What you are referring to is called a "temporal database". Your specific example is called a "bitemporal database".



If the fields are changing then you effectively have DDL and migrations in your code already... so decouple them and version the schema officially. Then record these schema changes as events in the same event stream.

Build a view on against these schema change events as a table of schema version by timestamp to allow for parsing any arbitrary event.

Don't persist the stream. The problem gets a lot easier if you stop thinking of a message bus as a data store.

How do you do a replay if you don't keep the event somewhere? I did not mean that messages were to be stored in the bus?

Recently asked on the Kafka users mailing list https://lists.apache.org/thread.html/82692004eb2292e1240c339...

Thanks for sharing, it makes me realize that our messages are not independent.

I highly recommend you look into gRPC.

Building apps using event sourcing, CQRS and microservices can easily become hell if the data models are not thought through.

I thought RPC and CQRS are diametrically opposite patterns. (Although you can use RPC in a CQRS context, but only as a transport/encapsulation layer (so the response says "request queued" or "error", but does not divulge domain specific information ("item created", "item not created")).

I'm also wondering: how one deal with changes to events when using KSQL?

Mind clarifying what you mean by changes to events?

If you create a STREAM or TABLE using KSQL, it makes sure that they are kept updated with every single event that arrives on the source Kafka topics.

That's what you'd expect in a true event-at-a-time Streaming SQL engine, which is what KSQL is.

Suppose you build a STREAM or TABLE from a topic and assume a field in the event is `id`. Later on, you introduce an update to this event where where your replace `id` by `user_id`, how is KSQL reacting?

Apache Flink is also a good alternative, and works very well. We have used it in production for a while for generating live reports. I made simple example [1] and have a look at the docs if you are more interested [2]. Gonna definetely try Kafka's version, its version of stream processing [3] also interesting as well.

[1] https://medium.com/@mustafaakin/flink-streaming-sql-example-...

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/...

[3] http://docs.confluent.io/current/streams/index.html

I'm one of the authors of Kafka. I've outlined some differences between Flink's support for streaming SQL and KSQL in this Twitter thread - https://twitter.com/juliusvolz/status/902283513382051840

Here's a summary: - KSQL has a completely Interactive SQL interface, so you don't have to switch between DSL code and SQL.

- KSQL upports local, distributed and embedded modes. Is tightly integrated with Kafka's Streams API and Kafka itself; doesn't reinvent the wheel. So is simple to use and deploy.

- KSQL doesn't have external dependencies, for orchestration, deployment etc.

- KSQL has native support for Kafka's exactly once processing semantics, supports and stream-table joins.

Disclaimer: I am one of the Flink committers.

While Flink has in fact no direct SQL entry point right now (and many users simply wrap the API entry points themselves to form a SQL entry point), the other statements are actually not quite right.

  - Flink as a whole (and SQL sits just on the DataStream API) works local, distributed and embedded as well.

  - Flink does not have any external dependencies, not even Kafka/ZooKeeper; it is self-contained. One can even just receive a data stream via a socket if that works for the use case.

  - Flink itself has always had exactly-once semantics, and works also exactly-once with Kafka.

@neha - where do you think kafka is going to evolve in the world of data processing.

I'm very bullish on kafka. Today we have Spark for batch data computation and have already switched some of our streaming stuff to Kafka.

Do you see yourselves entering into the batch processing space anytime ? Google has officially said that Flink is "compelling" because of its compatibility with the Beam model.

If I can step on thin ice... is it easier for Flink to commandeer Kafka or for Kafka to win over batch processing ?

What do you mean by "batch processing" Personally i find that term to be confusing.

I believe if Kafka can do streaming then it effectively can do batch as batch is a subset of streaming.

I really wish frameworks offering SQL would upfront say what level of SQL compatibility they have, is it SQL 2011, is it Postgres etc...

Anyways, if anyone's wondering, here's the Github page. [1]

Also from FAQ [2]:

Is KSQL fully compliant to ANSI SQL?

KSQL is a dialect inspired by ANSI SQL. It has some differences because it is geared at processing streaming data. For example, ANSI SQL has no notion of “windowing” for use cases such as performing aggregations on data grouped into 5-minute windows, which is a commonly required functionality in the streaming world.

[1] https://github.com/confluentinc/ksql/blob/0.1.x/docs/syntax-...

[2] https://github.com/confluentinc/ksql/blob/0.1.x/docs/faq.md#...

I'm assuming that most "SQL" implementations in distributed systems don't actually follow the SQL standard. They just offer a SQL like syntax to solve common problems. I could be wrong, but that's the impression I get working with tools like Cassandra/Spark.

ANSI SQL should be extended to standardize streaming syntax. There are certainly enough vendors that have it now... for example, sqlstream.com. It's too bad that SQL:2011 didn't get these features since it had a lot of temporal-related extensions.

Flink's SQL implementation actually follows ANSI SQL - which I think is very important.

There is a way of interpreting streams and tables to make ANSI SQL meaningful in the presence of streams, which we follow [1].

The big advantage is (besides not having to learn another syntax and retaining compatibility with SQL tools and dashboards) that this seamlessly handles the batch (bounded/static input) and streaming (unbounded/continuous) use cases with literally the same SQL statement.

[1] http://flink.apache.org/news/2017/04/04/dynamic-tables.html

Confluent doesn't understand the ANSI SQL standard. Support for aggregating over windows with standard syntax goes back to SQL:99, so it's been around for 18 years. Any company playing in the SQL space should make a point of understanding the standard.

There is a good explanation of ANSI SQL standard windowing at http://sqlstream.com/docs/conc_applicationdesign.html?zoom_h.... The document describes both tumbling and rolling windows as well as how they are achieved with standard syntax.

SQLstream could not agree with you more that an up-front explanation of SQL compatibility level is necessary to evaluate a product. SQLstream Blaze is SQL:2011 compliant.

Combine this with Debezium[1] and you get real-time SQL queries on your MySQL/PostgreSQL/MongoDB database!

RethinkDB, as far as I understand, does the same thing - their changefeed mechanism would consume the DB log and run the queries against the log.

[1]: http://debezium.io

Yes, exactly. We plan to add support for various Kafka connectors to KSQL so you can ingest/export streams from external systems, like database, and then do stream processing all using KSQL queries.

The purpose is to bring Streaming ETL and Stream Processing together to make the user's life easy.

That name is already taken by the San Carlos airport near Oracle: https://en.m.wikipedia.org/wiki/San_Carlos_Airport_(Californ...

Isn't it great that the name is entirely coincidental? Love that anecdote.

At Landoop we submitted our proposal to present our KSQL at the Kafka summit but we were rejected. Ah, if we only knew... :)

An independent program committee makes these calls and had to pick from 166 submissions. The program committee picks talks that are insightful and technically challenging.

I think a bit of clarification is required. KCQL, kafka connect query language was jointly developed with Landoop when Confluent commissioned DataMountaineer to write the JDBC sink. The reason we added it was to simplify the configuration but it also enabled us to filter and support various options of the many data sources/sinks we have connectors for. Confluent removed the kcql from the sink and reverted to a flume style configuration we were trying to avoid. It's good to see Confluent plans to support their Ksql in Connect, following DataMountaineers lead. We hope this is optional. We can look at migrating if it supports all the options we need that KCQL provides.

Regarding Landoops proposal. This was not about KCQL but SQL on Streams, also named KSQL which is integrated with their new product Kafka Lenses. We'll look at Confluents SQL and see which one to go forward with, maybe both but we have happy customers using our version.

But congratulations on your KSQL very nice! We (Landoop and DM staff) have proved many ex colleagues in the Investment bank world wrong about Kafka and this cements our decisions to use it. Thanks.

Hmm, that's "interesting". Can you elaborate a bit on how yours and theirs differ, if any?

There is a difference of course. While we are still learning about their implementation I think the statements below are true

1. We don't support just stream. You can throw a SQL at a kafka topic as easy as SELECT * FROM `topic` [WHERE ]

2. We support selecting or filter on the record metadata: offset/timestamp/partition (Haven't seen something similar in Confluent KSQL)

3. We integrate with Schema Registry for Avro. We hope to support Hortonworks schema registry soon as well as protobuf.

4. We allow for injecting fields in the Kafka Key part. For example: SELECT _offset as `_key.offset`, field2 * field3 - abs(field4.field5) as total FROM `magic-topic`

5. Just quickly looking at the Confluent KSQL "abs" function i see it accepts Double only. It doublt that everything is converted to Double before it hits the method and then converted back. (too short of a time to understand the whole implementation).

6. Filters: is related to point 2. We allow filter on message metadata. For example: SELECT * FROM topicA WHERE (a.d.e + b) /c = 100 and _offset > 2000 and partition in (2,6,9)

Also not sure if they have customers yet using it. We do.

There's a pretty big difference to a point that Landoop's KCQL (Kafka Connect Query Language) and Confluent's KSQL (Streaming SQL for Apache Kafka) are two different products.

- KSQL is a full-fledged Streaming SQL engine for all kinds of stream processing operations from windowed aggregations, stream-table joins, sessionization and much more. So it does more powerful stream processing on Kafka than what Landoop's product supports which is simple projections and filters.

- KSQL can do that because it supports streams and tables as first-class constructs and tightly integrates with Kafka's Streams API and the Kafka log itself. We are not aware of any other products that do that today, including Landoop's tool.

- We will add support for Kafka connectors so you can stream data from different systems into Kafka through KSQL. This will cover what Landoop intended with KCQL (Kafka Connect Query Language.

- Confluent works with several very large enterprises and many of the companies that have adopted Kafka. We worked with those customers to learn what would solve real business problems and used that feedback to build KSQL. So it . models on real-world customer feedback.

- We'd love to hear feedback. Here's the repository https://github.com/confluentinc/ksql and here's the Slack Channel slackpass.io/confluentcommunity - #ksql

Hope that helps!

I love the continuous queries over append-only datasets and it's great that Kafka added support for it.

It looks like there is one main contributor (https://github.com/confluentinc/ksql/graphs/contributors) though, it seems that the other contributors either wrote the documentation or helped for the packaging. Not a great sign considering how big this project is (there are competitors which only do this such Pipelinedb), hopefully you can create a team just for KSQL.

To add to what nehanarkhede said: I'd rather see it as a testament to the powerful building blocks of Kafka and its Streams API that you can actually implement a project such as KSQL with very few engineers.

Also, the commits in KSQL reflect only parts of the work -- it doesn't include design discussions, code reviews, etc.

Lastly, keep in mind that the git repository was cleaned (think: squashed commits) prior to publication, see the very first commit in the repository's timeline. So you don't see the prior work/commits of other Confluent engineers that went into KSQL.

Agreed. We recently had a few more members added to the team and are looking to grow it even more. Also note that KSQL is built on Kafka Streams and we have a team that works just on that as well.

I understand the underlying architecture is fundamentally different, but the end result seems kind of akin to some of the "complex event processing" (CEP) tools out there like Esper (ie. feeding incoming data into established queries instead of executing queries against at-rest data). Would this fit similar sorts of use cases / fit into the CEP market?

Yes KSQL is ideal for use cases similar to those that CEP was initially targeted for and more - from real-time anomaly detection, monitoring, analytics to application development and Streaming ETL. As you alluded to, the big difference is that KSQL is designed as a distributed Streaming SQL engine that can run at Kafka scale.

If you are interested in performing ultra low latency (<1ms) CEP via SQL, take a close look at SQLstream Blaze (http://www.sqlstream.com) and it's full implementation of Allen's Interval Algebra (https://en.wikipedia.org/wiki/Allen%27s_interval_algebra) via SQL temporal predicates: http://sqlstream.com/docs/sqlrf_planned_feature_temporal_pre.... This stuff runs at 1,000,000 events per second per core and can scale out in conjunction with Kafka across any number of servers.

This is the Kafka answer to Pipelinedb. I love it.

PipelineDB is great..and the guys are super helpful..

influxdb has something similar, and now so does AWS Kinesis (with their Kinesis Analytics product).

PipelineDB will become a standard, open-source PostgreSQL extension this year, enabling anybody using PostgreSQL to leverage continuous SQL queries in combination with all other PostgreSQL functionality.


> PipelineDB will become a standard, open-source PostgreSQL extension

What does this imply?

Are InfluxDB's continuous queries really like this? It's so much less of a streaming solution I hadn't realized it did this.

Doesn't PipelinDB limit you to postgres' scaling capabilities?

Not for streaming analytic workloads, because PipelineDB fundamentally adds continuous queries to PostgreSQL, so data is continuously distilled and aggregated as it arrives, before it is stored, which drastically reduces the amount of data stored in PipelineDB (or soon, in PostgreSQL via the extension refactor).

PipelineDB also offers a clustering extension for large workloads (see: http://enterprise.pipelinedb.com/docs/)

But in terms of ad hoc, exploratory analytics workloads, yes - the scaling limitations would be the same, since for ad hoc, exploratory analytics PipelineDB and PostgreSQL are the same. But with that said, the processed, aggregated data that gets stored is generally much smaller than large volumes of granular data, so there is much less data to comb through with PipelineDB.

Thanks for explaining!

this is awesome! will it work on Amazon RDS for postgresql ?

Does anyone knows details about the KSQL-engine that computes the queries? According to their git, there can be multiple KSQL-engines in a Client-Server configuration. Is the workload for one query distributed? Is the SQL translated into a program using the Kafka streaming API?

Yes, queries are translated into Kafka Streams API. In the client-server(cluster) mode each query will run on every instance of engine the same way kafka streams apps run on multiple instances.

Fun fact: KSQL is the code of the airport closest to Oracle HQ (San Carlos airport).

Came here to post this... do you know if the code is meant to pay homage to Oracle, or were they just trying to make it sound like San QarLos?

Edit: Oh, the wiki page explains that it predates Oracle. So probably the latter.

The many streaming SQL options are cool - but how do you test streaming SQL as it comes to represent a larger part of your application?

this link is broken in the article [step through the demo yourself](https://github.com/confluentinc/ksql/blob/0.1.x/docs/demo.md...)

ksql was the sql dialect understood by Kx systems k2 and k3 versions. It includes nice things like foreign key chasing.

Now I'm wondering if something like Mondrian could help stream MDX queries over this...

This seems like a painfully slow and complex way to do event processing.

What would be faster and less complex ways? Can you think of any advantages that KSQL has over them?

Seems like a somewhat similar idea to AWS Aurora.

Hmm. I don't see that at all. Aurora is a way to query data in S3. This is a way to query Kafka streams in real time.

I think you mean Athena :P

This seems to be basically similar to Spark, which lets you perform full SQL queries on Kafka streams.

Spark SQL is different from KSQL in the following ways:

- Spark SQL is not an interactive Streaming SQL interface. To do stream processing, you have to switch between writing code using Java/Scala/Python and SQL statements. KSQL, on the other hand, is a completely interactive Streaming SQL engine. You can do sophisticated stream processing operations interactively using SQL statements alone.

- KSQL is a true event-at-a-time Streaming SQL engine. Spark SQL is micro-batch.

What you said about Spark was not true. One can totally interactively query a Kafka stream using Spark, in SQL.

Guidelines | FAQ | Support | API | Security | Lists | Bookmarklet | Legal | Apply to YC | Contact