在本章中,我們將討論如何將Apache Kafka與Spark Streaming API集成。
Spark Streaming API支持實時數(shù)據(jù)流的可擴展,高吞吐量,容錯流處理。 數(shù)據(jù)可以從諸如Kafka,F(xiàn)lume,Twitter等許多源中提取,并且可以使用復雜的算法來處理,例如地圖,縮小,連接和窗口等高級功能。 最后,處理的數(shù)據(jù)可以推送到文件系統(tǒng),數(shù)據(jù)庫和活動儀表板。 彈性分布式數(shù)據(jù)集(RDD)是Spark的基本數(shù)據(jù)結構。 它是一個不可變的分布式對象集合。 RDD中的每個數(shù)據(jù)集劃分為邏輯分區(qū),可以在集群的不同節(jié)點上計算。
Kafka是Spark流式傳輸?shù)臐撛谙鬟f和集成平臺。 Kafka充當實時數(shù)據(jù)流的中心樞紐,并使用Spark Streaming中的復雜算法進行處理。 一旦數(shù)據(jù)被處理,Spark Streaming可以將結果發(fā)布到另一個Kafka主題或存儲在HDFS,數(shù)據(jù)庫或儀表板中。 下圖描述了概念流程。
現(xiàn)在,讓我們詳細了解Kafka-Spark API。
它表示Spark應用程序的配置。 用于將各種Spark參數(shù)設置為鍵值對。
SparkConf
類有以下方法 -
set(string key,string value) - 設置配置變量。
remove(string key) - 從配置中移除密鑰。
setAppName(string name) - 設置應用程序的應用程序名稱。
get(string key) - get key
這是Spark功能的主要入口點。 SparkContext表示到Spark集群的連接,可用于在集群上創(chuàng)建RDD,累加器和廣播變量。 簽名的定義如下所示。
public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)
主 - 要連接的群集網(wǎng)址(例如mesos:// host:port,spark:// host:port,local [4])。
appName - 作業(yè)的名稱,以顯示在集群Web UI上
batchDuration - 流式數(shù)據(jù)將被分成批次的時間間隔
public StreamingContext(SparkConf conf, Duration batchDuration)
通過提供新的SparkContext所需的配置創(chuàng)建StreamingContext。
conf - Spark參數(shù)
batchDuration - 流式數(shù)據(jù)將被分成批次的時間間隔
KafkaUtils API用于將Kafka集群連接到Spark流。 此API具有如下定義的顯著方法 createStream
。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream( StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上面顯示的方法用于創(chuàng)建從Kafka Brokers提取消息的輸入流。
ssc - StreamingContext對象。
zkQuorum - Zookeeper quorum。
groupId - 此消費者的組ID。
主題 - 返回要消費的主題的地圖。
storageLevel - 用于存儲接收的對象的存儲級別。
KafkaUtils API有另一個方法createDirectStream,用于創(chuàng)建一個輸入流,直接從Kafka Brokers拉取消息,而不使用任何接收器。 這個流可以保證來自Kafka的每個消息都包含在轉換中一次。
示例應用程序在Scala中完成。 要編譯應用程序,請下載并安裝 sbt
,scala構建工具(類似于maven)。 主要應用程序代碼如下所示。
import java.util.HashMap import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ object KafkaWordCount { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() ssc.awaitTermination() } }
spark-kafka集成取決于Spark,Spark流和Spark與Kafka的集成jar。 創(chuàng)建一個新文件 build.sbt
,并指定應用程序詳細信息及其依賴關系。 在編譯和打包應用程序時, sbt
將下載所需的jar。
name := "Spark Kafka Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
運行以下命令以編譯和打包應用程序的jar文件。 我們需要將jar文件提交到spark控制臺以運行應用程序。
sbt package
啟動Kafka Producer CLI(在上一章中解釋),創(chuàng)建一個名為 my-first-topic
的新主題,并提供一些樣本消息,如下所示。
Another spark test message
運行以下命令將應用程序提交到spark控制臺。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming -kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark -kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
此應用程序的示例輸出如下所示。
spark console messages .. (Test,1) (spark,1) (another,1) (message,1) spark console message ..
更多建議: