为了账号安全,请及时绑定邮箱和手机立即绑定

如果我缓存 Spark 数据帧,然后覆盖引用,原始数据帧是否仍会被缓存?

如果我缓存 Spark 数据帧,然后覆盖引用,原始数据帧是否仍会被缓存?

ITMISS 2022-08-02 11:00:23
假设我有一个函数来生成一个(py)spark数据帧,将数据帧缓存到内存中作为最后一个操作。def gen_func(inputs):    df = ... do stuff...    df.cache()    df.count()       return df根据我的理解,Spark的缓存工作如下:当在数据帧上调用一个动作()时,它将从其DAG计算并缓存到内存中,并附加到引用它的对象上。cache/persistcount()只要存在对该对象的引用(可能在其他函数/其他作用域中),df 将继续缓存,并且依赖于 df 的所有 DAG 都将使用内存中缓存的数据作为起点。如果删除了对 df 的所有引用,Spark 会将缓存作为要进行垃圾回收的内存。它可能不会立即被垃圾回收,导致一些短期内存块(特别是,如果您生成缓存数据并过快地丢弃它们,则会导致内存泄漏),但最终它将被清除。我的问题是,假设我用于生成一个数据框,但随后覆盖原始数据框引用(可能带有a或a)。gen_funcfilterwithColumndf=gen_func(inputs) df=df.filter("some_col = some_val")在 Spark 中,RDD/DF 是不可变的,因此在滤波器之后重新分配的 df 和在滤波器之前的 df 指的是两个完全不同的对象。在本例中,对原始 df 的引用已被覆盖。这是否意味着缓存的数据框不再可用,将被垃圾回收?这是否意味着新的后置过滤器将从头开始计算所有内容,尽管它是从以前缓存的数据帧生成的?cache/counteddf我之所以问这个问题,是因为我最近修复了代码中的一些内存不足问题,在我看来,缓存可能是问题所在。但是,我还没有真正了解使用缓存的安全方法的全部细节,以及如何意外地使缓存的内存失效。在我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?
查看完整描述

2 回答

?
叮当猫咪

TA贡献1776条经验 获得超12个赞

我做了几个实验,如下所示。显然,数据帧一旦缓存,就会保持缓存状态(如 和 查询计划等 所示),即使所有 Python 引用都被覆盖或完全删除,并且显式调用了垃圾回收。getPersistentRDDsInMemorydel


实验 1:


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df2 = df.filter('col1 != 2')

del df

import gc

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()


df2.select('*').explain()


del df2

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()

结果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o234}


>>> df2 = df.filter('col1 != 2')

>>> del df

>>> import gc

>>> gc.collect()

93

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o240}


>>> df2.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]

         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#172L AS col1#174L]

                  +- *(1) Scan ExistingRDD[_1#172L]


>>> del df2

>>> gc.collect()

85

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o250}

实验 2:


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df = df.filter('col1 != 2')

import gc

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()


df.select('*').explain()


del df

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()

结果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o317}


>>> df = df.filter('col1 != 2')

>>> import gc

>>> gc.collect()

244

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o323}


>>> df.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]

         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#218L AS col1#220L]

                  +- *(1) Scan ExistingRDD[_1#218L]


>>> del df

>>> gc.collect()

85

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o333}

实验3(对照实验,证明无孔徒有效)


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df2 = df.filter('col1 != 2')

df2.select('*').explain()


df.unpersist()

df2.select('*').explain()

结果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{116: JavaObject id=o398}


>>> df2 = df.filter('col1 != 2')

>>> df2.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]

         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#310L AS col1#312L]

                  +- *(1) Scan ExistingRDD[_1#310L]


>>> df.unpersist()

DataFrame[col1: bigint]

>>> sc._jsc.getPersistentRDDs()

{}


>>> df2.select('*').explain()

== Physical Plan ==

*(1) Project [_1#310L AS col1#312L]

+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))

   +- *(1) Scan ExistingRDD[_1#310L]

回答OP的问题:


这是否意味着缓存的数据框不再可用,将被垃圾回收?这是否意味着新的后置滤波器df将从头开始计算所有内容,尽管它是从以前缓存的数据帧生成的?


实验表明两者都没有。数据帧保持缓存状态,不进行垃圾回收,并且根据查询计划使用缓存的(不可引用的)数据帧计算新数据帧。


与缓存使用相关的一些有用功能(如果您不想通过 Spark UI 执行此操作)是:


sc._jsc.getPersistentRDDs(),其中显示了缓存的 RDD/数据帧的列表,以及


spark.catalog.clearCache(),这将清除所有缓存的 RDD/数据帧。


我在执行上述操作时是否偏离了最佳实践?


我没有权力对此进行判断,但正如其中一条评论所建议的那样,避免重新分配,因为数据帧是不可变的。试着想象你正在用scala编码,你被定义为.做是不可能的。Python本身无法强制执行,但我认为最佳做法是避免覆盖任何数据帧变量,这样,如果您不再需要缓存的结果,则可以随时调用。dfdfvaldf = df.filter(...)df.unpersist()


查看完整回答
反对 回复 2022-08-02
?
qq_遁去的一_1

TA贡献1725条经验 获得超8个赞

想提出几点,希望能澄清Spark在缓存方面的行为。


当您有


df = ... do stuff...

df.cache()

df.count()

...然后在应用程序中的其他位置


   another_df = ... do *same* stuff...

   another_df.*some_action()*

...,您希望重用缓存的数据帧。毕竟,重用先前计算的结果是缓存的目标。意识到这一点,Spark开发人员决定使用分析的逻辑计划作为识别缓存数据帧的“关键”,而不是仅仅依赖于来自应用程序端的引用。在 Spark 中,CacheManager 是跟踪缓存计算的组件,按索引顺序排列:another_dfdfcachedData


  /**

   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list

   * should be protected in a "this.synchronized" block which includes the reading of the

   * existing value and the update of the cachedData var.

   */

  @transient @volatile

  private var cachedData = IndexedSeq[CachedData]()

在查询规划期间(在缓存管理器阶段),将扫描此结构以查找正在分析的计划的所有子树,以查看是否已计算出其中的任何子树。如果找到匹配项,Spark 会将此子树替换为相应的 from 。InMemoryRelationcachedData


cache()(的简单同义词 ) 函数通过调用 cacheQuery(...) 来存储具有存储级别的数据帧persist()MEMORY_AND_DISKCacheManager

      /**

       * Caches the data produced by the logical representation of the given [[Dataset]].

       * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because

       * recomputing the in-memory columnar representation of the underlying table is expensive.

       */

      def cacheQuery(...

请注意,这与使用级别的 RDD 缓存不同。一旦缓存了数据帧,它们就会保留在内存或本地执行器磁盘上缓存,直到它们被显式'ed',或者调用CacheManager。当执行程序存储内存完全填满时,缓存块开始使用 LRU(最近最少使用)推送到磁盘,但永远不会简单地“丢弃”。MEMORY_ONLYunpersistclearCache()


顺便说一句,好问题...


查看完整回答
反对 回复 2022-08-02
  • 2 回答
  • 0 关注
  • 158 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号