RDD Caching and Persistence

Introduction

Caching and persistence are optimisation techniques for (iterative and interactive) Spark computations. They help save interim partial results so they can be reused in subsequent stages. These interim results are kept as RDDs in memory (the default) or on more durable storage such as disk, and can additionally be replicated.

RDDs are Recomputed

By default, RDDs are recomputed each time an action is run on them. For example,

 
scala> val lines = sc.textFile("words.txt")
...
scala> lines.first()
res4: String = line1 word1
scala> lines.count()
res5: Long = 4

 

Here the action first() triggers computation of the RDD lines. When we call another action, count(), on the same RDD, the whole RDD is recomputed from scratch.
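To make the recomputation explicit, here is a minimal sketch (assuming the same spark-shell session with SparkContext sc and the words.txt file as above); every action re-evaluates the full lineage, including the file read:

 
// Transformation only: nothing is executed yet
scala> val upper = lines.map(_.toUpperCase)

// Each action below re-reads words.txt and re-applies the map
scala> upper.first()
scala> upper.count()

 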

Persisting RDDs

The default behavior of recomputing an RDD on each action can be overridden by persisting it, so that no re-computation is done each time an action is called on the RDD. When an RDD is persisted, each node that computes it stores its partitions of the result.

We use the persist() method to persist an RDD. In Scala and Java, persist() by default stores the data in the JVM as unserialized objects. In Python, calling persist() serializes the data before persisting it. Storage levels that combine memory and disk are also available.

$ spark-shell 

scala> val lines = sc.textFile("words.txt")
...
scala> import org.apache.spark.storage.StorageLevel

// We can also use cache() method if we need MEMORY_ONLY storage level
scala> lines.persist(StorageLevel.MEMORY_ONLY) 

...
scala> lines.count() (1)

 

The actual persistence takes place at the first action call (1) on the RDD. Spark provides multiple storage options (memory, disk, or a combination) for persisting the data, as well as replication levels.
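As a sketch, some of the other storage levels look like this (the constants are from org.apache.spark.storage.StorageLevel; words is just a fresh example RDD, since a storage level can be assigned to an RDD only once):

 
scala> import org.apache.spark.storage.StorageLevel

// A fresh RDD to persist with a different storage level
scala> val words = sc.textFile("words.txt")

// Deserialized objects in memory, spilling to disk when memory is full
scala> words.persist(StorageLevel.MEMORY_AND_DISK)

// Alternatives (commented out, as only one level may be set per RDD):
// words.persist(StorageLevel.MEMORY_ONLY_SER)  // serialized in memory: more CPU, less space
// words.persist(StorageLevel.MEMORY_ONLY_2)    // memory only, replicated on two nodes

scala> words.count()   // persistence takes effect at this first action

 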

We use unpersist() to remove an RDD from the cache. When the cached data exceeds memory capacity, Spark automatically evicts the least recently used partitions (they are recomputed later if needed). This is called the Least Recently Used (LRU) cache eviction policy.
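
A minimal sketch of removing an RDD from the cache (reusing the lines RDD persisted above; unpersist() also accepts an optional blocking argument that controls whether the call waits until all blocks are removed):

 
// Drop the cached blocks from memory/disk
scala> lines.unpersist()

// The next action recomputes the RDD from its lineage
scala> lines.count()

 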