Apache Kafka 簡(jiǎn)單生產(chǎn)者示例

2021-08-16 17:20 更新

讓我們使用Java客戶(hù)端創(chuàng)建一個(gè)用于發(fā)布和使用消息的應(yīng)用程序。 Kafka生產(chǎn)者客戶(hù)端包括以下API。

KafkaProducer API

讓我們了解本節(jié)中最重要的一組Kafka生產(chǎn)者API。 KafkaProducer API的中心部分是 KafkaProducer 類(lèi)。 KafkaProducer類(lèi)提供了一個(gè)選項(xiàng),用于將其構(gòu)造函數(shù)中的Kafka代理連接到以下方法。

  • KafkaProducer類(lèi)提供send方法以異步方式將消息發(fā)送到主題。 send()的簽名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - 生產(chǎn)者管理等待發(fā)送的記錄的緩沖區(qū)。

  • 回調(diào) - 當(dāng)服務(wù)器確認(rèn)記錄時(shí)執(zhí)行的用戶(hù)提供的回調(diào)(null表示無(wú)回調(diào))。

  • KafkaProducer類(lèi)提供了一個(gè)flush方法,以確保所有先前發(fā)送的消息都已實(shí)際完成。 flush方法的語(yǔ)法如下 -

public void flush()
  • KafkaProducer類(lèi)提供了partitionFor方法,這有助于獲取給定主題的分區(qū)元數(shù)據(jù)。 這可以用于自定義分區(qū)。 這種方法的簽名如下 -

public Map metrics()

它返回由生產(chǎn)者維護(hù)的內(nèi)部度量的映射。

  • public void close() - KafkaProducer類(lèi)提供關(guān)閉方法塊,直到所有先前發(fā)送的請(qǐng)求完成。

生產(chǎn)者API

生產(chǎn)者API的中心部分是生產(chǎn)者類(lèi)。 生產(chǎn)者類(lèi)提供了一個(gè)選項(xiàng),通過(guò)以下方法在其構(gòu)造函數(shù)中連接Kafka代理。

生產(chǎn)者類(lèi)

生產(chǎn)者類(lèi)提供send方法以使用以下簽名向單個(gè)或多個(gè)主題發(fā)送消息。


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,"async")
ProducerConfig config = new ProducerConfig(prop);

有兩種類(lèi)型的生產(chǎn)者 - 同步異步。

相同的API配置也適用于同步生產(chǎn)者。 它們之間的區(qū)別是同步生成器直接發(fā)送消息,但在后臺(tái)發(fā)送消息。 當(dāng)您想要更高的吞吐量時(shí),異步生產(chǎn)者是首選。 在以前的版本,如0.8,一個(gè)異步生產(chǎn)者沒(méi)有回調(diào)send()注冊(cè)錯(cuò)誤處理程序。 這僅在當(dāng)前版本0.9中可用。

public void close()

生產(chǎn)者類(lèi)提供關(guān)閉方法以關(guān)閉與所有Kafka代理的生產(chǎn)者池連接。

配置設(shè)置

下表列出了Producer API的主要配置設(shè)置,以便更好地理解 -

S.No配置設(shè)置和說(shuō)明
1

client.id

標(biāo)識(shí)生產(chǎn)者應(yīng)用程序

2

producer.type

同步或異步

3

acks

acks配置控制生產(chǎn)者請(qǐng)求下的標(biāo)準(zhǔn)是完全的。

4

重試

如果生產(chǎn)者請(qǐng)求失敗,則使用特定值自動(dòng)重試。

5

bootstrapping代理列表。

6

linger.ms

如果你想減少請(qǐng)求的數(shù)量,你可以將linger.ms設(shè)置為大于某個(gè)值的東西。

7

key.serializer

序列化器接口的鍵。

8

value.serializer

值。

9

batch.size

緩沖區(qū)大小。

10

buffer.memory

控制生產(chǎn)者可用于緩沖的存儲(chǔ)器的總量。

ProducerRecord API

ProducerRecord是發(fā)送到Kafka cluster.ProducerRecord類(lèi)構(gòu)造函數(shù)的鍵/值對(duì),用于使用以下簽名創(chuàng)建具有分區(qū),鍵和值對(duì)的記錄。

public ProducerRecord (string topic, int partition, k key, v value)
  • 主題 - 將附加到記錄的用戶(hù)定義的主題名稱(chēng)。

  • 分區(qū) - 分區(qū)計(jì)數(shù)。

  • - 將包含在記錄中的鍵。

  • 值 記錄內(nèi)容。
public ProducerRecord (string topic, k key, v value)

ProducerRecord類(lèi)構(gòu)造函數(shù)用于創(chuàng)建帶有鍵,值對(duì)和無(wú)分區(qū)的記錄。

  • 主題 - 創(chuàng)建主題以分配記錄。

  • - 記錄的鍵。

  • - 記錄內(nèi)容。

public ProducerRecord (string topic, v value)

ProducerRecord類(lèi)創(chuàng)建一個(gè)沒(méi)有分區(qū)和鍵的記錄。

  • 主題 - 創(chuàng)建主題。

  • - 記錄內(nèi)容。

ProducerRecord類(lèi)方法列在下表中 -

S.No類(lèi)方法和描述
1

public string topic()

主題將附加到記錄。

2

public K key()

將包括在記錄中的鍵。 如果沒(méi)有這樣的鍵,null將在這里重新打開(kāi)。

3

public V value()

記錄內(nèi)容。

4

partition()

記錄的分區(qū)計(jì)數(shù)

SimpleProducer應(yīng)用程序

在創(chuàng)建應(yīng)用程序之前,首先啟動(dòng)ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中創(chuàng)建自己的主題。 之后,創(chuàng)建一個(gè)名為 Sim-pleProducer.java 的java類(lèi),然后鍵入以下代碼。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer"
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “l(fā)ocalhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully");
               producer.close();
   }
}

編譯 - 可以使用以下命令編譯應(yīng)用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

輸出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

簡(jiǎn)單消費(fèi)者示例

到目前為止,我們已經(jīng)創(chuàng)建了一個(gè)發(fā)送消息到Kafka集群的生產(chǎn)者。 現(xiàn)在讓我們創(chuàng)建一個(gè)消費(fèi)者來(lái)消費(fèi)Kafka集群的消息。 KafkaConsumer API用于消費(fèi)來(lái)自Kafka集群的消息。 KafkaConsumer類(lèi)的構(gòu)造函數(shù)定義如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - 返回消費(fèi)者配置的地圖。

KafkaConsumer類(lèi)具有下表中列出的以下重要方法。

S.No方法和說(shuō)明
1

public java.util.Set< TopicPar- tition> assignment()

獲取由用戶(hù)當(dāng)前分配的分區(qū)集。

2

public string subscription()

訂閱給定的主題列表以獲取動(dòng)態(tài)簽名的分區(qū)。

3

public void sub-scribe(java.util.List< java.lang.String> topics,ConsumerRe-balanceListener listener)

訂閱給定的主題列表以獲取動(dòng)態(tài)簽名的分區(qū)。

4

public void unsubscribe()

從給定的分區(qū)列表中取消訂閱主題。

5

public void sub-scribe(java.util.List< java.lang.String> topics)

訂閱給定的主題列表以獲取動(dòng)態(tài)簽名的分區(qū)。 如果給定的主題列表為空,則將其視為與unsubscribe()相同。

6

public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)

參數(shù)模式以正則表達(dá)式的格式引用預(yù)訂模式,而偵聽(tīng)器參數(shù)從預(yù)訂模式獲取通知。

7

public void as-sign(java.util.List< TopicPartion> partitions)

向客戶(hù)手動(dòng)分配分區(qū)列表。

8

poll()

使用預(yù)訂/分配API之一獲取指定的主題或分區(qū)的數(shù)據(jù)。 如果在輪詢(xún)數(shù)據(jù)之前未預(yù)訂主題,這將返回錯(cuò)誤。

9

public void commitSync()

提交對(duì)主題和分區(qū)的所有子編制列表的最后一次poll()返回的提交偏移量。 相同的操作應(yīng)用于commitAsyn()。

10

public void seek(TopicPartition partition,long offset)

獲取消費(fèi)者將在下一個(gè)poll()方法中使用的當(dāng)前偏移值。

11

public void resume()

恢復(fù)暫停的分區(qū)。

12

public void wakeup()

喚醒消費(fèi)者。

ConsumerRecord API

ConsumerRecord API用于從Kafka集群接收記錄。 此API由主題名稱(chēng),分區(qū)號(hào)(從中接收記錄)和指向Kafka分區(qū)中的記錄的偏移量組成。 ConsumerRecord類(lèi)用于創(chuàng)建具有特定主題名稱(chēng),分區(qū)計(jì)數(shù)和< key,value>的消費(fèi)者記錄。 對(duì)。 它有以下簽名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • 主題 - 從Kafka集群接收的使用者記錄的主題名稱(chēng)。

  • 分區(qū) - 主題的分區(qū)。

  • - 記錄的鍵,如果沒(méi)有鍵存在null將被返回。

  • - 記錄內(nèi)容。

ConsumerRecords API

ConsumerRecords API充當(dāng)ConsumerRecord的容器。 此API用于保存特定主題的每個(gè)分區(qū)的ConsumerRecord列表。 它的構(gòu)造器定義如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - 返回特定主題的分區(qū)地圖。

  • 記錄 - ConsumerRecord的返回列表。

ConsumerRecords類(lèi)定義了以下方法。

S.No方法和描述
1

public int count()

所有主題的記錄數(shù)。

2

public Set partitions()

在此記錄集中具有數(shù)據(jù)的分區(qū)集(如果沒(méi)有返回?cái)?shù)據(jù),則該集為空)。

3

public Iterator iterator()

迭代器使您可以循環(huán)訪問(wèn)集合,獲取或重新移動(dòng)元素。

4

public List records()

獲取給定分區(qū)的記錄列表。

配置設(shè)置

Consumer客戶(hù)端API主配置設(shè)置的配置設(shè)置如下所示 -

S.No設(shè)置和說(shuō)明
1

引導(dǎo)代理列表。

2

group.id

將單個(gè)消費(fèi)者分配給組。

3

enable.auto.commit

如果值為true,則為偏移啟用自動(dòng)落實(shí),否則不提交。

4

auto.commit.interval.ms

返回更新的消耗偏移量寫(xiě)入ZooKeeper的頻率。

5

session.timeout.ms

表示Kafka在放棄和繼續(xù)消費(fèi)消息之前等待ZooKeeper響應(yīng)請(qǐng)求(讀取或?qū)懭?多少毫秒。

SimpleConsumer應(yīng)用程序

生產(chǎn)者應(yīng)用程序步驟在此保持不變。 首先,啟動(dòng)你的ZooKeeper和Kafka代理。 然后使用名為 SimpleCon-sumer.java 的Java類(lèi)創(chuàng)建一個(gè) SimpleConsumer 應(yīng)用程序,并鍵入以下代碼。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " &plus; topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

編譯 - 可以使用以下命令編譯應(yīng)用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>

輸入 - 打開(kāi)生成器CLI并向主題發(fā)送一些消息。 你可以把smple輸入為\'Hello Consumer\'。

輸出 - 以下是輸出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)