2 回答

TA貢獻1776條經驗 獲得超12個贊
我做了幾個實驗,如下所示。顯然,數據幀一旦緩存,就會保持緩存狀態(如 和 查詢計劃等 所示),即使所有 Python 引用都被覆蓋或完全刪除,并且顯式調用了垃圾回收。getPersistentRDDsInMemorydel
實驗 1:
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df2.select('*').explain()
del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
結果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}
>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
+- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#172L AS col1#174L]
+- *(1) Scan ExistingRDD[_1#172L]
>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}
實驗 2:
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df.select('*').explain()
del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
結果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}
>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}
>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
+- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#218L AS col1#220L]
+- *(1) Scan ExistingRDD[_1#218L]
>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}
實驗3(對照實驗,證明無孔徒有效)
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
df2.select('*').explain()
df.unpersist()
df2.select('*').explain()
結果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}
>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
+- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#310L AS col1#312L]
+- *(1) Scan ExistingRDD[_1#310L]
>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
+- *(1) Scan ExistingRDD[_1#310L]
回答OP的問題:
這是否意味著緩存的數據框不再可用,將被垃圾回收?這是否意味著新的后置濾波器df將從頭開始計算所有內容,盡管它是從以前緩存的數據幀生成的?
實驗表明兩者都沒有。數據幀保持緩存狀態,不進行垃圾回收,并且根據查詢計劃使用緩存的(不可引用的)數據幀計算新數據幀。
與緩存使用相關的一些有用功能(如果您不想通過 Spark UI 執行此操作)是:
sc._jsc.getPersistentRDDs(),其中顯示了緩存的 RDD/數據幀的列表,以及
spark.catalog.clearCache(),這將清除所有緩存的 RDD/數據幀。
我在執行上述操作時是否偏離了最佳實踐?
我沒有權力對此進行判斷,但正如其中一條評論所建議的那樣,避免重新分配,因為數據幀是不可變的。試著想象你正在用scala編碼,你被定義為.做是不可能的。Python本身無法強制執行,但我認為最佳做法是避免覆蓋任何數據幀變量,這樣,如果您不再需要緩存的結果,則可以隨時調用。dfdfvaldf = df.filter(...)df.unpersist()

TA貢獻1725條經驗 獲得超8個贊
想提出幾點,希望能澄清Spark在緩存方面的行為。
當您有
df = ... do stuff...
df.cache()
df.count()
...然后在應用程序中的其他位置
another_df = ... do *same* stuff...
another_df.*some_action()*
...,您希望重用緩存的數據幀。畢竟,重用先前計算的結果是緩存的目標。意識到這一點,Spark開發人員決定使用分析的邏輯計劃作為識別緩存數據幀的“關鍵”,而不是僅僅依賴于來自應用程序端的引用。在 Spark 中,CacheManager 是跟蹤緩存計算的組件,按索引順序排列:another_dfdfcachedData
/**
* Maintains the list of cached plans as an immutable sequence. Any updates to the list
* should be protected in a "this.synchronized" block which includes the reading of the
* existing value and the update of the cachedData var.
*/
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
在查詢規劃期間(在緩存管理器階段),將掃描此結構以查找正在分析的計劃的所有子樹,以查看是否已計算出其中的任何子樹。如果找到匹配項,Spark 會將此子樹替換為相應的 from 。InMemoryRelationcachedData
cache()(的簡單同義詞 ) 函數通過調用 cacheQuery(...) 來存儲具有存儲級別的數據幀persist()MEMORY_AND_DISKCacheManager
/**
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
def cacheQuery(...
請注意,這與使用級別的 RDD 緩存不同。一旦緩存了數據幀,它們就會保留在內存或本地執行器磁盤上緩存,直到它們被顯式'ed',或者調用CacheManager。當執行程序存儲內存完全填滿時,緩存塊開始使用 LRU(最近最少使用)推送到磁盤,但永遠不會簡單地“丟棄”。MEMORY_ONLYunpersistclearCache()
順便說一句,好問題...
添加回答
舉報