Dataset Caching and Persistence

Table 1. Caching Operators (Basic Actions)
Operator     Description

cache        Basic action to cache a Dataset
             cache(): this.type

persist      Basic action to persist a Dataset
             persist(): this.type
             persist(newLevel: StorageLevel): this.type

unpersist    Basic action to unpersist a cached Dataset
             unpersist(): this.type
             unpersist(blocking: Boolean): this.type
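The table lists persist with an explicit storage level and unpersist with a blocking flag, neither of which the shell session in this section exercises. The following is a minimal sketch of both, assuming a local SparkSession and Spark 2.1 or later (for Dataset.storageLevel). The object name PersistSketch and the application name are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Returns the storage level observed after persist and after unpersist.
  def run(spark: SparkSession): (StorageLevel, StorageLevel) = {
    // Like cache, persist is lazy: it only marks the Dataset for caching.
    val ds = spark.range(5).persist(StorageLevel.MEMORY_ONLY)
    val afterPersist = ds.storageLevel

    // An action materializes the cached data.
    ds.count()

    // blocking = true waits until the cached blocks are removed.
    ds.unpersist(blocking = true)
    (afterPersist, ds.storageLevel)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is good enough for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("persist-sketch")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

Dataset.storageLevel reports the requested level as soon as persist is called, and StorageLevel.NONE once the Dataset is no longer persisted.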
// Cache Dataset -- it is lazy
scala> val df = spark.range(1).cache
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]

// Trigger caching
scala> df.show
+---+
| id|
+---+
|  0|
+---+

// Visit http://localhost:4040/storage to confirm that the Dataset is cached.

// You may also use queryExecution or explain to see InMemoryRelation
// InMemoryRelation is used for cached queries
scala> df.queryExecution.withCachedData
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *Range (0, 1, step=1, splits=Some(8))

// Use the cached Dataset in another query
// Notice InMemoryRelation in use for cached queries
scala> df.withColumn("newId", 'id).explain(extended = true)
== Parsed Logical Plan ==
'Project [*, 'id AS newId#16]
+- Range (0, 1, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint, newId: bigint
Project [id#0L, id#0L AS newId#16L]
+- Range (0, 1, step=1, splits=Some(8))

== Optimized Logical Plan ==
Project [id#0L, id#0L AS newId#16L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *Range (0, 1, step=1, splits=Some(8))

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))

// Clear in-memory cache using SQL
// Equivalent to spark.catalog.clearCache
scala> sql("CLEAR CACHE").collect
res1: Array[org.apache.spark.sql.Row] = Array()

// Visit http://localhost:4040/storage to confirm that the cache has been cleared
Note

You can also use SQL’s CACHE TABLE [tableName] to cache the tableName table in memory. Unlike the cache and persist operators, CACHE TABLE is eager: the table is cached as soon as the statement is executed.

sql("CACHE TABLE [tableName]")

You can, however, use the LAZY keyword to make caching lazy.

sql("CACHE LAZY TABLE [tableName]")

Use SQL’s REFRESH TABLE [tableName] to refresh a cached table.

Use SQL’s UNCACHE TABLE (IF EXISTS)? [tableName] to remove a table from cache.

Use SQL’s CLEAR CACHE to remove all tables from cache.
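The SQL statements above can be tried end to end as in the following sketch. It assumes a local SparkSession. The view name numbers and the object name CacheTableSqlSketch are hypothetical, and spark.catalog.isCached is used only to observe the effect of each statement.

```scala
import org.apache.spark.sql.SparkSession

object CacheTableSqlSketch {
  // Returns whether the view is cached after CACHE TABLE and after UNCACHE TABLE.
  def run(spark: SparkSession): (Boolean, Boolean) = {
    // "numbers" is a hypothetical temporary view used only for this sketch.
    spark.range(5).createOrReplaceTempView("numbers")

    // Eager: the view is cached as part of executing the statement.
    spark.sql("CACHE TABLE numbers")
    val afterCache = spark.catalog.isCached("numbers")

    // IF EXISTS avoids an error when the table does not exist.
    spark.sql("UNCACHE TABLE IF EXISTS numbers")
    val afterUncache = spark.catalog.isCached("numbers")

    // Lazy variant: the data is cached the first time the view is used.
    spark.sql("CACHE LAZY TABLE numbers")
    spark.table("numbers").count()

    // Remove all cached tables.
    spark.sql("CLEAR CACHE")
    (afterCache, afterUncache)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is good enough for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-table-sql-sketch")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```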

Note

Be careful what you cache, i.e. which Dataset in a query you call cache on, as that determines which query is cached.

// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)
Tip

You can check whether a Dataset was cached or not using the following code:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
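As an alternative that stays within the public API, Dataset.storageLevel (available since Spark 2.1) can serve the same purpose. The sketch below is only an illustration: the helper isCached and the object name IsCachedSketch are made up here and are not part of Spark.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel

object IsCachedSketch {
  // Hypothetical helper: a Dataset counts as cached when its storage level is not NONE.
  def isCached(ds: Dataset[_]): Boolean = ds.storageLevel != StorageLevel.NONE

  // Returns (is the base Dataset cached?, is the derived Dataset cached?).
  def run(spark: SparkSession): (Boolean, Boolean) = {
    val base = spark.range(5).cache()
    // The filter is applied on top of the cached Dataset; the filtered query itself is not cached.
    val derived = base.filter("id % 2 = 0")
    val result = (isCached(base), isCached(derived))
    base.unpersist()
    result
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is good enough for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("is-cached-sketch")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

Only the Dataset on which cache was called reports a storage level; a query built on top of it reads from the cache but is not itself registered as cached.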

SQL’s CACHE TABLE

SQL’s CACHE TABLE corresponds to requesting the session-specific Catalog to cache the table.
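The Catalog can also be asked directly, as in the following sketch. It assumes a local SparkSession. The view name numbers_catalog and the object name CatalogCacheSketch are hypothetical. Note that, as far as I know, spark.catalog.cacheTable is lazy, unlike the eager CACHE TABLE statement.

```scala
import org.apache.spark.sql.SparkSession

object CatalogCacheSketch {
  // Returns whether the view is cached after cacheTable and after uncacheTable.
  def run(spark: SparkSession): (Boolean, Boolean) = {
    // "numbers_catalog" is a hypothetical temporary view used only for this sketch.
    spark.range(5).createOrReplaceTempView("numbers_catalog")

    // Register the view for caching through the session's Catalog.
    spark.catalog.cacheTable("numbers_catalog")
    val afterCache = spark.catalog.isCached("numbers_catalog")

    // Remove the view from the cache again.
    spark.catalog.uncacheTable("numbers_catalog")
    val afterUncache = spark.catalog.isCached("numbers_catalog")

    (afterCache, afterUncache)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is good enough for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("catalog-cache-sketch")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```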

Internally, CACHE TABLE becomes a CacheTableCommand runnable command that…FIXME
