Apache Kafka 整合 Storm

2018-12-28 18:26 更新

在本章中,我們將學(xué)習(xí)如何將Kafka與Apache Storm集成。

關(guān)于Storm

Storm最初由Nathan Marz和BackType的團(tuán)隊(duì)創(chuàng)建。 在短時(shí)間內(nèi),Apache Storm成為分布式實(shí)時(shí)處理系統(tǒng)的標(biāo)準(zhǔn),允許您處理大量數(shù)據(jù)。 Storm是非常快的,并且一個(gè)基準(zhǔn)時(shí)鐘為每個(gè)節(jié)點(diǎn)每秒處理超過(guò)一百萬(wàn)個(gè)元組。 Apache Storm持續(xù)運(yùn)行,從配置的源(Spouts)消耗數(shù)據(jù),并將數(shù)據(jù)傳遞到處理管道(Bolts)。 聯(lián)合,Spouts和Bolt構(gòu)成一個(gè)拓?fù)洹?/span>

與Storm集成

Kafka和Storm自然互補(bǔ),它們強(qiáng)大的合作能夠?qū)崿F(xiàn)快速移動(dòng)的大數(shù)據(jù)的實(shí)時(shí)流分析。 Kafka和Storm集成是為了使開發(fā)人員更容易地從Storm拓?fù)浍@取和發(fā)布數(shù)據(jù)流。

概念流

Spouts是流的源。 例如,一個(gè)噴頭可以從Kafka Topic讀取元組并將它們作為流發(fā)送。 Bolt消耗輸入流,處理并可能發(fā)射新的流。 Bolt可以從運(yùn)行函數(shù),過(guò)濾元組,執(zhí)行流聚合,流連接,與數(shù)據(jù)庫(kù)交談等等做任何事情。 Storm拓?fù)渲械拿總€(gè)節(jié)點(diǎn)并行執(zhí)行。 拓?fù)錈o(wú)限運(yùn)行,直到終止它。 Storm將自動(dòng)重新分配任何失敗的任務(wù)。 此外,Storm保證沒(méi)有數(shù)據(jù)丟失,即使機(jī)器停機(jī)和消息被丟棄。

讓我們?cè)敿?xì)了解Kafka-Storm集成API。 有三個(gè)主要類集成Kafka與Storm。 他們?nèi)缦?-

BrokerHosts - ZkHosts & StaticHosts

BrokerHosts是一個(gè)接口,ZkHosts和StaticHosts是它的兩個(gè)主要實(shí)現(xiàn)。 ZkHosts用于通過(guò)在ZooKeeper中維護(hù)細(xì)節(jié)來(lái)動(dòng)態(tài)跟蹤Kafka代理,而StaticHosts用于手動(dòng)/靜態(tài)設(shè)置Kafka代理及其詳細(xì)信息。 ZkHosts是訪問(wèn)Kafka代理的簡(jiǎn)單快捷的方式。

ZkHosts的簽名如下 -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

其中brokerZkStr是ZooKeeper主機(jī),brokerZkPath是ZooKeeper路徑以維護(hù)Kafka代理詳細(xì)信息。

KafkaConfig API

此API用于定義Kafka集群的配置設(shè)置。 Kafka Con-fig的簽名定義如下

public KafkaConfig(BrokerHosts hosts, string topic)

    主機(jī) - BrokerHosts可以是ZkHosts / StaticHosts。

    主題 - 主題名稱。

SpoutConfig API

Spoutconfig是KafkaConfig的擴(kuò)展,支持額外的ZooKeeper信息。

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • 主機(jī) - BrokerHosts可以是BrokerHosts接口的任何實(shí)現(xiàn)

  • 主題 - 主題名稱。

  • zkRoot - ZooKeeper根路徑。

  • id - spouts存儲(chǔ)在Zookeeper中消耗的偏移量的狀態(tài)。 ID應(yīng)該唯一標(biāo)識(shí)您的噴嘴。

SchemeAsMultiScheme

SchemeAsMultiScheme是一個(gè)接口,用于指示如何將從Kafka中消耗的ByteBuffer轉(zhuǎn)換為風(fēng)暴元組。 它源自MultiScheme并接受Scheme類的實(shí)現(xiàn)。 有很多Scheme類的實(shí)現(xiàn),一個(gè)這樣的實(shí)現(xiàn)是StringScheme,它將字節(jié)解析為一個(gè)簡(jiǎn)單的字符串。 它還控制輸出字段的命名。 簽名定義如下。

public SchemeAsMultiScheme(Scheme scheme)
  • 方案 - 從kafka消耗的字節(jié)緩沖區(qū)。

KafkaSpout API

KafkaSpout是我們的spout實(shí)現(xiàn),它將與Storm集成。 它從kafka主題獲取消息,并將其作為元組發(fā)送到Storm生態(tài)系統(tǒng)。 KafkaSpout從SpoutConfig獲取其配置詳細(xì)信息。

下面是一個(gè)創(chuàng)建一個(gè)簡(jiǎn)單的Kafka噴水嘴的示例代碼。

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

創(chuàng)建Bolt

Bolt是一個(gè)使用元組作為輸入,處理元組,并產(chǎn)生新的元組作為輸出的組件。 Bolt將實(shí)現(xiàn)IRichBolt接口。 在此程序中,使用兩個(gè)Bolt類WordSplitter-Bolt和WordCounterBolt來(lái)執(zhí)行操作。

IRichBolt接口有以下方法 -

  • 準(zhǔn)備 - 為Bolt提供要執(zhí)行的環(huán)境。 執(zhí)行器將運(yùn)行此方法來(lái)初始化噴頭。

  • 執(zhí)行 - 處理單個(gè)元組的輸入。

  • 清理 - 當(dāng)Bolt要關(guān)閉時(shí)調(diào)用。

  • declareOutputFields - 聲明元組的輸出模式。

讓我們創(chuàng)建SplitBolt.java,它實(shí)現(xiàn)邏輯分割一個(gè)句子到詞和CountBolt.java,它實(shí)現(xiàn)邏輯分離獨(dú)特的單詞和計(jì)數(shù)其出現(xiàn)。

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()&plus;" : " &plus; entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

提交拓?fù)?/span>

Storm拓?fù)浠旧鲜且粋€(gè)Thrift結(jié)構(gòu)。 TopologyBuilder類提供了簡(jiǎn)單而容易的方法來(lái)創(chuàng)建復(fù)雜的拓?fù)洹?/span> TopologyBuilder類具有設(shè)置spout(setSpout)和設(shè)置bolt(setBolt)的方法。 最后,TopologyBuilder有createTopology來(lái)創(chuàng)建to-pology。 shuffleGrouping和fieldsGrouping方法有助于為噴頭和Bolt設(shè)置流分組。

本地集群 - 為了開發(fā)目的,我們可以使用 LocalCluster 對(duì)象創(chuàng)建本地集群,然后使用 LocalCluster的 submitTopology 類。

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

在移動(dòng)編譯之前,Kakfa-Storm集成需要策展人ZooKeeper客戶端java庫(kù)。 策展人版本2.9.1支持Apache Storm 0.9.5版(我們?cè)诒窘坛讨惺褂?。 下載下面指定的jar文件并將其放在java類路徑中。

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

在包括依賴文件之后,使用以下命令編譯程序,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

執(zhí)行

啟動(dòng)Kafka Producer CLI(在上一章節(jié)中解釋),創(chuàng)建一個(gè)名為 my-first-topic 的新主題,并提供一些樣本消息,如下所示 -

hello
kafka
storm
spark
test message
another test message

現(xiàn)在使用以下命令執(zhí)行應(yīng)用程序 -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*":. KafkaStormSample

此應(yīng)用程序的示例輸出如下所示 -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)