Skip to content

Latest commit

 

History

History
44 lines (27 loc) · 4.36 KB

File metadata and controls

44 lines (27 loc) · 4.36 KB

RDD 永續儲存

Spark 最重要的一個功能是它可以通過各種操作(operations)永續儲存(或者緩存)一個集合到記憶體中。當你想儲存一個RDD 的時候,每一個節點都會參與計算的所有分區資料儲存到記憶體中,而且這些資料可以被這個集合(以及這個集合衍生出的其他集合)的動作( action )來重複使用。這樣的設計會使後續的動作速度加快(通常快10 倍以上)。對迭代算法與快速的交互操作來說,記憶體是一個關鍵點。

你可以用persist()cache() 方法來儲存一個RDD。首先,在action 中運算後取得RDD;接著,將它保存在每個節點的記憶體裡。Spark 的緩存是一個容錯的機制-如果RDD 的任何一個分區內不見,它可以透過原本的(transformations )操作,自動重複計算並且建立到這個分區內補足。

此外,可以利用不同的儲存機制來儲存每一個被永續化的 RDD 。例如,Spark 允許使用者將RDD 儲存在硬碟上、將集合序列化的 Java 物件永續儲存到記憶體、在節點之間複製或是儲存到Tachyon中。我們可以傳遞StorageLevel 物件給persist()方法來設定這些選項。cache() 方法則是預設儲存位置為—StorageLevel.MEMORY_ONLY。完整的設定說明如下:

Storage Level Meaning
MEMORY_ONLY 把RDD 作為非序列化的Java 物件儲存在在jvm中。如果RDD 不適合放在記憶體,一些分區將不會被儲存在記憶體內,而是在每次需要在分區內時重新計算。這是系統預設的儲存方式。
MEMORY_AND_DISK 將RDD 作為非序列化的Java 物件儲存在jvm中。如果RDD 不適合放在記憶體,將這些分區儲存在硬碟,需要再取出。
MEMORY_ONLY_SER 將RDD 作為序列化的Java 對象儲存(每個分區一個byte 數組)。這種方式比非序列化方式更加節省空間,特別是用快速序列化方式,只是更耗費cpu 資源—密集的讀取操作。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。
DISK_ONLY 僅僅將RDD 分區儲存在硬碟內
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面的儲存方式類似,但是複製每個分區到集群的兩個節點上面
OFF_HEAP (experimental) 以序列化的格式儲存RDD 到Tachyon中。相較於MEMORY_ONLY_SER,OFF_HEAP 減少回收垃圾的耗損,允許更小的執行者分享記憶體。這使得在擁有大量記憶體的環境下或者多開發空間的環境中更具威力。

NOTE:在python中,儲存的物件都是通過Pickle 做序列化,所以是否選擇序列化並不重要。

Spark 也會自動永續儲存一些shuffle 操作(如reduceByKey)的中間資料,即使用戶沒有使用persist 方法。好處是避免在shuffle 發生錯誤情況下,需要重新計算整個輸入。如果使用者計畫重算過程中的RDD ,建議使用persist

如何選擇儲存方式

Spark 提供多種儲存方式意味在記憶體利用率和 cpu 利用率之間的平衡。我們推薦透過下列的步驟選擇一個合適的儲存方式:

  • 如果你的RDD 適合預設的儲存方式(MEMORY_ONLY),就使用預設方式。因為這是cpu 利用率最好的的選擇,RDD 上的操作會比較快。

  • 如果不適用系統預設的方式,選擇MEMORY_ONLY_SER。這是一個更快的序列化物件的空間使用率,速度也不錯。

  • 除非計算 RDD 耗損資源多,或是資料量過於龐大,不要將RDD 儲存在硬碟上,否則,重新計算一個分區就會和讀取硬碟資料一樣慢。

  • 如果希望錯誤恢復速度加快,可以利用重複(replicated) 儲存方式。所有的儲存方式都可以通過重複計算遺失的資料來支援容錯機制。

  • 在擁有大量記憶體的環境或者多應用程式的環境,OFF_HEAP 擁有下列優勢:

    • 它運行多個執行者共享Tachyon 中相同的記憶體池 ( memory pool)
    • 它明顯減少收垃圾的花費
    • 如果單個執行者毀損,記憶體的數據不會遺失

刪除資料

Spark 自動監控每個節點記憶體使用情況,利用最近最少使用原則來刪除老舊資料。如果你想手動刪除 RDD ,可以用RDD.unpersist()