初始化StreamingContext

2018-02-24 15:57 更新

初始化StreamingContext

為了初始化Spark Streaming程序,一個(gè)StreamingContext對象必需被創(chuàng)建,它是Spark Streaming所有流操作的主要入口。一個(gè)StreamingContext對象可以用SparkConf對象創(chuàng)建。

import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName表示你的應(yīng)用程序顯示在集群UI上的名字,master是一個(gè)Spark、Mesos、YARN集群URL或者一個(gè)特殊字符串“l(fā)ocal[*]”,它表示程序用本地模式運(yùn)行。當(dāng)程序運(yùn)行在集群中時(shí),你并不希望在程序中硬編碼master,而是希望用spark-submit啟動(dòng)應(yīng)用程序,并從spark-submit中得到master的值。對于本地測試或者單元測試,你可以傳遞“l(fā)ocal”字符串在同一個(gè)進(jìn)程內(nèi)運(yùn)行Spark Streaming。需要注意的是,它在內(nèi)部創(chuàng)建了一個(gè)SparkContext對象,你可以通過ssc.sparkContext訪問這個(gè)SparkContext對象。

批時(shí)間片需要根據(jù)你的程序的潛在需求以及集群的可用資源來設(shè)定,你可以在性能調(diào)優(yōu)那一節(jié)獲取詳細(xì)的信息。

可以利用已經(jīng)存在的SparkContext對象創(chuàng)建StreamingContext對象。

import org.apache.spark.streaming._
val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

當(dāng)一個(gè)上下文(context)定義之后,你必須按照以下幾步進(jìn)行操作

  • 定義輸入源;
  • 準(zhǔn)備好流計(jì)算指令;
  • 利用streamingContext.start()方法接收和處理數(shù)據(jù);
  • 處理過程將一直持續(xù),直到streamingContext.stop()方法被調(diào)用。

幾點(diǎn)需要注意的地方:

  • 一旦一個(gè)context已經(jīng)啟動(dòng),就不能有新的流算子建立或者是添加到context中。
  • 一旦一個(gè)context已經(jīng)停止,它就不能再重新啟動(dòng)
  • 在JVM中,同一時(shí)間只能有一個(gè)StreamingContext處于活躍狀態(tài)
  • 在StreamingContext上調(diào)用stop()方法,也會(huì)關(guān)閉SparkContext對象。如果只想僅關(guān)閉StreamingContext對象,設(shè)置stop()的可選參數(shù)為false
  • 一個(gè)SparkContext對象可以重復(fù)利用去創(chuàng)建多個(gè)StreamingContext對象,前提條件是前面的StreamingContext在后面StreamingContext創(chuàng)建之前關(guān)閉(不關(guān)閉SparkContext)。
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)