Show HN: Riko – A Python stream processing engine modeled after Yahoo! Pipes (github.com/nerevu)
283 points by reubano on July 21, 2016 | 67 comments



`riko` is a pure Python stream processing library for analyzing and processing streams of structured data. It's modeled after Yahoo! Pipes [1] and was originally a fork of pipe2py [2]. It has both synchronous and asynchronous (via Twisted) APIs, and supports parallel execution (via multiprocessing).

Out of the box, `riko` can read csv/xml/json/html files; create text- and data-based flows via modular pipes; parse and extract RSS/Atom feeds; and do a bunch of other neat things. You can think of `riko` as a poor man's Spark/Storm... stream processing made easy!

Feedback welcome so let me know what you think!

Resources: FAQ [3], cookbook [4], and ipython notebook [5]

Quickie Demo:

    >>> from riko.modules import fetch
    >>> 
    >>> stream = fetch.pipe(conf={'url': 'https://news.ycombinator.com/rss'})
    >>> item = next(stream)
    >>> item['title'], item['link']
    ('Master Plan, Part Deux', 'https://www.tesla.com/blog/master-plan-part-deux')
[1] https://web.archive.org/web/20150930021241/http://pipes.yaho...

[2] https://github.com/ggaughan/pipe2py/

[3] https://github.com/nerevu/riko/blob/master/docs/FAQ.rst

[4] https://github.com/nerevu/riko/blob/master/docs/COOKBOOK.rst

[5] http://nbviewer.jupyter.org/github/nerevu/riko/blob/master/e...


Nice project. I wrote something similar in C# a long time ago [1], mostly to monitor job feeds and craigslist [2] :-) It supports RSS and Atom, async, various filters, deduplication, etc.

Yahoo Pipes was a nice project, but as its popularity grew, it started getting blocked more and more. It was also hard to build and maintain pipelines with more than a few steps.

[1] https://github.com/olviko/RssPercolator

[2] https://github.com/olviko/RssPercolator/blob/master/RssPerco...


Cool! I actually starred your project last year but never really got around to looking under the hood. How do you handle the "multiple destinations" part? In Python you can do it with a coroutine [1, 2] implementation (push based). I avoided that, since coroutines have their own form of callback hell, and decided to implement a generator API (pull based) [3, 4].

But since generators can only be "pulled" into one destination, you have to copy a stream (subsequently converting it into a list) if you want more than one destination [5]; there's a sketch after the links below. This works fine if the data fits in memory, but if it doesn't then you're out of luck!

[1] http://www.dabeaz.com/coroutines/copipe.py

[2] http://www.dabeaz.com/coroutines/

[3] http://www.dabeaz.com/generators/retuple.py

[4] http://www.dabeaz.com/generators

[5] https://github.com/nerevu/riko/blob/master/riko/modules/spli...
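
Here's roughly what I mean, as a plain-generator sketch (not riko's actual code): one destination pulls lazily, but a second destination requires copying the stream.

    from itertools import tee

    def source():
        # a lazy source: items are only produced when a consumer pulls
        for i in range(5):
            yield {'title': 'item %d' % i}

    def add_exclaim(stream):
        # a pull-based "pipe": transforms one item per pull
        for item in stream:
            item['title'] += '!'
            yield item

    # one destination pulls lazily, item by item
    stream = add_exclaim(source())
    print(next(stream)['title'])  # item 0!

    # two destinations require copying the stream; tee buffers every
    # item one consumer has seen but the other hasn't, so a large lag
    # between consumers pulls the data into memory anyway
    left, right = tee(add_exclaim(source()))
    print([item['title'] for item in left])
    print([item['title'] for item in right])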


Can't really remember, sorry; that code was built for processing RSS feeds and data size was never an issue. I will take a look when I get some free time…

But I see what you mean. I had to deal with similar issues in commercial projects, and the "pull" model (generators in Python ~ "yield return" in C#) is almost never a good idea, especially when you have to have concurrent consumers. While callbacks are hard to combine, in C# they can be nicely abstracted with "async/await". Not sure how it is handled in Python; I stopped using Python around version 2.5.


Python 3.5 introduced the async/await syntax. I don't do C#, but at a glance it's the same.

I've been working on a similar project and I've also found the push model easier.
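
For reference, the 3.5+ syntax looks like this (a toy example; the fetch is a stand-in, not a real HTTP call, and the URLs are just placeholders):

    import asyncio

    async def fetch_title(url):
        # stand-in for a real non-blocking HTTP fetch
        await asyncio.sleep(0.1)
        return 'title of ' + url

    async def main():
        # like C#'s Task.WhenAll: run both fetches concurrently
        titles = await asyncio.gather(
            fetch_title('https://example.com/a'),
            fetch_title('https://example.com/b'))
        print(titles)

    asyncio.get_event_loop().run_until_complete(main())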


Care to share your code? I'd like to see a proper push implementation.


Another benefit of the push model is that back pressure becomes much easier to implement.
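
For example, with a bounded queue the producer is suspended automatically whenever the consumer falls behind. A hypothetical asyncio sketch, not from any particular library:

    import asyncio

    async def producer(queue):
        for i in range(10):
            # suspends here whenever the queue is full: back pressure
            await queue.put(i)

    async def consumer(queue):
        while True:
            item = await queue.get()
            await asyncio.sleep(0.1)  # simulate slow downstream work
            print('processed', item)
            queue.task_done()

    async def main():
        queue = asyncio.Queue(maxsize=2)  # the back pressure knob
        task = asyncio.ensure_future(consumer(queue))
        await producer(queue)
        await queue.join()  # wait until everything is processed
        task.cancel()

    asyncio.get_event_loop().run_until_complete(main())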


Any code to share? I've yet to see how to properly do push.


Maybe I'm not seeing something, but wouldn't this be simpler with http://reactivex.io/ (Rx.NET specifically)? Just implement IObservable that pushes RSS events, and then use Rx Filter/Select/Merge/GroupBy to filter/join/synchronize/whatever.


Maybe. Don't think any of this existed when I needed it...

1. The code is tiny, with 90% of it dealing with RSS parsing and filtering. Using Rx.NET wouldn't really simplify anything.

2. I wanted a library that I could integrate into my apps and run locally, to avoid the throttling, robots.txt, and other BS Yahoo Pipes was suffering from.

I am also not a huge fan of Rx... to put it mildly.


I think I've used Rx way back in 2009 (there was no TPL in Silverlight, so you had to use callbacks and events for continuations, which was insane; hence Rx), so it's been around.

Mind sharing why you don't like it?

I personally like how it allows me to express complex high-level operations cleanly. For example: I have an observable configuration variable that can come from different sources, and the active source changes dynamically. I need to listen to the latest source until a new one becomes active. In Rx I only need to push the new source through IObservable<IObservable<ConfigurationValue>> and then use http://reactivex.io/documentation/operators/switch.html, which returns an IObservable<ConfigurationValue> that pushes values from the latest source. Rx will handle unsubscribing from the previous active source, synchronizing state, and making sure everything is thread safe. And there are a bunch of operators like this that would be tedious and hard to implement correctly, with all the edge cases, in a thread-safe way; here they are abstracted into high-level operators.
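
For what it's worth, here's a rough Python translation of that pattern, assuming the RxPY 3.x API (switch_latest plays the role of Switch):

    from rx import operators as ops
    from rx.subject import Subject

    # an observable of observables: each on_next is a new config source
    sources = Subject()

    # switch_latest subscribes to the newest inner source and
    # unsubscribes from the previous one automatically
    config = sources.pipe(ops.switch_latest())
    config.subscribe(lambda value: print('active config:', value))

    source_a, source_b = Subject(), Subject()

    sources.on_next(source_a)
    source_a.on_next('a1')     # printed: source_a is active
    sources.on_next(source_b)  # switch: source_a is dropped
    source_a.on_next('a2')     # ignored
    source_b.on_next('b1')     # printed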


This is an implementation of "Flow Based Programming", right? It's a programming paradigm invented before its time, IMHO; perfect for a world of streaming data.


> This is an implementation of "Flow Based Programming", right?

Pretty much... just without a GUI. My inspiration was method chaining [1] and the first implementation was this [2]. There's a toy sketch of the chaining style after the links.

[1] http://martinfowler.com/articles/collection-pipeline/

[2] http://stackoverflow.com/questions/12172934/method-chaining-...
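
A toy illustration of that method-chaining style (not riko's API): each method wraps a lazy generator and returns a new object, so the steps read left to right like a pipeline.

    class Pipeline:
        def __init__(self, items):
            self.items = iter(items)

        def filter(self, pred):
            return Pipeline(i for i in self.items if pred(i))

        def map(self, func):
            return Pipeline(func(i) for i in self.items)

        def list(self):
            # the terminal step: nothing runs until here
            return list(self.items)

    result = (Pipeline(range(10))
              .filter(lambda x: x % 2 == 0)
              .map(lambda x: x * x)
              .list())
    print(result)  # [0, 4, 16, 36, 64]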


I am currently working on an alternative approach based on a column-oriented paradigm for in-stream analytics, as opposed to flow-orientation. It is an adaptation of DataCommandr (http://conceptoriented.com), which is a column-oriented approach to data processing. Unfortunately, StreamCommandr is not yet available, but it would be interesting to compare the advantages and drawbacks of these two paradigms.


I didn't see any code examples so it's a bit difficult to figure out what's going on. Are there any open source column-oriented programming implementations?


Actually, it is a novel approach, so I am not aware of anything similar. Initially, I implemented it for data wrangling (various potentially complex transformations: https://en.wikipedia.org/wiki/Data_wrangling) and it can be found here: https://bitbucket.org/conceptoriented/dce-java. The main point is that instead of defining a flow (graph) of tables, we define a flow (graph) of columns. It can be used where Apache Spark is used.

StreamCommandr essentially relies on the same principles and changes only the external API and some internal processing. The idea is that any data table is like a stream of records, so we add new records and delete outdated records. Simultaneously, we evaluate other columns and tables by performing potentially complex computations which are difficult to do in a record flow.


Not sure flow based programming necessitates a GUI any more than object oriented, functional or procedural programming does...


> This is an implementation of "Flow Based Programming", right?

A more modest implementation would be Unix pipes, I think, where the data flows untyped.


I was a heavy user of pipes and I'm now a heavy user of python. I have built my own dodgy simple replacement for some of the things I used to rely on pipes for. I'm very eager to see what you've got here, at first glance it seems like an excellent fit for my needs.

Thanks!


Please let me know what you think. I worked pretty hard on the readme so let me know if anything is confusing and/or doesn't make sense.


This readme could be a guide for many projects!


Can you consider Dask integration? http://distributed.readthedocs.io/en/latest/queues.html https://github.com/dask/dask

It can handle parallel and distributed parts for you.



I just read about Dask earlier today; very neat project! riko already handles parallel processing [1], but adding distributed processing sounds tempting. TBH though, distribution isn't high on the priority list. But I'll be happy to accept a PR if you're so inclined :)

[1] https://github.com/nerevu/riko#parallel-processing


If you're looking for a stream processing engine closer to Storm, etc. but also simple, check out Motorway: https://github.com/plecto/motorway :-)


Interesting project. I hadn't come across this one yet. One difference is that riko is based around functions, whereas this library (and practically every stream processing lib I've come across) is based around classes.

I personally like the functional approach much better. And if you compare the word count examples on the respective readmes [1, 2], you will see riko is much more succinct (there's a sketch of the style after the links). But I suppose the verbosity of the other libraries comes with benefits, like scaling across a cluster of servers.

[1] https://github.com/plecto/motorway#word-count-example

[2] https://github.com/nerevu/riko#word-count
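
To show the functional shape I mean, here's a plain-Python sketch (not riko's actual code; the fetch is a stand-in for a real page fetch):

    from collections import Counter
    from itertools import chain

    def fetch_lines(url):
        # stand-in for a real fetch; riko's example pulls a live page
        yield 'Hear the sledges with the bells'
        yield 'Silver bells'

    def tokenize(lines):
        # split each line lazily into lowercase words
        return chain.from_iterable(line.lower().split() for line in lines)

    print(Counter(tokenize(fetch_lines('https://example.com'))))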


I just started writing a functional stream processing library in Python for some of the same reasons.

We use somewhat different concepts. I tend to think of streams as infinite, so it didn't occur to me to include something like a reverse pipe operator.

I'm a bit surprised: why is filter an operator rather than a processor? I would think filters usually apply per-item, not to a whole stream?

I haven't worked on it very much but I'm heading towards push-based, using 0MQ for distribution/parallel processing, and using asyncio, mostly because it plays nicely with 0MQ.
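
To illustrate the per-item intuition, a generic generator sketch (neither library's API): each stage pulls one item at a time, so filtering works fine even on an infinite stream.

    from itertools import count, islice

    def evens(stream):
        # applies per item; never tries to realize the whole stream
        for n in stream:
            if n % 2 == 0:
                yield n

    infinite = count()  # 0, 1, 2, ... forever
    print(list(islice(evens(infinite), 5)))  # [0, 2, 4, 6, 8]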


> We use somewhat different concepts. I tend to think of streams as infinite, so it didn't occur to me to include something like a reverse pipe operator.

We are in agreement. reverse has a notice that it isn't lazy [1]. I prefer to include pipes that aren't lazy since they can be helpful in some cases (plus the goal is to include all pipes originally in Yahoo! Pipes). The vast majority of pipes work just fine on infinite streams [2].

> I'm a bit surprised: why is filter an operator rather than a processor? I would think filters usually apply per-item, not to a whole stream?

Just an implementation detail [3]. I agree it would be better if it were a processor, since it could then be parallelized. PRs welcome :).

> I haven't worked on it very much but I'm heading towards push-based, using 0MQ for distribution/parallel processing, and using asyncio, mostly because it plays nicely with 0MQ.

See my previous comments related to this [4, 5].

[1] https://github.com/nerevu/riko/blob/master/riko/modules/reve...

[2] Assuming you're not using the async or parallel mode

[3] https://github.com/nerevu/riko/blob/master/riko/modules/filt...

[4] https://news.ycombinator.com/item?id=12137591

[5] https://news.ycombinator.com/item?id=12137787


I am still a user of Plagger [1], but development halted quite some time ago. Maybe this could be a good replacement.

[1] https://github.com/miyagawa/plagger


Hadn't heard of this one before. There isn't a readme and the site seems to have been taken over. But from what I can tell from the examples, it has a YAML-based scheduler and does some pretty nifty things like IRC notifications.

riko doesn't have a scheduler (although the original pipe2py has a json based one). However, I do plan to integrate with Airflow/Oozie/Luigi [1-3] in the future to make it easier to design workflows.

The notification system reminds me of Huginn [4]. Since riko is Twisted based, it should be fairly straightforward to implement something similar for IRC/IMAP/FTP/etc.

[1] https://github.com/apache/incubator-airflow

[2] http://oozie.apache.org/

[3] https://github.com/spotify/luigi

[4] https://github.com/cantino/huginn


This is really interesting. Have you looked at Apache Beam? What I think is interesting about Beam, in this specific context, is that it has a standalone runner (Java) that, similarly to riko, lets you write pipelines without worrying about a complex setup. But then, if you need to scale your computation, Beam is runner-independent and you can take the same code and run it at scale on a cluster, whether it's spark, flink, or google cloud. You can read more here [1].

As for riko more specifically, Beam will soon have a Python SDK, but I'm unsure if there will be a Python standalone runner. Maybe this is something to look into...

[1] https://www.oreilly.com/ideas/future-proof-and-scale-proof-y...
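
To give a feel for it, here's roughly what a word count looks like against the in-progress Python SDK; the API is still in flux, so treat this as a sketch rather than final code.

    import apache_beam as beam

    p = beam.Pipeline()  # local/direct runner by default
    (p
     | beam.Create(['Hear the sledges with the bells', 'Silver bells'])
     | beam.FlatMap(lambda line: line.lower().split())
     | beam.Map(lambda word: (word, 1))
     | beam.CombinePerKey(sum)
     | beam.Map(lambda pair: print(pair)))
    p.run()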


> This is really interesting. Have you looked at Apache Beam?

Just gave it a look. Took a while to find some examples with code, but once I did it made a bit more sense.

> Beam is runner-independent and you can take the same code and run it at scale on a cluster, whether it's spark, flink, or google cloud.

I thought that was pretty cool.

> As for riko more specifically, Beam will soon have a Python SDK, but I'm unsure if there will be a Python standalone runner. Maybe this is something to look into...

A Python standalone runner would be very useful. Otherwise I'm hesitant to go much further, since my goal is to have a pure Python solution for working with streaming data. Most libraries require installing Java, and that is what I'd like to avoid.


The Python SDK is a work in progress; there's currently a branch: https://github.com/apache/incubator-beam/tree/python-sdk


If someone could spin up a usable GUI, charge enough to make a living without compromising on performance, and promise some longevity and a way to export my stuff, I would probably pay for that. I loved Pipes; the GUI was a big deal for me.


Have you investigated any of the existing GUIs? [1-3] I'd love to hear your thoughts on their pros/cons. I do plan to integrate a nice GUI framework if I can find one.

[1] https://azkaban.github.io/

[2] https://developers.google.com/blockly/

[3] http://nodered.org/


Node-RED is pretty great. I think a wiring interface is a better choice than a block-level thing like blockly.


True, Node-RED is much more aligned with the original Yahoo! Pipes interface. I kinda like the Scratch/Blockly interface since you can make it clear which components can go where. Plus it's reminiscent of physical Lego blocks.

Which interface do you think is more newbie friendly? My gut says blocks (maybe something a bit simpler/more refined than Blockly) are easier to grok, while wires allow for designing more complex workflows.


I don't feel like blocks really map well to the kinds of tasks I'd do in Node-RED and Yahoo Pipes, and presumably riko (I didn't dive too deep here yet though). A wiring interface better reflects the idea of inputs, filters, and outputs, and wiring them up in flexible ways. Blocks seem a little rigid and more variable/iteration/function-based.

Maybe figure out a few common workflows that people would make in riko or Node-RED, and mock up how they'd work/look in blocks vs. wiring/pipes.


> Maybe figure out a few common workflows that people would make in riko or Node-RED, and mock up how they'd work/look in blocks vs. wiring/pipes.

Good idea, what are your workflows?


I think the wiring is a good way to show the connections; white space is needed to keep things readable, so it allows for gaps and logical groupings. Wires, strings... I think the analogy is fairly digestible to someone new to this. Thanks for everyone's input, I'll be taking a look.


Apache Nifi looks promising. https://nifi.apache.org


Just curious:

What kind of a demand is there for a pipes-kind of product or even a customizable/searchable rss/feed integrator?

How much would a typical user be willing to pay for it?


Sweet. I put together something similar for NodeJS which is now called 'turtle' (because it's turtles all the way down...). There's a bit of a focus on AWS Lambda & other FaaS solutions as a means of building Lambda architectures, but it can be used by itself.

https://github.com/iopipe/turtle



Looks interesting. What kind of applications do people use this for?


Mashups [1] and Extract Transform Load (ETL) [2] are two big use cases. I developed a freelance project aggregator using an earlier version of riko [3].

[1] http://mashable.com/2009/10/08/top-mashups/#0XwtqVCCXPq2

[2] https://www.quora.com/How-do-ETL-tools-work

[3] http://app.kazeeki.com/


Thanks, appreciate your comment.


While I didn't use Yahoo Pipes too often, I loved it. Having this as a Python library (I'm trying to get deeper into Python) is great! Kudos and good luck!


Also in this space (and worth looking at for inspiration, especially for other potential sources and sinks of data) - Apache Camel [1].

[1]: http://camel.apache.org/


I don't know if it's because of the language (Java) or something else, but I've never been able to grok Apache data projects. I theoretically know what they do, but there's no way I can understand the code, e.g. [1].

[1] http://camel.apache.org/etl-example.html


The code you see in the beginning is actually code for configuring a "route" from a source of data to a destination. That was the key to understanding it for me. Once you do, you will find the rest of the documentation easier to follow. The earlier versions of Camel put the config in XML files, but most prefer the DSL approach.


Is there anything like this available that's based on node.js with a decent GUI?



This is amazing, thanks for sharing!



Also might want to check out http://concord.io. It's a bit more work to set up, but it's much faster than most stream processing systems.


How does concord differ from the others (spark/storm/flink/etc.)? Aside from being written in C++, that is.


Eng at Concord here.

Really cool API; you should port this to Concord! =)

I'd say the major diff is dynamic topology: during pipeline execution you can add/remove workers for any stage.

Also, each stage/operator can be written in any programming language.

Storm/Flink/SparkStreaming/etc. all have much higher level APIs. We built the execution engine first; the great things (DSL, etc.) should come soon. For example, this API (the pipe abstraction, that is) would be easy to support on top.

Here is an example of a DSL we prototyped in a couple hours.



Pretty neat! I'm guessing that concord isn't limited to just map/reduce... correct? I think building in integrations to other systems in the stream processing ecosystem is key. First on the list is to integrate with popular workflow schedulers so that you can design topologies which riko would then parse.

Next up, I think, would be supporting custom sources/sinks such as Twitter, HDFS, RDBMS, etc. What exactly would be involved in "porting" riko to concord, and what would the advantages be for doing so?


Interesting; will keep an eye out as you mature it. Looks like an awesome DSL to put on top of our runtime.

Advantages are:

1. Mesos integration, and with that comes containerization support, multi-tenancy, QoS, proper pipeline supervision, etc.

2. Scheduling of pipelines, i.e.: schedule them on 100 computers.

3. The outputs of your DAG can be consumed by other systems immediately, even ones written in different programming languages. So your Python DSL could be the source for the Scala DSL at some point; language interop.

4. Available KV storage.

5. Tracing (Zipkin), a la Google Dapper.

6. Fast networking: the C++-backed runtime is an order of magnitude faster than the Python one.

What would be involved is not much, from what I can tell.

Each concord 'operator' is like a networked function.

So given a DAG, you could generate many operators internally, or literally write them to files, i.e.: operator_one.py, etc. The code generation or internal scheduling would be the glue that's needed.

If you ever become interested, ping me! Would love to collab: alex@concord.io


There are many differences from spark/storm/flink, but the most notable one for concord is its dynamic topology model: deployment, scaling, and changes to your topologies can all be done at runtime, without a restart of the full topology, which is required for spark/storm/flink.


> ...can all be done at runtime, without a restart of the full topology...

That's pretty impressive! So something like react/elm hot reloading?


Yeah, I guess that's one way to put it (a little different, but kind of).


This is absolutely beautiful. Love the fact that it's using RSS for this.


Thank you. Apparently RSS never got the memo that it "died" ;).


Looks nice. Are there any plans for twitter support?


Eventually. It would essentially be a "source" pipe. But ultimately, I want to build a plugin system so that end users can create/share their own pipes. I also plan to add pipes that let you add streams to a database.



