2 回答

TA贡献1776条经验 获得超12个赞
我做了几个实验,如下所示。显然,数据帧一旦缓存,就会保持缓存状态(如 和 查询计划等 所示),即使所有 Python 引用都被覆盖或完全删除,并且显式调用了垃圾回收。getPersistentRDDsInMemorydel
实验 1:
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df2.select('*').explain()
del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
结果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}
>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
+- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#172L AS col1#174L]
+- *(1) Scan ExistingRDD[_1#172L]
>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}
实验 2:
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df.select('*').explain()
del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
结果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}
>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}
>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
+- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#218L AS col1#220L]
+- *(1) Scan ExistingRDD[_1#218L]
>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}
实验3(对照实验,证明无孔徒有效)
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
df2.select('*').explain()
df.unpersist()
df2.select('*').explain()
结果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}
>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
+- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#310L AS col1#312L]
+- *(1) Scan ExistingRDD[_1#310L]
>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
+- *(1) Scan ExistingRDD[_1#310L]
回答OP的问题:
这是否意味着缓存的数据框不再可用,将被垃圾回收?这是否意味着新的后置滤波器df将从头开始计算所有内容,尽管它是从以前缓存的数据帧生成的?
实验表明两者都没有。数据帧保持缓存状态,不进行垃圾回收,并且根据查询计划使用缓存的(不可引用的)数据帧计算新数据帧。
与缓存使用相关的一些有用功能(如果您不想通过 Spark UI 执行此操作)是:
sc._jsc.getPersistentRDDs(),其中显示了缓存的 RDD/数据帧的列表,以及
spark.catalog.clearCache(),这将清除所有缓存的 RDD/数据帧。
我在执行上述操作时是否偏离了最佳实践?
我没有权力对此进行判断,但正如其中一条评论所建议的那样,避免重新分配,因为数据帧是不可变的。试着想象你正在用scala编码,你被定义为.做是不可能的。Python本身无法强制执行,但我认为最佳做法是避免覆盖任何数据帧变量,这样,如果您不再需要缓存的结果,则可以随时调用。dfdfvaldf = df.filter(...)df.unpersist()

TA贡献1725条经验 获得超8个赞
想提出几点,希望能澄清Spark在缓存方面的行为。
当您有
df = ... do stuff...
df.cache()
df.count()
...然后在应用程序中的其他位置
another_df = ... do *same* stuff...
another_df.*some_action()*
...,您希望重用缓存的数据帧。毕竟,重用先前计算的结果是缓存的目标。意识到这一点,Spark开发人员决定使用分析的逻辑计划作为识别缓存数据帧的“关键”,而不是仅仅依赖于来自应用程序端的引用。在 Spark 中,CacheManager 是跟踪缓存计算的组件,按索引顺序排列:another_dfdfcachedData
/**
* Maintains the list of cached plans as an immutable sequence. Any updates to the list
* should be protected in a "this.synchronized" block which includes the reading of the
* existing value and the update of the cachedData var.
*/
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
在查询规划期间(在缓存管理器阶段),将扫描此结构以查找正在分析的计划的所有子树,以查看是否已计算出其中的任何子树。如果找到匹配项,Spark 会将此子树替换为相应的 from 。InMemoryRelationcachedData
cache()(的简单同义词 ) 函数通过调用 cacheQuery(...) 来存储具有存储级别的数据帧persist()MEMORY_AND_DISKCacheManager
/**
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
def cacheQuery(...
请注意,这与使用级别的 RDD 缓存不同。一旦缓存了数据帧,它们就会保留在内存或本地执行器磁盘上缓存,直到它们被显式'ed',或者调用CacheManager。当执行程序存储内存完全填满时,缓存块开始使用 LRU(最近最少使用)推送到磁盘,但永远不会简单地“丢弃”。MEMORY_ONLYunpersistclearCache()
顺便说一句,好问题...
添加回答
举报