對于大數(shù)據(jù),我們要考慮的問題有很多,首先海量數(shù)據(jù)如何收集(如 Flume),然后對于收集到的數(shù)據(jù)如何存儲(典型的分布式文件系統(tǒng) HDFS、分布式數(shù)據(jù)庫 HBase、NoSQL 數(shù)據(jù)庫 Redis),其次存儲的數(shù)據(jù)不是存起來就沒事了,要通過計算從中獲取有用的信息,這就涉及到計算模型(典型的離線計算 MapReduce、流式實時計算Storm、Spark),或者要從數(shù)據(jù)中挖掘信息,還需要相應的機器學習算法。在這些之上,還有一些各種各樣的查詢分析數(shù)據(jù)的工具(如 Hive、Pig 等)。除此之外,要構建分布式應用還需要一些工具,比如分布式協(xié)調(diào)服務 Zookeeper 等等。
??這里,我們講到的是消息系統(tǒng),Kafka 專為分布式高吞吐量系統(tǒng)而設計,其他消息傳遞系統(tǒng)相比,Kafka 具有更好的吞吐量,內(nèi)置分區(qū),復制和固有的容錯能力,這使得它非常適合大規(guī)模消息處理應用程序。
??首先,我們理解一下什么是消息系統(tǒng):消息系統(tǒng)負責將數(shù)據(jù)從一個應用程序傳輸?shù)搅硗庖粋€應用程序,使得應用程序可以專注于處理邏輯,而不用過多的考慮如何將消息共享出去。
??分布式消息系統(tǒng)基于可靠消息隊列的方式,消息在應用程序和消息系統(tǒng)之間異步排隊。實際上,消息系統(tǒng)有兩種消息傳遞模式:一種是點對點,另外一種是基于發(fā)布-訂閱(publish-subscribe)的消息系統(tǒng)。
??在點對點的消息系統(tǒng)中,消息保留在隊列中,一個或者多個消費者可以消耗隊列中的消息,但是消息最多只能被一個消費者消費,一旦有一個消費者將其消費掉,消息就從該隊列中消失。這里要注意:多個消費者可以同時工作,但是最終能拿到該消息的只有其中一個。最典型的例子就是訂單處理系統(tǒng),多個訂單處理器可以同時工作,但是對于一個特定的訂單,只有其中一個訂單處理器可以拿到該訂單進行處理。
??在發(fā)布 - 訂閱系統(tǒng)中,消息被保留在主題中。 與點對點系統(tǒng)不同,消費者可以訂閱一個或多個主題并使用該主題中的所有消息。在發(fā)布 - 訂閱系統(tǒng)中,消息生產(chǎn)者稱為發(fā)布者,消息使用者稱為訂閱者。 一個現(xiàn)實生活的例子是Dish電視,它發(fā)布不同的渠道,如運動,電影,音樂等,任何人都可以訂閱自己的頻道集,并獲得他們訂閱的頻道時可用。
??Kafka is a distributed,partitioned,replicated commit logservice。
??Apache Kafka 是一個分布式發(fā)布 - 訂閱消息系統(tǒng)和一個強大的隊列,可以處理大量的數(shù)據(jù),并使你能夠?qū)⑾囊粋€端點傳遞到另一個端點。 Kafka 適合離線和在線消息消費。 Kafka 消息保留在磁盤上,并在群集內(nèi)復制以防止數(shù)據(jù)丟失。 Kafka 構建在 ZooKeeper 同步服務之上。 它與 Apache Storm 和 Spark 非常好地集成,用于實時流式數(shù)據(jù)分析。
??Kafka 是一個分布式消息隊列,具有高性能、持久化、多副本備份、橫向擴展能力。生產(chǎn)者往隊列里寫消息,消費者從隊列里取消息進行業(yè)務邏輯。一般在架構設計中起到解耦、削峰、異步處理的作用。
??關鍵術語:
??(1)生產(chǎn)者和消費者(producer和consumer):消息的發(fā)送者叫 Producer,消息的使用者和接受者是 Consumer,生產(chǎn)者將數(shù)據(jù)保存到 Kafka 集群中,消費者從中獲取消息進行業(yè)務的處理。
??(2)broker:Kafka 集群中有很多臺 Server,其中每一臺 Server 都可以存儲消息,將每一臺 Server 稱為一個 kafka 實例,也叫做 broker。
??(3)主題(topic):一個 topic 里保存的是同一類消息,相當于對消息的分類,每個 producer 將消息發(fā)送到 kafka 中,都需要指明要存的 topic 是哪個,也就是指明這個消息屬于哪一類。
??(4)分區(qū)(partition):每個 topic 都可以分成多個 partition,每個 partition 在存儲層面是 append log 文件。任何發(fā)布到此 partition 的消息都會被直接追加到 log 文件的尾部。為什么要進行分區(qū)呢?最根本的原因就是:kafka基于文件進行存儲,當文件內(nèi)容大到一定程度時,很容易達到單個磁盤的上限,因此,采用分區(qū)的辦法,一個分區(qū)對應一個文件,這樣就可以將數(shù)據(jù)分別存儲到不同的server上去,另外這樣做也可以負載均衡,容納更多的消費者。
??(5)偏移量(Offset):一個分區(qū)對應一個磁盤上的文件,而消息在文件中的位置就稱為 offset(偏移量),offset 為一個 long 型數(shù)字,它可以唯一標記一條消息。由于kafka 并沒有提供其他額外的索引機制來存儲 offset,文件只能順序的讀寫,所以在kafka中幾乎不允許對消息進行“隨機讀寫”。
??綜上,我們總結一下 Kafka 的幾個要點:
??通過之前的介紹,我們對 kafka 有了一個簡單的理解,它的設計初衷是建立一個統(tǒng)一的信息收集平臺,使其可以做到對信息的實時反饋。Kafka is a distributed,partitioned,replicated commit logservice。接下來我們著重從幾個方面分析其基本原理。
??我們說 kafka 是一個分布式消息系統(tǒng),所謂的分布式,實際上我們已經(jīng)大致了解。消息保存在 Topic 中,而為了能夠?qū)崿F(xiàn)大數(shù)據(jù)的存儲,一個 topic 劃分為多個分區(qū),每個分區(qū)對應一個文件,可以分別存儲到不同的機器上,以實現(xiàn)分布式的集群存儲。另外,每個 partition 可以有一定的副本,備份到多臺機器上,以提高可用性。
??總結起來就是:一個 topic 對應的多個 partition 分散存儲到集群中的多個 broker 上,存儲方式是一個 partition 對應一個文件,每個 broker 負責存儲在自己機器上的 partition 中的消息讀寫。
??kafka 還可以配置 partitions 需要備份的個數(shù)(replicas),每個 partition 將會被備份到多臺機器上,以提高可用性,備份的數(shù)量可以通過配置文件指定。
??這種冗余備份的方式在分布式系統(tǒng)中是很常見的,那么既然有副本,就涉及到對同一個文件的多個備份如何進行管理和調(diào)度。kafka 采取的方案是:每個 partition 選舉一個 server 作為“l(fā)eader”,由 leader 負責所有對該分區(qū)的讀寫,其他 server 作為 follower 只需要簡單的與 leader 同步,保持跟進即可。如果原來的 leader 失效,會重新選舉由其他的 follower 來成為新的 leader。
??至于如何選取 leader,實際上如果我們了解 ZooKeeper,就會發(fā)現(xiàn)其實這正是 Zookeeper 所擅長的,Kafka 使用 ZK 在 Broker 中選出一個 Controller,用于 Partition 分配和 Leader 選舉。
??另外,這里我們可以看到,實際上作為 leader 的 server 承擔了該分區(qū)所有的讀寫請求,因此其壓力是比較大的,從整體考慮,有多少個 partition 就意味著會有多少個leader,kafka 會將 leader 分散到不同的 broker 上,確保整體的負載均衡。
??Kafka 的總體數(shù)據(jù)流滿足下圖,該圖可以說是概括了整個 kafka 的基本原理。
(1)數(shù)據(jù)生產(chǎn)過程(Produce)
??對于生產(chǎn)者要寫入的一條記錄,可以指定四個參數(shù):分別是 topic、partition、key 和 value,其中 topic 和 value(要寫入的數(shù)據(jù))是必須要指定的,而 key 和 partition 是可選的。
??對于一條記錄,先對其進行序列化,然后按照 Topic 和 Partition,放進對應的發(fā)送隊列中。如果 Partition 沒填,那么情況會是這樣的:a、Key 有填。按照 Key 進行哈希,相同 Key 去一個 Partition。b、Key 沒填。Round-Robin 來選 Partition。
??producer 將會和Topic下所有 partition leader 保持 socket 連接,消息由 producer 直接通過 socket 發(fā)送到 broker。其中 partition leader 的位置( host : port )注冊在 zookeeper 中,producer 作為 zookeeper client,已經(jīng)注冊了 watch 用來監(jiān)聽 partition leader 的變更事件,因此,可以準確的知道誰是當前的 leader。
??producer 端采用異步發(fā)送:將多條消息暫且在客戶端 buffer 起來,并將他們批量的發(fā)送到 broker,小數(shù)據(jù) IO 太多,會拖慢整體的網(wǎng)絡延遲,批量延遲發(fā)送事實上提升了網(wǎng)絡效率。
(2)數(shù)據(jù)消費過程(Consume)
??對于消費者,不是以單獨的形式存在的,每一個消費者屬于一個 consumer group,一個 group 包含多個 consumer。特別需要注意的是:訂閱 Topic 是以一個消費組來訂閱的,發(fā)送到 Topic 的消息,只會被訂閱此 Topic 的每個 group 中的一個 consumer 消費。
??如果所有的 Consumer 都具有相同的 group,那么就像是一個點對點的消息系統(tǒng);如果每個 consumer 都具有不同的 group,那么消息會廣播給所有的消費者。
??具體說來,這實際上是根據(jù) partition 來分的,一個 Partition,只能被消費組里的一個消費者消費,但是可以同時被多個消費組消費,消費組里的每個消費者是關聯(lián)到一個 partition 的,因此有這樣的說法:對于一個 topic,同一個 group 中不能有多于 partitions 個數(shù)的 consumer 同時消費,否則將意味著某些 consumer 將無法得到消息。
??同一個消費組的兩個消費者不會同時消費一個 partition。
??在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立連接之后,主動去 pull(或者說 fetch )消息,首先 consumer 端可以根據(jù)自己的消費能力適時的去 fetch 消息并處理,且可以控制消息消費的進度(offset)。
??partition 中的消息只有一個 consumer 在消費,且不存在消息狀態(tài)的控制,也沒有復雜的消息確認機制,可見 kafka broker 端是相當輕量級的。當消息被 consumer 接收之后,需要保存 Offset 記錄消費到哪,以前保存在 ZK 中,由于 ZK 的寫性能不好,以前的解決方法都是 Consumer 每隔一分鐘上報一次,在 0.10 版本后,Kafka 把這個 Offset 的保存,從 ZK 中剝離,保存在一個名叫 consumeroffsets topic 的 Topic 中,由此可見,consumer 客戶端也很輕量級。
??Kafka 支持 3 種消息投遞語義,在業(yè)務中,常常都是使用 At least once 的模型。
更多建議: