Spark Performance Tuning: Learn to Tune Apache Spark Jobs


1. Objective

Spark performance tuning is the process of adjusting the settings for memory, cores, and instances used by the system. Done well, it keeps Spark performing efficiently and prevents resource bottlenecks.

In this tutorial on performance tuning in Apache Spark, we explain how to tune your Apache Spark jobs. It introduces performance tuning, covers Spark's data serialization libraries (Java serialization and Kryo serialization), and explains Spark memory tuning. We also look at data structure tuning, data locality, and garbage collection tuning in Spark.

2. What is Performance Tuning in Apache Spark?

Tuning is the process of adjusting the settings for memory, cores, and instances used by the system so that Spark performs efficiently and resources do not become bottlenecks. Each property and setting is adjusted so that resources are used correctly for the specific system setup. Because Spark computes in memory, any resource in the cluster (CPU, memory, and so on) can become the bottleneck.

To reduce memory usage, RDDs are sometimes stored in serialized form. Data serialization plays an important role in network performance and also helps reduce memory usage, so it goes hand in hand with memory tuning.

If used properly, tuning can:

  • Ensure that all resources are used effectively.
  • Eliminate long-running jobs.
  • Improve the system's processing time.
  • Ensure that jobs run on the correct execution engine.

3. Data Serialization in Spark

Serialization is the process of converting an in-memory object into another format that can be stored in a file or sent over the network. It plays an important role in the performance of any distributed application: formats that are slow to serialize, or that consume a large number of bytes, slow down the computation. Apache Spark provides two serialization libraries:

  • Java serialization
  • Kryo serialization

Java serialization – By default, Spark serializes objects using Java's ObjectOutputStream framework, which works with any class that implements java.io.Serializable. Serialization performance can be controlled more closely by implementing java.io.Externalizable. Java serialization is flexible but often slow, and it leads to large serialized formats for many classes.

Kryo serialization – Spark can also use the Kryo library (Version 2) to serialize objects. Kryo is faster and more compact than Java serialization, but it does not support all Serializable types, and for the best performance the classes must be registered in advance. We can switch to Kryo by initializing our job with a SparkConf and calling:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

To register our own classes with Kryo, we use the registerKryoClasses method. If our objects are large, we may also need to increase the spark.kryoserializer.buffer config. The value must be large enough to hold the largest object we want to serialize.
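
The Kryo settings above can also be supplied as configuration properties at submit time. The following is a minimal sketch in plain Python that only assembles the command line; it does not start Spark. The class names, the buffer sizes, and app.jar are hypothetical placeholders, and it assumes that spark.kryo.classesToRegister and spark.kryoserializer.buffer.max behave as described in the Spark configuration reference for your version.

```python
import shlex

# Hypothetical application classes to register with Kryo.
KRYO_CLASSES = ["com.example.MyRecord", "com.example.MyKey"]


def kryo_conf(classes, buffer="256k", buffer_max="128m"):
    """Return Spark properties that enable Kryo and register classes.

    The buffer sizes are illustrative values, not recommendations.
    """
    return {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.classesToRegister": ",".join(classes),
        "spark.kryoserializer.buffer": buffer,
        "spark.kryoserializer.buffer.max": buffer_max,
    }


def to_submit_args(conf):
    """Render a dict of properties as spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


conf = kryo_conf(KRYO_CLASSES)
# app.jar is a placeholder for the application artifact.
print(shlex.join(["spark-submit"] + to_submit_args(conf) + ["app.jar"]))
```

Keeping the properties in one dictionary makes it easy to review them together before a job is submitted.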

4. Memory Tuning in Spark

Three considerations matter when tuning memory usage:

  • The amount of memory used by objects (ideally, the entire dataset fits in memory)
  • The cost of accessing those objects
  • The overhead of garbage collection

Java objects are fast to access, but they can easily consume 2-5x more space than the raw data inside their fields. The reasons for this are:

  • Every distinct Java object has an “object header” of about 16 bytes. For an object with very little data in it, the header can be bigger than the data.
  • A Java String has about 40 bytes of overhead over the raw string data, and it stores each character as two bytes because of String’s internal use of UTF-16 encoding. A 10-character String can therefore easily consume 60 bytes.
  • Common collection classes such as HashMap and LinkedList use linked data structures, in which there is a “wrapper” object for every entry. This object has not only a header but also pointers (typically 8 bytes each) to the next object in the list.
  • Collections of primitive types often store them as “boxed objects”, for example java.lang.Integer.
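
The String figure above can be checked with a small back-of-the-envelope calculation. This sketch uses only the approximate numbers quoted in the list (40 bytes of fixed overhead, 2 bytes per character); the function names are illustrative, and the ratio assumes the raw data would otherwise take one byte per character.

```python
STRING_OVERHEAD_BYTES = 40  # approximate fixed overhead of a Java String
BYTES_PER_CHAR = 2          # one UTF-16 code unit per character


def estimated_string_bytes(num_chars):
    """Approximate heap footprint of a Java String with num_chars characters."""
    return STRING_OVERHEAD_BYTES + BYTES_PER_CHAR * num_chars


def overhead_ratio(num_chars, raw_bytes_per_char=1):
    """Footprint relative to the raw data (assumes 1 byte per raw character)."""
    return estimated_string_bytes(num_chars) / (num_chars * raw_bytes_per_char)


print(estimated_string_bytes(10))  # 60
print(overhead_ratio(10))          # 6.0
```

Short strings suffer the most, because the fixed overhead dominates the character data.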

a. Spark Data Structure Tuning

We can reduce memory consumption by avoiding the Java features that add overhead. There are several ways to achieve this:

  • Avoid nested structures with lots of small objects and pointers.
  • Instead of using strings for keys, use numeric IDs or enumerated objects.
  • If the RAM size is less than 32 GB, set the JVM flag -XX:+UseCompressedOops to make pointers four bytes instead of eight.

b. Spark Garbage Collection Tuning

JVM garbage collection can become a problem when the program has a large churn in the RDDs it stores. To make room for new objects, Java evicts old ones, which means tracing through all the Java objects to find the unused ones. The key point is that the cost of garbage collection is proportional to the number of Java objects, so it is better to use data structures with fewer objects. Another way to achieve this is to persist objects in serialized form, in which case there is only one object (a byte array) per RDD partition.

5. Memory Management in Spark

Spark memory usage falls largely under two categories: execution and storage. Execution memory is used for computation in shuffles, joins, and aggregations, while storage memory is used for caching and for propagating internal data across the cluster. Execution and storage share a unified region (M). When no execution memory is in use, storage can acquire all of the available memory, and vice versa. Execution may evict storage if necessary, but only until total storage memory usage falls under a certain threshold (R).

This design provides several useful properties. First, applications that do not use caching can use the entire space for execution. Second, applications that do use caching can reserve a minimum storage space (R) in which their data blocks are immune to eviction.

There are two relevant configurations, but typical users should not need to adjust them, because the default values suit most workloads:

  • spark.memory.fraction expresses the size of M as a fraction of (JVM heap space - 300 MB) (default 0.6). The remaining 40% is reserved for user data structures, internal metadata in Spark, and protection against OOM errors in the case of sparse and unusually large records.
  • spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5).
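
To make the two fractions concrete, the sketch below works through the arithmetic for a hypothetical executor with a 4096 MB heap, using the default values quoted above. The function name and the heap size are illustrative; the calculation simply applies the formulas as stated and ignores any other overhead a real JVM might have.

```python
RESERVED_MB = 300  # fixed reservation subtracted from the heap before splitting


def unified_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return the sizes (in MB) of M, R, and the remaining user/internal space."""
    usable = heap_mb - RESERVED_MB
    m = usable * memory_fraction   # unified execution + storage region
    r = m * storage_fraction       # storage protected from eviction
    return {"M": m, "R": r, "other": usable - m}


sizes = unified_memory_mb(4096)  # hypothetical 4 GB executor heap
for name, mb in sizes.items():
    print(f"{name}: {mb:.1f} MB")
```

For this heap, M comes to roughly 2278 MB and R to roughly 1139 MB, leaving about 1518 MB for user data structures and internal metadata.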

6. Determining Memory Consumption in Spark

To determine how much memory a dataset will require, create an RDD, put it into the cache, and look at the “Storage” page in the web UI. This page shows how much memory the RDD occupies.

To estimate the memory consumption of a particular object, use SizeEstimator's estimate method.

7. Spark Garbage Collection Tuning

The first step in garbage collection tuning in Apache Spark is to gather statistics on how frequently garbage collection occurs and how much time is spent in it. This can be done by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the Java options. The next time the Spark job runs, a message is printed in the workers' logs each time a garbage collection occurs. These logs are on the worker nodes, not in the driver program.
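
One way to pass these flags to the executors is through the spark.executor.extraJavaOptions property. The sketch below only builds the command line in plain Python and does not launch anything; app.jar is a hypothetical placeholder. Newer JVMs have changed their GC logging options, so the flags may need adjusting for your Java version.

```python
import shlex

# GC logging flags quoted in the text above.
GC_FLAGS = ["-verbose:gc", "-XX:+PrintGCDetails", "-XX:+PrintGCTimeStamps"]


def gc_logging_args(flags=GC_FLAGS):
    """Return spark-submit arguments that enable GC logging on executors."""
    java_opts = " ".join(flags)
    return ["--conf", f"spark.executor.extraJavaOptions={java_opts}"]


# app.jar is a placeholder for the application artifact.
cmd = ["spark-submit"] + gc_logging_args() + ["app.jar"]
print(shlex.join(cmd))  # shlex.join quotes the value that contains spaces
```

Because the option value contains spaces, it has to reach spark-submit as a single argument, which is why the sketch quotes it with shlex.join rather than joining the pieces by hand.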

The Java heap is divided into two regions, Young and Old. The Young generation holds short-lived objects, while the Old generation holds objects with longer lifetimes. The goal of garbage collection tuning in Spark is to keep only long-lived RDDs in the Old generation and to size the Young generation so that it can hold short-lived objects. This avoids full garbage collections whose only purpose is to collect temporary objects created during task execution. Some steps that may help:

  • If a full garbage collection is invoked several times before a task completes, there is not enough memory available to execute the task.
  • If the garbage collection statistics show that the OldGen is close to full, reduce the amount of memory used for caching by lowering spark.memory.fraction. It is better to cache fewer objects than to slow down task execution. Alternatively, decrease the size of the Young generation by lowering -Xmn.

The effect of garbage collection tuning in Apache Spark depends on the application and on the amount of memory available.

8. Other Considerations for Spark Performance Tuning

a. Level of Parallelism

To use the full cluster, the level of parallelism of each operation should be high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size. For distributed reduce operations such as groupByKey and reduceByKey, the level of parallelism can be passed as a second argument. We can also set the config property spark.default.parallelism to change the default.
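
As a rough starting point for spark.default.parallelism, the Spark tuning guide suggests 2-3 tasks per CPU core; that rule of thumb is not part of this tutorial and is used here as an assumption. The sketch below applies it to a hypothetical cluster of 10 executors with 4 cores each. The function names are illustrative.

```python
def suggested_parallelism(total_cores, tasks_per_core=2):
    """Rule-of-thumb task count: a small multiple of the available cores."""
    return total_cores * tasks_per_core


def parallelism_conf(num_executors, cores_per_executor, tasks_per_core=2):
    """Return the spark.default.parallelism property for a given cluster shape."""
    total_cores = num_executors * cores_per_executor
    value = suggested_parallelism(total_cores, tasks_per_core)
    return {"spark.default.parallelism": str(value)}


# Hypothetical cluster: 10 executors with 4 cores each.
print(parallelism_conf(10, 4))
```

The result is only a first guess; the right value depends on the data size and on how evenly the keys are distributed.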

b. Memory Usage of Reduce Tasks in Spark

Sometimes we get an OutOfMemoryError even though our RDDs fit in memory. This happens because the working set of one of our tasks, such as one of the reduce tasks in groupByKey, is too large. We can fix this by increasing the level of parallelism so that each task's input set is smaller. It is safe to raise the level of parallelism above the number of cores in the cluster, because Spark reuses one executor JVM across many tasks and has a low task-launching cost.

c. Broadcasting Large Variables

Using the broadcast functionality in SparkContext can greatly reduce the size of each serialized task. If a task uses a large object from the driver program, turn that object into a broadcast variable. As a general rule, tasks larger than about 20 KB are probably worth optimizing.
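
The 20 KB rule of thumb can be illustrated with a small size check. In this sketch Python's pickle stands in for Spark's task serializer, which is an assumption made purely for illustration, and should_broadcast is a hypothetical helper, not a Spark API.

```python
import pickle

TASK_SIZE_THRESHOLD = 20 * 1024  # roughly 20 KB, as quoted above


def should_broadcast(obj, threshold=TASK_SIZE_THRESHOLD):
    """Return True if the pickled object is larger than the threshold."""
    return len(pickle.dumps(obj)) > threshold


# A large lookup table of the kind that might be captured by a task closure.
lookup = {i: str(i) for i in range(10_000)}

print(should_broadcast(lookup))      # True
print(should_broadcast({"a": 1}))    # False
```

An object that fails this kind of check is a candidate for a broadcast variable, so that it is shipped to each executor once instead of with every task.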

d. Data Locality in Apache Spark

Data locality plays an important role in the performance of Spark jobs. When the data and the code that operates on it are together, computation is fast. If the two are separate, one has to move to the other. It is usually faster to ship serialized code from place to place than a chunk of data, because the code is much smaller than the data.

There are several levels of locality, based on the data's current location. In order from closest to farthest:

  • PROCESS_LOCAL: the data is in the same JVM as the running code. This is the best possible locality.
  • NODE_LOCAL: the data is on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes.
  • NO_PREF: the data has no locality preference and is accessed equally quickly from anywhere.
  • RACK_LOCAL: the data is on the same rack of servers. Because it is on a different server in the same rack, it has to be sent over the network, typically through a single switch.
  • ANY: the data is elsewhere on the network and not in the same rack.
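
The ordering above can be modelled as an ordered enumeration. This is a toy sketch for illustration only: the Locality class and best_level function are hypothetical, and Spark's real scheduler also waits for a configurable timeout before falling back to a farther level, which the sketch ignores.

```python
from enum import IntEnum


class Locality(IntEnum):
    """Locality levels, ordered from closest (0) to farthest (4)."""
    PROCESS_LOCAL = 0
    NODE_LOCAL = 1
    NO_PREF = 2
    RACK_LOCAL = 3
    ANY = 4


def best_level(available):
    """Pick the closest locality level among those currently available."""
    return min(available)


chosen = best_level([Locality.RACK_LOCAL, Locality.NODE_LOCAL, Locality.ANY])
print(chosen.name)  # NODE_LOCAL
```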

9. Conclusion

Performance tuning therefore plays a vital role in improving the performance of the system. Data serialization is an important part of it, alongside memory tuning, garbage collection tuning, and data locality. Spark itself employs a number of optimization techniques to cut processing time, and careful tuning helps the system make the most of them.
