Relief: I'm not an idiot, and the problems in the shuffle were likely in Spark itself, not just my inexperience as a user.
Frustration: I wanted to group about 100 billion 200-byte records into 5 billion groups. This seems like exactly the problem Spark is designed and advertised for, yet I had great difficulty even getting example Spark SQL code (or my own RDD-based code) working. I hear so many great things about Spark as a tool for big data, and also "your data isn't big". I considered 20TB the "low end" of big data, but a seemingly popular and widely used big data tool can't shuffle it without numerous bug fixes and pain on the part of the user. Shuffling 90TB was worth a Facebook blog post! This makes me ask: to all the people using Spark for "big data", how painful is it and how much data are you handling? It appears the answer for >=20TB is "very painful", and for <=5TB you're generally in "handle on a single node" territory.
I only use RDDs, so I put a lot of thought into the processing flow so that unnecessary data wasn't shuffled. If you're reducing PairRDDs, make sure the data is evenly distributed. Also, I'm guessing you've read the optimization docs, but a huge amount of network I/O can be avoided by choosing the right types and collections and by optimizing serialization. And, of course, group within partitions first, then within nodes, then across nodes (a quick sketch follows below). Finally, go for fewer, bigger servers with lots of network bandwidth.
There are a lot of tricks, unfortunately. And since I don't know your experience level, I won't bore you with things you probably already know.
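To make the partition-first and serialization points concrete, here is a minimal sketch; the input path and key function are made up, but Kryo and reduceByKey are standard Spark APIs. reduceByKey combines values within each partition before anything crosses the network, where groupByKey would ship every record.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative settings only; Kryo typically serializes much more
    // compactly than Java serialization, so less data crosses the network.
    val conf = new SparkConf()
      .setAppName("shuffle-tricks-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Made-up input: one record per line, first 8 chars are the key.
    val pairs = sc.textFile("hdfs:///data/records")
      .map(line => (line.take(8), 1L))

    // reduceByKey combines within each partition before shuffling, so only
    // one value per key per partition crosses the network; groupByKey would
    // ship every record to its target partition.
    val counts = pairs.reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs:///data/counts")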
Is it only applicable once the "cluster [is] sized appropriately so that all data stayed in memory" as you mention?
In Spark, the biggest performance gains are realized when all data stays in memory. But, the Spark memory architecture is kind of opaque. It's pretty well documented, and there are some good blogs out there, but there are a lot of factors in play.
For example, let's say we start with 1 TB of compressed data and take a guess that it will uncompress to 10 TB. So, we do a naive calculation and guess that we'll need 20 nodes, each with 500 GB of memory.
For our example, Spark will be managed by YARN. YARN is going to take a portion of each node's memory. I forget how much, but let's say 10%. Then, depending on how many executors we have per node, each will also have memory overhead. Again, I don't know exactly, but let's just say another 10%. Then some memory will be dedicated to system processes, etc. Then there's the Spark driver. So, after all of that, let's say 25-30% of memory is used outside the executors. That leaves us with 10 executors per node, each with ~35 GB of memory. For maximum performance, we have to ensure that all 10 TB fits across all executors. So we may actually need 29 nodes, or a lot of data is going to spill to disk.
Then, let's say our processing job doesn't evenly distribute data across partitions. Some partitions get 5x as much data as others. Again, data spills to disk - or the JVM runs out of heap. Or maybe our 10:1 compression estimate is off by 5% - more spills to disk. (A rough settings sketch follows.)
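Translating the guesswork above into configuration, with every number illustrative rather than a recommendation (the overhead property is the name Spark of this era uses on YARN):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("memory-sizing-sketch")
      .set("spark.executor.instances", "200")  // 20 nodes x 10 executors
      .set("spark.executor.memory", "35g")     // per-executor heap from the estimate
      // Off-heap overhead per executor in MB: ~10% of the heap, matching the
      // guess above. (Spark 1.x/2.0 property name for Spark on YARN.)
      .set("spark.yarn.executor.memoryOverhead", "3584")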
I ran a 'small' cluster of 24 nodes (300GB+ of RAM each), and while I know I could have done the same thing with super-optimized code on a single machine, not having to worry about each node's performance and instead thinking in pipelines is refreshing. My goal was not to use or write an optimal tokenizer or stop-word remover, but to make sure every part of the chain could run in parallel.
My biggest complaint? Repeatedly having to restart the cluster because nodes stop responding or throw arcane errors. If these reliability fixes had been in place last week, I would easily have been 20-25% more efficient with my time. Can't wait until the Databricks team deploys this for their users.
Presumably the task had been split up in the past to work around performance limitations of Hive; the fact that they got better performance in a single step shows that the tooling is improving and much less complexity is required to implement this kind of job.
- BigQuery's execution is pipelined (don't wait for step 1 to finish to start step 2)
- BigQuery's in-memory shuffler is not co-tenant to compute
And, of course, it's one thing to have the software and hardware. BigQuery provides a fully-managed, fully-encrypted, HA, redundant, and continuously, seamlessly maintained and upgraded service.
(disc: work on BigQuery)
Sorry - what does "co-tenant to compute" mean?
By co-tenant to compute, I mean that in Spark the processing nodes themselves handle the shuffle, which can cause non-obvious bottlenecks. BigQuery handles the shuffle outside of the processing nodes.
On the upside, massively parallel SQL databases let you express what probably took Facebook a lot of code in a single SQL query.
On the downside, with any database that supports streaming inserts and dynamic resharding, you will eventually have to write each entry more than once. The more manual approach with Spark avoids this.
Although you do have to distribute the library to all of the data nodes (if using YARN) or Spark nodes (if standalone).
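For reference, two common ways to do that distribution; the jar paths here are made up, and sc is the SparkContext that spark-shell provides:

    // From the driver: every executor fetches the jar before running
    // tasks that need it.
    sc.addJar("hdfs:///libs/mylib.jar")

    // Or at submit time:
    //   spark-submit --jars /path/to/mylib.jar --class com.example.Job app.jar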
I could be completely missing something here, but to decrease the number of output files you can coalesce() the RDD before you write. For example, say you have a 20-node cluster, each node with 10 executors, and the RDD action is split into 20000 tasks. You may end up with 20000 partitions (or more). But you can coalesce down to 200 partitions. Or, if necessary, you could even shuffle (moving data across executor JVMs) down to 20 partitions, if you're really motivated (see the sketch below).
What am I missing?
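For what it's worth, a spark-shell sketch of that flow, with made-up paths and counts:

    // Suppose this RDD ends up with ~20000 partitions.
    val data = sc.textFile("hdfs:///data/in")

    // Merge down to 200 partitions without a shuffle: each output
    // partition is stitched together from ~100 input partitions.
    data.coalesce(200).saveAsTextFile("hdfs:///data/out-200")

    // Or force a shuffle to get very few, evenly sized outputs.
    data.coalesce(20, shuffle = true).saveAsTextFile("hdfs:///data/out-20")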
"As far as we know, this is the largest real-world Spark job attempted in terms of shuffle data size"
I'm far, far from a world-class engineer, but I regularly do 90 TiB shuffle sorts. I must seriously be missing something here.
Out of the linked issues these all seem like they would be "easy" to hit given enough data:
Coalescing makes more sense when some stage of the pipeline dramatically shrinks the amount of data (e.g. grepping error logs out of all log files), so that successive stages can easily handle the rest of the data with far fewer executors (see the sketch below).
(disc.: Spark committer)
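A minimal sketch of that pattern, with made-up paths and numbers; passing shuffle = true keeps full parallelism for the read and filter, and only the small filtered output gets shuffled:

    // Grep a huge log set; survivors are a tiny fraction of the input.
    val errors = sc.textFile("hdfs:///logs/*")
      .filter(_.contains("ERROR"))

    // Downstream work now sees 16 partitions instead of thousands of
    // near-empty ones.
    errors.coalesce(16, shuffle = true).saveAsTextFile("hdfs:///logs/errors")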
I have been debating whether I should go ahead and start switching to 2.0.0, and this is pushing me in that direction.
It is a natural way to process certain types of data in certain ways. While there are lots of use cases like that, I have seen too many efforts to force things into Hadoop MapReduce/YARN or similar platforms just because it is the parallel processing hotness of the year.
A 60TB dataset can fit in one server. Isn't Spark intended for massive clusters, like dozens or hundreds of servers, over petabytes of data?
Mostly it's about speed. With multiple machines you can bring more cores to bear on the processing and have more RAM to cache partial results. Postgres could certainly do the job, but I'd be surprised if it ran within an order of magnitude of these results.
And why would they mention Yahoo? They stopped being particularly relevant in the Hadoop space quite a few years ago.