I put several weeks into moving our machine learning pipeline over to Spark, only to keep hitting a race condition in its scheduler.
After a bit of searching, it turns out this is a known issue (https://issues.apache.org/jira/browse/SPARK-4454) and a fix has been sitting on GitHub for a while (https://github.com/apache/spark/pull/3345), yet in that time two releases have shipped bringing a tonne of features.
I ultimately had to drop Spark because I wasn't confident putting it into production (the random OOMs and NPEs during development didn't help either). Does anyone have any positive experiences?
Lemme tell you though... as someone who has used Hadoop for 5+ years, not waiting 5-10 minutes every time you run new code is worth the trouble. Despite more problems owing to immaturity, and Spark 'doing less for you' in terms of data validation than tools like Pig or Hive, if you can get your stuff running on Spark, development is joyous. You just don't spend your development time waiting anymore.
I feel like 5 years of my life were spent in 10-minute delays. That did terrible things to my coding that I'm only now starting to get over. With Spark I am 10x as productive, and I am limited by my thinking, not the tools.
PySpark in particular is really great.
And I love the fact you can press Tab and get autocompletion of methods.
I appreciate the work that's gone into Spark, and it's clearly well designed. Developing with Spark after coming from a Hadoop background was a very refreshing experience.
Our experience with Spark Streaming, on the other hand, has been mixed. Our streaming app runs stably most of the time (up to 4 days in some cases), but we still see the occasional failure, sometimes with no exception or stack trace indicating what failed.
Our goal is to have a 24/7 streaming service, and Spark has gotten us close to that. There are just a couple of unexplained errors standing in our way.
We decided to rewrite some of those jobs from Pig to PySpark, and though there was a little bit of a learning curve and some sharp edges, the development experience is so much better than Pig that my team is generally happy with the switch.
It really is Hadoop 2.0.
I don't think I could bring myself to ever write another Hadoop job.
First, it makes it easy to do feature extraction and model fitting in the same pipeline, which in turn makes it possible to cross-validate the impact of the hyper-parameters of the feature extraction step. Feature extraction generally starts from a collection of large, raw datasets that need to be filtered, joined and aggregated (for instance a log of user clicks, sessionized by user id over temporal windows, then geo-joined to GIS data via a geoip resolution of the user agent's IP address). While the raw click logs and geographical databases might be too big to process efficiently on a single node, the resulting extracted features (e.g. user session statistics enriched with geo features) are typically much smaller and could be processed on a single node to build a predictive model. Furthermore, Spark RDDs make it natural to trace provenance, and hence trivial to rebuild downstream models when tweaking the upstream operations used to extract the features. The native caching features of Spark make that kind of workflow very efficient with minimal boilerplate (e.g. no manual file versioning).
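To make the provenance point concrete, here is a toy sketch in plain Python (not Spark's actual API): each dataset remembers the transformation and parents that produced it, so downstream results can be recomputed when an upstream step is tweaked, and cached otherwise. All class and method names here are illustrative.

```python
class Dataset:
    """Toy lineage-tracking dataset, loosely in the spirit of an RDD."""
    def __init__(self, compute, parents=()):
        self._compute = compute   # function producing this dataset's rows
        self._parents = parents   # upstream Datasets (the lineage)
        self._cache = None

    def map(self, fn):
        return Dataset(lambda: [fn(r) for r in self.collect()], parents=(self,))

    def filter(self, pred):
        return Dataset(lambda: [r for r in self.collect() if pred(r)], parents=(self,))

    def cache(self):
        # materialize once; later collect() calls reuse the cached rows
        self._cache = self.collect()
        return self

    def collect(self):
        if self._cache is not None:
            return self._cache
        return self._compute()

raw = Dataset(lambda: list(range(10)))
features = raw.filter(lambda x: x % 2 == 0).map(lambda x: x * x).cache()
print(features.collect())  # [0, 4, 16, 36, 64]
```

Because each derived dataset keeps references to its parents, tweaking an upstream transformation and re-running simply rebuilds everything downstream, with no manual bookkeeping of intermediate files.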
Second, while the underlying ML algorithm might not always benefit from parallelization in itself, there are meta-level modeling operations that are both CPU-intensive and embarrassingly parallel, and those can benefit greatly from a compute cluster such as Spark. The canonical cases are cross-validation and hyper-parameter tuning.
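A hedged sketch of that "embarrassingly parallel" structure in plain Python: each hyper-parameter candidate is scored independently, so candidates can be farmed out to executors. A local thread pool stands in for the cluster here, and the scoring function is a toy stand-in for a real fit-and-evaluate step.

```python
from concurrent.futures import ThreadPoolExecutor

def score(alpha):
    # Toy loss: pretend alpha = 3 gives the best cross-validated error.
    # In practice this would train a model and return a validation metric.
    return (alpha - 3) ** 2

candidates = [0.1, 1, 3, 10, 30]

# Each candidate is scored independently -- no coordination needed,
# which is exactly why a cluster helps even when one fit does not parallelize.
with ThreadPoolExecutor(max_workers=4) as pool:
    losses = list(pool.map(score, candidates))

best = candidates[losses.index(min(losses))]
print(best)  # 3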
You can optimise any particular use case to perform better than Spark, but then you are going to incur the above costs for every project you create.
For some context: in our case, loading a reasonable set of data from HDFS can take up to 10-30 minutes, so keeping a cached copy of the most recent data with certain columns projected is important.
I guess this means DataFrames should be used all the time from now on, or will there still be a reason to use plain RDDs?
You guys are doing great work!
library(data.table)
x = data.table(a=sample(10, 10e6, replace=TRUE), num=sample(100, 10e6, replace=TRUE))
t1 = proc.time(); x[, sum(num), by=a]; print(proc.time() - t1)
#    user  system elapsed
#   0.209   0.032   0.245
I don't expect to beat R at small scale yet. There is some low-hanging fruit for single-node performance. For example, even for single-node data, we incur a "shuffle" to do the data exchange in aggregations. This is done so that single-node and distributed programs go through the same code path, to catch bugs. If we want to optimize more for single-node performance, we can have the optimizer remove the shuffle in the middle and just run the aggregations. Then this toy example would probably be done in the 100ms range.
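For reference, the grouped sum the data.table snippet benchmarks (sum(num) by a) is, on a single node, just one hash-aggregation pass with no data exchange at all. A pure-Python sketch of that shuffle-free code path (with a smaller input than the 10e6 rows above, purely to keep the demo quick):

```python
import random
from collections import defaultdict

random.seed(0)
n = 100_000  # smaller than the 10e6 rows in the R benchmark, for a quick demo
a   = [random.randrange(1, 11)  for _ in range(n)]  # group keys 1..10
num = [random.randrange(1, 101) for _ in range(n)]  # values 1..100

# Single hash-aggregation pass: no shuffle, no data exchange.
sums = defaultdict(int)
for key, value in zip(a, num):
    sums[key] += value

print(len(sums))  # 10 groups
```

This is essentially what an optimizer could lower the aggregation to once the single-node shuffle is elided.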
We will work on better integration in the future too.