TOP

Spark Streaming知識總結
2018-12-06 01:23:24 】 瀏覽:645
Tags:

Spark Streaming原理

Spark Streaming 是基于spark的流式批處理引擎。其基本原理是:將實時輸入數據流以時間片為單位進行拆分,然后經Spark引擎以類似批處理的方式處理每個時間片數據。
在這里插入圖片描述

Spark Streaming作業流程

在這里插入圖片描述

  • 客戶端提交作業后啟動Driver(Driver是spark作業的Master);
  • 每個作業包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task(可選的);
  • Receiver接收數據后生成Block,并把BlockId匯報給Driver优乐棋牌app下载,然后備份到另外一個Executor上;
  • ReceiverTracker維護Reciver匯報的BlockId;
  • Driver定時啟動JobGenerator,根據Dstream的關系生成邏輯RDD,然后創建Jobset,交給JobScheduler;
  • JobScheduler負責調度Jobset,交給DAGScheduler,DAGScheduler根據邏輯RDD,生成相應的Stages,每個stage包含一到多個task;
  • TaskScheduler負責把task調度到Executor上,并維護task的運行狀態;
  • 當tasks、stages、jobset完成后,單個batch才算完成。

Spark Streaming 與 Strom

聯系:
流式系統的特點:
低延遲。秒級或更短時間的響應
高性能
分布式
可擴展。伴隨著業務的發展,數據量、計算量可能會越來越大,所以要求系統是可擴展的
容錯。分布式系統中的通用問題,一個節點掛了不能影響應用

區別:
1、同一套系統,安裝spark之后就一切都有了
2、spark較強的容錯能力;strom使用較廣、更穩定
3、storm是用Clojure語言去寫的,它的很多擴展都是使用java完成的
4、任務執行方面和strom的區別是:
spark steaming數據進來是一小段時間的RDD,數據進來之后切成一小塊一小塊進行批處理
storm是基于record形式來的,進來的是一個tuple,一條進來就處理一下
5、中間過程實質上就是spark引擎,只不過sparkstreaming在spark之后引擎之上動了一點手腳:對進入spark引擎之前的數據進行了一個封裝,方便進行基于時間片的小批量作業,交給spark進行計算

離散數據流

Spark Streaming最主要的抽象是DStream(Discretized Stream,離散化數據流),表示連續不斷的數據流。
在內部實現上,Spark Streaming的輸入數據按照時間片(如1秒)分成一段一段,每一段數據轉換為Spark中的RDD,這些分段就是DStream,并且對DStream的操作都最終轉變為對相應的RDD的操作。
Spark Streaming提供了被稱為離散化流或者DStream的高層抽象,這個高層抽象用于表示數據的連續流;

創建DStream的方式:由文件、Socket、Kafka、Flume等取得的數據作為輸入數據流;或其他DStream進行的高層操作;

在內部,DStream被表達為RDDs的一個序列。
1、Dstream叫做離散數據流,是一個數據抽象,代表一個數據流。這個數據流可以從對輸入流的轉換獲得
2、Dstream是RDD在時間序列上的一個封裝
3、DStream的內部是通過一組時間序列上連續的RDD表示,每個都包含了特定時間間隔的數據流,RDD代表按照規定時間收集到的數據集
4、DStream這種數據流抽象也可以整體轉換,一個操作結束后轉換另外一種DStream
5、DStream的默認存儲級別為<內存+磁盤>
6、sparkstreaming有一種特別的操作:windows操作,稱為窗口操作,實質是對固定的以時間片積累起來的幾個RDD作為一整體操作
7、可以使用persist()函數進行序列化(KryoSerializer)

輸入輸出數據源

Spark Streaming可整合多種輸入數據源,如:
文件系統(本地文件、HDFS文件)
TCP套接字
Flume
Kafka
處理后的數據可存儲至文件系統、數據庫等系統中

Spark Streaming 讀取外部數據

在Spark Streaming中,有一個組件Receiver,作為一個長期運行的task跑在一個Executor上;

每個Receiver都會負責一個input DStream(比如從文件中讀取數據的文件流,比如套接字流,或者從Kafka中讀取的一個輸入流等等);

Spark Streaming通過input DStream與外部數據源進行連接,讀取相關數據。這項工作由Receiver完成。

Streaming 程序基本步驟

1、創建輸入DStream來定義輸入源

2、通過對DStream應用轉換操作和輸出操作來定義流計算

3、用streamingContext.start()來開始接收數據和處理流程;start之后不能再添加業務邏輯。

4、通過streamingContext.awaitTermination()方法來等待處理結束(手動結束或因為錯誤而結束)

5、可以通過streamingContext.stop()來手動結束流計算進程

StreamingContext 對象

StreamingContext 對象可以通過 SparkConf 對象創建;

不要硬編碼 master 參數在集群中, 而是通過 spark-submit 接收參數;

對于本地測試和單元測試, 可以傳遞“local[*]” 來運行 Spark Streaming 在進程內運行(自動檢測本地系統的CPU內核數量);

分批間隔時間基于應用延遲需求和可用的集群資源進行設定(設定間隔要大于應用數據的最小延遲需求,同時不能設置太小以至于系統無法在給定的周期內處理完畢)

其他問題

StreamingContext 對象也可以通過SparkContext對象創建。在context創建之后,可以接著開始如下的工作:
定義 input sources,通過創建 input Dstreams 完成
定義 streaming 計算,通過DStreams的 transformation 和 output 操作實現
啟動接收數據和處理,通過 streamingContext.start()
等待處理停止 (通常因為錯誤)优乐棋牌app下载,通過streamingContext.awaitTermination()
處理過程可以手動停止,通過 streamingContext.stop()

備注:
一旦context啟動, 沒有新的 streaming 計算可以被設置和添加進來
一旦context被停止, 它不能被再次啟動
只有一個StreamingContext在JVM中在同一時間可以被激活
StreamingContext.stop()執行時,同時停止了SparkContext

基本輸入源

文件流

1、文件必須是cp到指定的路徑中,不能是mv。新建文件也可以。
hdfs、本地文件系統都可以

2、文件流不需要運行接收器,可以不分配核數,即可以使用local[1],這是特例

Socket(套接字)流

Spark Streaming可以通過Socket端口監聽并接收數據,然后進行相應處理

編寫基于套接字的WordCount程序

新開一個命令窗口,啟動nc程序:
nc -lk 9999
(nc 需要安裝 yum install nc)

隨后可以在nc窗口中隨意輸入一些單詞,監聽窗口會自動獲得單詞數據流信息,在監聽窗口每隔x秒就會打印出詞頻統計信息,可以在屏幕上出現結果。

備注:使用local[]优乐棋牌app下载,可能存在問題。
如果給虛擬機配置的cpu數為1,使用local[
]也只會啟動一個線程,該線程用于receiver task,此時沒有資源處理接收達到的數據。
【現象:程序正常執行,不會打印時間戳,屏幕上也不會有其他有效信息】

有幾個問題:
日志信息太多,不爽,能改善嗎?
加入 setLogLevel

可以從別的機器發送字符串嗎,可以監聽別的機器的端口嗎?
nc –lk 9999
ssc.socketTextStream(“node1”, 9999)
nc命令只能將字符串發送到本地的端口;
streaming程序可以監聽其他機器的端口

每次都需要手動輸入字符串,實在不爽!能寫一個模仿nc的程序,向固定端口發送數據嗎?

RDD隊列流

調試Spark Streaming應用程序的時候,可使用streamingContext.
queueStream(queueOfRDD)創建基于RDD隊列的Dstream;

新建一個RDDQueueStream.scala代碼文件,功能是:每秒創建一個RDD,Streaming每隔5秒就對數據進行處理;

這種方式多用來測試streaming程序。
在這里插入圖片描述
備注:
oneAtATime:缺省為true,一次處理一個RDD,
設為false,一次處理全部RDD;
RDD隊列流可以使用local[1];
涉及到同時出隊和入隊操作,所以要做同步;

請關注公眾號獲取更多資料



】【打印繁體】【】【】 【】【】【】 【關閉】 【返回頂部
上一篇Spark將數據壓縮存儲 下一篇spark集群模式的部署