為了初始化Spark Streaming程序,一個(gè)StreamingContext對象必需被創(chuàng)建,它是Spark Streaming所有流操作的主要入口。一個(gè)StreamingContext對象可以用SparkConf對象創(chuàng)建。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
表示你的應(yīng)用程序顯示在集群UI上的名字,master
是一個(gè)Spark、Mesos、YARN集群URL或者一個(gè)特殊字符串“l(fā)ocal[*]”,它表示程序用本地模式運(yùn)行。當(dāng)程序運(yùn)行在集群中時(shí),你并不希望在程序中硬編碼master
,而是希望用spark-submit
啟動(dòng)應(yīng)用程序,并從spark-submit
中得到master
的值。對于本地測試或者單元測試,你可以傳遞“l(fā)ocal”字符串在同一個(gè)進(jìn)程內(nèi)運(yùn)行Spark Streaming。需要注意的是,它在內(nèi)部創(chuàng)建了一個(gè)SparkContext對象,你可以通過ssc.sparkContext
訪問這個(gè)SparkContext對象。
批時(shí)間片需要根據(jù)你的程序的潛在需求以及集群的可用資源來設(shè)定,你可以在性能調(diào)優(yōu)那一節(jié)獲取詳細(xì)的信息。
可以利用已經(jīng)存在的SparkContext
對象創(chuàng)建StreamingContext
對象。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
當(dāng)一個(gè)上下文(context)定義之后,你必須按照以下幾步進(jìn)行操作
streamingContext.start()
方法接收和處理數(shù)據(jù);streamingContext.stop()
方法被調(diào)用。幾點(diǎn)需要注意的地方:
stop()
方法,也會(huì)關(guān)閉SparkContext對象。如果只想僅關(guān)閉StreamingContext對象,設(shè)置stop()
的可選參數(shù)為false
更多建議: