輸出操作允許DStream的操作推到如數(shù)據(jù)庫(kù)、文件系統(tǒng)等外部系統(tǒng)中。因?yàn)檩敵霾僮鲗?shí)際上是允許外部系統(tǒng)消費(fèi)轉(zhuǎn)換后的數(shù)據(jù),它們觸發(fā)的實(shí)際操作是DStream轉(zhuǎn)換。目前,定義了下面幾種輸出操作:
Output Operation | Meaning |
---|---|
print() | 在DStream的每個(gè)批數(shù)據(jù)中打印前10條元素,這個(gè)操作在開(kāi)發(fā)和調(diào)試中都非常有用。在Python API中調(diào)用pprint() 。 |
saveAsObjectFiles(prefix, [suffix]) | 保存DStream的內(nèi)容為一個(gè)序列化的文件SequenceFile 。每一個(gè)批間隔的文件的文件名基于prefix 和suffix 生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。 |
saveAsTextFiles(prefix, [suffix]) | 保存DStream的內(nèi)容為一個(gè)文本文件。每一個(gè)批間隔的文件的文件名基于prefix 和suffix 生成。"prefix-TIME_IN_MS[.suffix]" |
saveAsHadoopFiles(prefix, [suffix]) | 保存DStream的內(nèi)容為一個(gè)hadoop文件。每一個(gè)批間隔的文件的文件名基于prefix 和suffix 生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。 |
foreachRDD(func) | 在從流中生成的每個(gè)RDD上應(yīng)用函數(shù)func 的最通用的輸出操作。這個(gè)函數(shù)應(yīng)該推送每個(gè)RDD的數(shù)據(jù)到外部系統(tǒng),例如保存RDD到文件或者通過(guò)網(wǎng)絡(luò)寫到數(shù)據(jù)庫(kù)中。需要注意的是,func 函數(shù)在驅(qū)動(dòng)程序中執(zhí)行,并且通常都有RDD action在里面推動(dòng)RDD流的計(jì)算。 |
dstream.foreachRDD是一個(gè)強(qiáng)大的原語(yǔ),發(fā)送數(shù)據(jù)到外部系統(tǒng)中。然而,明白怎樣正確地、有效地用這個(gè)原語(yǔ)是非常重要的。下面幾點(diǎn)介紹了如何避免一般錯(cuò)誤。
dstream.foreachRDD(rdd => {
val connection = createNewConnection() // executed at the driver
rdd.foreach(record => {
connection.send(record) // executed at the worker
})
})
這是不正確的,因?yàn)檫@需要先序列化連接對(duì)象,然后將它從driver發(fā)送到worker中。這樣的連接對(duì)象在機(jī)器之間不能傳送。它可能表現(xiàn)為序列化錯(cuò)誤(連接對(duì)象不可序列化)或者初始化錯(cuò)誤(連接對(duì)象應(yīng)該在worker中初始化)等等。正確的解決辦法是在worker中創(chuàng)建連接對(duì)象。
dstream.foreachRDD(rdd => {
rdd.foreach(record => {
val connection = createNewConnection()
connection.send(record)
connection.close()
})
})
通常,創(chuàng)建一個(gè)連接對(duì)象有資源和時(shí)間的開(kāi)支。因此,為每個(gè)記錄創(chuàng)建和銷毀連接對(duì)象會(huì)導(dǎo)致非常高的開(kāi)支,明顯的減少系統(tǒng)的整體吞吐量。一個(gè)更好的解決辦法是利用rdd.foreachPartition
方法。為RDD的partition創(chuàng)建一個(gè)連接對(duì)象,用這個(gè)兩件對(duì)象發(fā)送partition中的所有記錄。
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
})
})
這就將連接對(duì)象的創(chuàng)建開(kāi)銷分?jǐn)偟搅藀artition的所有記錄上了。
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
})
})
需要注意的是,池中的連接對(duì)象應(yīng)該根據(jù)需要延遲創(chuàng)建,并且在空閑一段時(shí)間后自動(dòng)超時(shí)。這樣就獲取了最有效的方式發(fā)生數(shù)據(jù)到外部系統(tǒng)。
其它需要注意的地方:
dstream.foreachRDD()
,但是沒(méi)有任何RDD action操作在dstream.foreachRDD()
里面,那么什么也不會(huì)執(zhí)行。系統(tǒng)僅僅會(huì)接收輸入,然后丟棄它們。
更多建議: