讓我們使用Java客戶(hù)端創(chuàng)建一個(gè)用于發(fā)布和使用消息的應(yīng)用程序。 Kafka生產(chǎn)者客戶(hù)端包括以下API。
讓我們了解本節(jié)中最重要的一組Kafka生產(chǎn)者API。 KafkaProducer API的中心部分是 KafkaProducer
類(lèi)。 KafkaProducer類(lèi)提供了一個(gè)選項(xiàng),用于將其構(gòu)造函數(shù)中的Kafka代理連接到以下方法。
KafkaProducer類(lèi)提供send方法以異步方式將消息發(fā)送到主題。 send()的簽名如下
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord - 生產(chǎn)者管理等待發(fā)送的記錄的緩沖區(qū)。
回調(diào) - 當(dāng)服務(wù)器確認(rèn)記錄時(shí)執(zhí)行的用戶(hù)提供的回調(diào)(null表示無(wú)回調(diào))。
KafkaProducer類(lèi)提供了一個(gè)flush方法,以確保所有先前發(fā)送的消息都已實(shí)際完成。 flush方法的語(yǔ)法如下 -
public void flush()
KafkaProducer類(lèi)提供了partitionFor方法,這有助于獲取給定主題的分區(qū)元數(shù)據(jù)。 這可以用于自定義分區(qū)。 這種方法的簽名如下 -
public Map metrics()
它返回由生產(chǎn)者維護(hù)的內(nèi)部度量的映射。
public void close() - KafkaProducer類(lèi)提供關(guān)閉方法塊,直到所有先前發(fā)送的請(qǐng)求完成。
生產(chǎn)者API的中心部分是生產(chǎn)者
類(lèi)。 生產(chǎn)者類(lèi)提供了一個(gè)選項(xiàng),通過(guò)以下方法在其構(gòu)造函數(shù)中連接Kafka代理。
生產(chǎn)者類(lèi)提供send方法以使用以下簽名向單個(gè)或多個(gè)主題發(fā)送消息。
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,"async") ProducerConfig config = new ProducerConfig(prop);
有兩種類(lèi)型的生產(chǎn)者 - 同步和異步。
相同的API配置也適用于同步
生產(chǎn)者。 它們之間的區(qū)別是同步生成器直接發(fā)送消息,但在后臺(tái)發(fā)送消息。 當(dāng)您想要更高的吞吐量時(shí),異步生產(chǎn)者是首選。 在以前的版本,如0.8,一個(gè)異步生產(chǎn)者沒(méi)有回調(diào)send()注冊(cè)錯(cuò)誤處理程序。 這僅在當(dāng)前版本0.9中可用。
生產(chǎn)者類(lèi)提供關(guān)閉方法以關(guān)閉與所有Kafka代理的生產(chǎn)者池連接。
下表列出了Producer API的主要配置設(shè)置,以便更好地理解 -
S.No | 配置設(shè)置和說(shuō)明 |
---|---|
1 | client.id 標(biāo)識(shí)生產(chǎn)者應(yīng)用程序 |
2 | producer.type 同步或異步 |
3 | acks acks配置控制生產(chǎn)者請(qǐng)求下的標(biāo)準(zhǔn)是完全的。 |
4 | 重試 如果生產(chǎn)者請(qǐng)求失敗,則使用特定值自動(dòng)重試。 |
5 | bootstrapping代理列表。 |
6 | linger.ms 如果你想減少請(qǐng)求的數(shù)量,你可以將linger.ms設(shè)置為大于某個(gè)值的東西。 |
7 | key.serializer 序列化器接口的鍵。 |
8 | value.serializer 值。 |
9 | batch.size 緩沖區(qū)大小。 |
10 | buffer.memory 控制生產(chǎn)者可用于緩沖的存儲(chǔ)器的總量。 |
ProducerRecord是發(fā)送到Kafka cluster.ProducerRecord類(lèi)構(gòu)造函數(shù)的鍵/值對(duì),用于使用以下簽名創(chuàng)建具有分區(qū),鍵和值對(duì)的記錄。
public ProducerRecord (string topic, int partition, k key, v value)
主題 - 將附加到記錄的用戶(hù)定義的主題名稱(chēng)。
分區(qū) - 分區(qū)計(jì)數(shù)。
鍵 - 將包含在記錄中的鍵。
public ProducerRecord (string topic, k key, v value)
ProducerRecord類(lèi)構(gòu)造函數(shù)用于創(chuàng)建帶有鍵,值對(duì)和無(wú)分區(qū)的記錄。
主題 - 創(chuàng)建主題以分配記錄。
鍵 - 記錄的鍵。
值 - 記錄內(nèi)容。
public ProducerRecord (string topic, v value)
ProducerRecord類(lèi)創(chuàng)建一個(gè)沒(méi)有分區(qū)和鍵的記錄。
主題 - 創(chuàng)建主題。
值 - 記錄內(nèi)容。
ProducerRecord類(lèi)方法列在下表中 -
S.No | 類(lèi)方法和描述 |
---|---|
1 | public string topic() 主題將附加到記錄。 |
2 | public K key() 將包括在記錄中的鍵。 如果沒(méi)有這樣的鍵,null將在這里重新打開(kāi)。 |
3 | public V value() 記錄內(nèi)容。 |
4 | partition() 記錄的分區(qū)計(jì)數(shù) |
在創(chuàng)建應(yīng)用程序之前,首先啟動(dòng)ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中創(chuàng)建自己的主題。 之后,創(chuàng)建一個(gè)名為 Sim-pleProducer.java
的java類(lèi),然后鍵入以下代碼。
//import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer" public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", “l(fā)ocalhost:9092"); //Set acknowledgements for producer requests. props.put("acks", “all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully"); producer.close(); } }
編譯 - 可以使用以下命令編譯應(yīng)用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java
執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>
輸出
Message sent successfully To check the above output open new terminal and type Consumer CLI command to receive messages. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10
到目前為止,我們已經(jīng)創(chuàng)建了一個(gè)發(fā)送消息到Kafka集群的生產(chǎn)者。 現(xiàn)在讓我們創(chuàng)建一個(gè)消費(fèi)者來(lái)消費(fèi)Kafka集群的消息。 KafkaConsumer API用于消費(fèi)來(lái)自Kafka集群的消息。 KafkaConsumer類(lèi)的構(gòu)造函數(shù)定義如下。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - 返回消費(fèi)者配置的地圖。
KafkaConsumer類(lèi)具有下表中列出的以下重要方法。
S.No | 方法和說(shuō)明 |
---|---|
1 | public java.util.Set< TopicPar- tition> assignment() 獲取由用戶(hù)當(dāng)前分配的分區(qū)集。 |
2 | public string subscription() 訂閱給定的主題列表以獲取動(dòng)態(tài)簽名的分區(qū)。 |
3 | public void sub-scribe(java.util.List< java.lang.String> topics,ConsumerRe-balanceListener listener) 訂閱給定的主題列表以獲取動(dòng)態(tài)簽名的分區(qū)。 |
4 | public void unsubscribe() 從給定的分區(qū)列表中取消訂閱主題。 |
5 | public void sub-scribe(java.util.List< java.lang.String> topics) 訂閱給定的主題列表以獲取動(dòng)態(tài)簽名的分區(qū)。 如果給定的主題列表為空,則將其視為與unsubscribe()相同。 |
6 | public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener) 參數(shù)模式以正則表達(dá)式的格式引用預(yù)訂模式,而偵聽(tīng)器參數(shù)從預(yù)訂模式獲取通知。 |
7 | public void as-sign(java.util.List< TopicPartion> partitions) 向客戶(hù)手動(dòng)分配分區(qū)列表。 |
8 | poll() 使用預(yù)訂/分配API之一獲取指定的主題或分區(qū)的數(shù)據(jù)。 如果在輪詢(xún)數(shù)據(jù)之前未預(yù)訂主題,這將返回錯(cuò)誤。 |
9 | public void commitSync() 提交對(duì)主題和分區(qū)的所有子編制列表的最后一次poll()返回的提交偏移量。 相同的操作應(yīng)用于commitAsyn()。 |
10 | public void seek(TopicPartition partition,long offset) 獲取消費(fèi)者將在下一個(gè)poll()方法中使用的當(dāng)前偏移值。 |
11 | public void resume() 恢復(fù)暫停的分區(qū)。 |
12 | public void wakeup() 喚醒消費(fèi)者。 |
ConsumerRecord API用于從Kafka集群接收記錄。 此API由主題名稱(chēng),分區(qū)號(hào)(從中接收記錄)和指向Kafka分區(qū)中的記錄的偏移量組成。 ConsumerRecord類(lèi)用于創(chuàng)建具有特定主題名稱(chēng),分區(qū)計(jì)數(shù)和< key,value>的消費(fèi)者記錄。 對(duì)。 它有以下簽名。
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
主題 - 從Kafka集群接收的使用者記錄的主題名稱(chēng)。
分區(qū) - 主題的分區(qū)。
鍵 - 記錄的鍵,如果沒(méi)有鍵存在null將被返回。
值 - 記錄內(nèi)容。
ConsumerRecords API充當(dāng)ConsumerRecord的容器。 此API用于保存特定主題的每個(gè)分區(qū)的ConsumerRecord列表。 它的構(gòu)造器定義如下。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
TopicPartition - 返回特定主題的分區(qū)地圖。
記錄 - ConsumerRecord的返回列表。
ConsumerRecords類(lèi)定義了以下方法。
S.No | 方法和描述 |
---|---|
1 | public int count() 所有主題的記錄數(shù)。 |
2 | public Set partitions() 在此記錄集中具有數(shù)據(jù)的分區(qū)集(如果沒(méi)有返回?cái)?shù)據(jù),則該集為空)。 |
3 | public Iterator iterator() 迭代器使您可以循環(huán)訪問(wèn)集合,獲取或重新移動(dòng)元素。 |
4 | public List records() 獲取給定分區(qū)的記錄列表。 |
Consumer客戶(hù)端API主配置設(shè)置的配置設(shè)置如下所示 -
S.No | 設(shè)置和說(shuō)明 |
---|---|
1 | 引導(dǎo)代理列表。 |
2 | group.id 將單個(gè)消費(fèi)者分配給組。 |
3 | enable.auto.commit 如果值為true,則為偏移啟用自動(dòng)落實(shí),否則不提交。 |
4 | auto.commit.interval.ms 返回更新的消耗偏移量寫(xiě)入ZooKeeper的頻率。 |
5 | session.timeout.ms 表示Kafka在放棄和繼續(xù)消費(fèi)消息之前等待ZooKeeper響應(yīng)請(qǐng)求(讀取或?qū)懭?多少毫秒。 |
生產(chǎn)者應(yīng)用程序步驟在此保持不變。 首先,啟動(dòng)你的ZooKeeper和Kafka代理。 然后使用名為 SimpleCon-sumer.java
的Java類(lèi)創(chuàng)建一個(gè) SimpleConsumer
應(yīng)用程序,并鍵入以下代碼。
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)) //print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
編譯 - 可以使用以下命令編譯應(yīng)用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java
執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>
輸入 - 打開(kāi)生成器CLI并向主題發(fā)送一些消息。 你可以把smple輸入為\'Hello Consumer\'。
輸出 - 以下是輸出。
Subscribed to topic Hello-Kafka offset = 3, key = null, value = Hello Consumer
更多建議: