在Spark中有幾個優化可以減少批次處理的時間。這些可以在優化指南中作了討論。這節重點討論幾個重要的。
藉由網路(如kafka,flume,socket等)接收資料需要這些資料反序列化並被保存到Spark中。如果資料接收成為系统的瓶頸,就要考慮平行地接收資料。注意,每個輸入DStream創建一個receiver
(運行在worker機器上)
接收單個資料串流。創建多個輸入DStream並配置它們可以從來源中接收不同分區的資料串流,從而實作多資料串流接收。例如,接收兩個topic資料的單個輸入DStream可以被切分為兩個kafka輸入串流,每個接收一個topic。這將
在兩個worker上運行兩個receiver
,因此允許資料平行接收,提高整體的吞吐量。多個DStream可以被合並生成單個DStream,這樣運用在單個輸入DStream的transformation操作可以運用在合並的DStream上。
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另外一個需要考慮的參數是receiver
的阻塞時間。對於大部分的receiver
,在存入Spark記憶體之前,接收的資料都被合並成了一個大資料區塊。每批次資料中區塊的個數決定了任務的個數。這些任務是用類
似map的transformation操作接收的資料。阻塞間隔由配置參數spark.streaming.blockInterval
決定,預設的值是200毫秒。
多輸入串流或者多receiver
的可選的函數是明確地重新分配輸入資料串流(利用inputStream.repartition(<number of partitions>)
),在進一步操作之前,藉由集群的機器數分配接收的批次資料。
如果運行在計算stage上的並發任務數不足夠大,就不會充分利用集群的資料來源。例如,對於分散式reduce操作如reduceByKey
和reduceByKeyAndWindow
,預設的並發任務數藉由配置属性來確定(configuration.html#spark-properties)
spark.default.parallelism
。你可以藉由參數(PairDStreamFunctions
(api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))傳遞平行度,或者設定參數
spark.default.parallelism
修改預設值。
資料序列化的總開销是平常大的,特别是當sub-second級的批次資料被接收時。下面有兩個相關點:
- Spark中RDD資料的序列化。關於資料序列化請参照Spark優化指南。注意,與Spark不同的是,預設的RDD會被持續化為序列化的位元組陣列,以減少與垃圾回收相關的暫停。
- 輸入資料的序列化。從外部獲取資料存到Spark中,獲取的byte資料需要從byte反序列化,然後再按照Spark的序列化格式重新序列化到Spark中。因此,輸入資料的反序列化花費可能是一個瓶頸。
每秒鐘啟動的任務數是非常大的(50或者更多)。發送任務到slave的花費明顯,這使請求很難獲得亞秒(sub-second)級别的反應。藉由下面的改變可以減小開支
- 任務序列化。運行kyro序列化任何可以減小任務的大小,從而減小任務發送到slave的時間。
- 執行模式。在Standalone模式下或者粗粒度的Mesos模式下運行Spark可以在比細粒度Mesos模式下運行Spark獲得更短的任務啟動時間。可以在在Mesos下運行Spark中獲取更多訊息。
These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.