亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如果我緩存 Spark 數據幀,然后覆蓋引用,原始數據幀是否仍會被緩存?

如果我緩存 Spark 數據幀,然后覆蓋引用,原始數據幀是否仍會被緩存?

ITMISS 2022-08-02 11:00:23
假設我有一個函數來生成一個(py)spark數據幀,將數據幀緩存到內存中作為最后一個操作。def gen_func(inputs):    df = ... do stuff...    df.cache()    df.count()       return df根據我的理解,Spark的緩存工作如下:當在數據幀上調用一個動作()時,它將從其DAG計算并緩存到內存中,并附加到引用它的對象上。cache/persistcount()只要存在對該對象的引用(可能在其他函數/其他作用域中),df 將繼續緩存,并且依賴于 df 的所有 DAG 都將使用內存中緩存的數據作為起點。如果刪除了對 df 的所有引用,Spark 會將緩存作為要進行垃圾回收的內存。它可能不會立即被垃圾回收,導致一些短期內存塊(特別是,如果您生成緩存數據并過快地丟棄它們,則會導致內存泄漏),但最終它將被清除。我的問題是,假設我用于生成一個數據框,但隨后覆蓋原始數據框引用(可能帶有a或a)。gen_funcfilterwithColumndf=gen_func(inputs) df=df.filter("some_col = some_val")在 Spark 中,RDD/DF 是不可變的,因此在濾波器之后重新分配的 df 和在濾波器之前的 df 指的是兩個完全不同的對象。在本例中,對原始 df 的引用已被覆蓋。這是否意味著緩存的數據框不再可用,將被垃圾回收?這是否意味著新的后置過濾器將從頭開始計算所有內容,盡管它是從以前緩存的數據幀生成的?cache/counteddf我之所以問這個問題,是因為我最近修復了代碼中的一些內存不足問題,在我看來,緩存可能是問題所在。但是,我還沒有真正了解使用緩存的安全方法的全部細節,以及如何意外地使緩存的內存失效。在我的理解中缺少什么?我在執行上述操作時是否偏離了最佳實踐?
查看完整描述

2 回答

?
叮當貓咪

TA貢獻1776條經驗 獲得超12個贊

我做了幾個實驗,如下所示。顯然,數據幀一旦緩存,就會保持緩存狀態(如 和 查詢計劃等 所示),即使所有 Python 引用都被覆蓋或完全刪除,并且顯式調用了垃圾回收。getPersistentRDDsInMemorydel


實驗 1:


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df2 = df.filter('col1 != 2')

del df

import gc

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()


df2.select('*').explain()


del df2

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()

結果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o234}


>>> df2 = df.filter('col1 != 2')

>>> del df

>>> import gc

>>> gc.collect()

93

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o240}


>>> df2.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]

         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#172L AS col1#174L]

                  +- *(1) Scan ExistingRDD[_1#172L]


>>> del df2

>>> gc.collect()

85

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o250}

實驗 2:


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df = df.filter('col1 != 2')

import gc

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()


df.select('*').explain()


del df

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()

結果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o317}


>>> df = df.filter('col1 != 2')

>>> import gc

>>> gc.collect()

244

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o323}


>>> df.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]

         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#218L AS col1#220L]

                  +- *(1) Scan ExistingRDD[_1#218L]


>>> del df

>>> gc.collect()

85

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o333}

實驗3(對照實驗,證明無孔徒有效)


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df2 = df.filter('col1 != 2')

df2.select('*').explain()


df.unpersist()

df2.select('*').explain()

結果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{116: JavaObject id=o398}


>>> df2 = df.filter('col1 != 2')

>>> df2.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]

         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#310L AS col1#312L]

                  +- *(1) Scan ExistingRDD[_1#310L]


>>> df.unpersist()

DataFrame[col1: bigint]

>>> sc._jsc.getPersistentRDDs()

{}


>>> df2.select('*').explain()

== Physical Plan ==

*(1) Project [_1#310L AS col1#312L]

+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))

   +- *(1) Scan ExistingRDD[_1#310L]

回答OP的問題:


這是否意味著緩存的數據框不再可用,將被垃圾回收?這是否意味著新的后置濾波器df將從頭開始計算所有內容,盡管它是從以前緩存的數據幀生成的?


實驗表明兩者都沒有。數據幀保持緩存狀態,不進行垃圾回收,并且根據查詢計劃使用緩存的(不可引用的)數據幀計算新數據幀。


與緩存使用相關的一些有用功能(如果您不想通過 Spark UI 執行此操作)是:


sc._jsc.getPersistentRDDs(),其中顯示了緩存的 RDD/數據幀的列表,以及


spark.catalog.clearCache(),這將清除所有緩存的 RDD/數據幀。


我在執行上述操作時是否偏離了最佳實踐?


我沒有權力對此進行判斷,但正如其中一條評論所建議的那樣,避免重新分配,因為數據幀是不可變的。試著想象你正在用scala編碼,你被定義為.做是不可能的。Python本身無法強制執行,但我認為最佳做法是避免覆蓋任何數據幀變量,這樣,如果您不再需要緩存的結果,則可以隨時調用。dfdfvaldf = df.filter(...)df.unpersist()


查看完整回答
反對 回復 2022-08-02
?
qq_遁去的一_1

TA貢獻1725條經驗 獲得超8個贊

想提出幾點,希望能澄清Spark在緩存方面的行為。


當您有


df = ... do stuff...

df.cache()

df.count()

...然后在應用程序中的其他位置


   another_df = ... do *same* stuff...

   another_df.*some_action()*

...,您希望重用緩存的數據幀。畢竟,重用先前計算的結果是緩存的目標。意識到這一點,Spark開發人員決定使用分析的邏輯計劃作為識別緩存數據幀的“關鍵”,而不是僅僅依賴于來自應用程序端的引用。在 Spark 中,CacheManager 是跟蹤緩存計算的組件,按索引順序排列:another_dfdfcachedData


  /**

   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list

   * should be protected in a "this.synchronized" block which includes the reading of the

   * existing value and the update of the cachedData var.

   */

  @transient @volatile

  private var cachedData = IndexedSeq[CachedData]()

在查詢規劃期間(在緩存管理器階段),將掃描此結構以查找正在分析的計劃的所有子樹,以查看是否已計算出其中的任何子樹。如果找到匹配項,Spark 會將此子樹替換為相應的 from 。InMemoryRelationcachedData


cache()(的簡單同義詞 ) 函數通過調用 cacheQuery(...) 來存儲具有存儲級別的數據幀persist()MEMORY_AND_DISKCacheManager

      /**

       * Caches the data produced by the logical representation of the given [[Dataset]].

       * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because

       * recomputing the in-memory columnar representation of the underlying table is expensive.

       */

      def cacheQuery(...

請注意,這與使用級別的 RDD 緩存不同。一旦緩存了數據幀,它們就會保留在內存或本地執行器磁盤上緩存,直到它們被顯式'ed',或者調用CacheManager。當執行程序存儲內存完全填滿時,緩存塊開始使用 LRU(最近最少使用)推送到磁盤,但永遠不會簡單地“丟棄”。MEMORY_ONLYunpersistclearCache()


順便說一句,好問題...


查看完整回答
反對 回復 2022-08-02
  • 2 回答
  • 0 關注
  • 134 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號