Tuning Guidelines for Apache Spark

At EverTrue, we’ve been building on Apache Spark for almost four months now and have hit just about every configuration wall one could imagine.

One of the surprises we were met with was the sheer number of configuration options available in Spark. It gives you full control of almost everything you can dream up.

The downside is figuring out which levers are the important ones to tune and how they relate to one another.

This is a distilled explanation of our experiences thus far.

The main “levers” for tuning

Executor Memory

Controlled by the spark.executor.memory property, this is configurable per job but like the name implies, is scoped to the executor. So if you run multiple executors, each executor will inherit this setting for a given job.
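
For reference, here’s a minimal sketch of setting it per job (the app name and the 4g value are purely illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
    .setAppName("my-job")                    // illustrative name
    .set("spark.executor.memory", "4g");     // heap given to EACH executor running this job

JavaSparkContext sc = new JavaSparkContext(conf);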

This is an important boundary to understand because if you get it wrong, you’ll start seeing errors like java.lang.OutOfMemoryError: GC overhead limit exceeded in your logs and your job will likely fail to complete.

Increasing your memory is an option, of course, and can sometimes be the right call to get a job properly tuned. However, if your workload streams data that far exceeds your memory, this lever can only be pushed so far. If that’s your use case, you’ll need to use different levers to get your job running successfully. See below for more.

Number of cores

Controlled by the spark.cores.max property. IMHO, a better name for this would have been spark.threads.max, because ultimately this is a lever that controls parallelism. By default, Spark will use the total number of cores on the box, but you can “oversubscribe” Spark to utilize more. Setting SPARK_WORKER_CORES to something higher will do the trick. You’ll definitely want to do this if your workloads have even a modest amount of I/O wait for things like API calls or one-off disk lookups.
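
A rough sketch of both pieces (the numbers are illustrative, and SPARK_WORKER_CORES lives in conf/spark-env.sh on the workers rather than in job code):

import org.apache.spark.SparkConf;

// cluster side, in conf/spark-env.sh on each worker -- oversubscribe a 16-core box:
//   SPARK_WORKER_CORES=32

// job side:
SparkConf conf = new SparkConf()
    .setAppName("my-job")
    .set("spark.cores.max", "16");   // total cores (really, concurrent tasks) this job may claim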

So given that it’s more appropriate to think about this setting as a level of parallelism, it’s important to understand how this parallelism relates to the other two main levers for tuning.

Simply put, if you can process more things at once, you need more memory. Let’s expand on that.

If, for example, you can process 20 tasks at a time on a given executor, then whatever memory a single transform or action function needs, the executor needs roughly 20 times that amount to avoid hitting an OOME. So while it may seem like jacking up your parallelism just makes things go faster, you will likely need to tune the other levers in conjunction.
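
A back-of-the-envelope sketch, with made-up numbers:

    spark.executor.memory         = 4g
    concurrent tasks per executor = 20
    rough per-task headroom       ~ 4g / 20 ~ 200 MB

(and in practice it’s a bit less than that, since Spark’s own storage and shuffle machinery take their cut of the heap)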

If you are increasing spark.cores.max, you may also want to increase the number of partitions and/or spark.executor.memory. Adding partitions doesn’t require additional resources and is a much cheaper way to get your job running smoothly.

Conversely, if you can’t change the partitioning for some reason, lowering spark.cores.max will reduce the amount of memory needed at any one time to process tasks/partitions. You can also increase the number of executors, as this will spread the load across more JVMs and relieve memory pressure.

Partition Size

Partition size is akin to the old Hadoop input split setting. It controls how Spark chunks up your work before feeding it to your RDD transformations and actions.
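
For example, many of the RDD creation methods let you say up front how many partitions you want (the path and counts here are illustrative, and sc is assumed to be your JavaSparkContext):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

// ask for at least 200 partitions when reading a file
JavaRDD<String> lines = sc.textFile("hdfs:///path/to/input", 200);

// or slice an in-memory collection into 200 partitions
List<String> ids = Arrays.asList("a", "b", "c");        // illustrative data
JavaRDD<String> fromList = sc.parallelize(ids, 200);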

This setting becomes crucial when you start employing transformations that take a FlatMapFunction-style argument, like mapPartitions and mapPartitionsToPair. The reason is that inside these transformations you are handed an Iterator over an entire partition and have to return an Iterable, which usually means building up some kind of data structure like a List or Map in local memory.

Take the example below:

public Iterable<Tuple2<String, MyObject>> call(Iterator<CassandraRow> cassandraRowIterator) throws Exception {
    List<Tuple2<String, MyObject>> results = Lists.newArrayList();

    while (cassandraRowIterator.hasNext()) {
        // deserialize the row's first column into our domain object
        // (MyObject.fromBytes is a stand-in for whatever deserialization you actually use)
        MyObject myObj = MyObject.fromBytes(cassandraRowIterator.next().getBytes(0).array());

        results.add(new Tuple2<>(String.valueOf(myObj.getId()), myObj));
    }
    // the entire partition's worth of results sits in memory before being returned
    return results;
}

The results list is going to be your gating factor here w/r/t whatever you have set for spark.executor.memory mentioned above. If your partition size and executor memory are configured incorrectly relative to each other, you will run out of memory.

To combat this, you can increase the number of partitions on your RDD, or add a repartition step on an existing RDD before calling into your transformation function. This makes each partition smaller (and, in this example, keeps results smaller) so the work fits within the heap size you set via spark.executor.memory.
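
A sketch of that repartition step against the example above. The count of 400 is arbitrary, rows is assumed to be the JavaRDD<CassandraRow> feeding the transformation, and MyObjectExtractor is a hypothetical class wrapping the call() method shown earlier:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

// more partitions => fewer rows handed to each call() => a smaller results list per task
JavaRDD<CassandraRow> smallerChunks = rows.repartition(400);

JavaPairRDD<String, MyObject> pairs =
    smallerChunks.mapPartitionsToPair(new MyObjectExtractor());   // hypothetical PairFlatMapFunction wrapping call() above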

You could also just use a plain old map transform (sketched below), but you would give up the partition-at-a-time batching that the mapPartitions* functions provide, which can increase processing time.
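
Since the example produces key/value tuples, the closest per-row analogue is mapToPair. This is a rough sketch only; rows and MyObject.fromBytes are the same assumed RDD and hypothetical deserializer used above:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

JavaPairRDD<String, MyObject> pairs = rows.mapToPair(
    new PairFunction<CassandraRow, String, MyObject>() {
        public Tuple2<String, MyObject> call(CassandraRow row) throws Exception {
            // one row at a time: no partition-sized results list held in local memory
            MyObject myObj = MyObject.fromBytes(row.getBytes(0).array());   // hypothetical deserializer
            return new Tuple2<>(String.valueOf(myObj.getId()), myObj);
        }
    });

Whether that trade is worth it comes down to how expensive your per-row work is versus how much heap you can afford to give each executor.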
