RocketMQ Java 客戶端封裝

2023-07-12 17:23 更新

RocketMQ 應用非常多,但是在實際代碼開發過程中,我們肯定不能直接把上面的代碼用在實際項目中,而是要把它們都封裝一下,由自己提供的 Api 來調用 RocketMQ,這樣才能更方便!

對于消息隊列,我們關注的地方:

  • 消息生產者:對于消息生產者,我們只關注兩點:1. 隊列名稱 2. 要發送的消息
  • 消息消費者:對于消費者,我們只關注以下幾點:1. 訂閱消息 2. 集群消費消息

站在應用層上,調用方只關注 Api 調用,不關注 RocketMQ 內部的具體實現和初始化!

  • 定義一個接口:
package com.pangugle.framework.mq;
import com.pangugle.framework.service.Callback;

/**
 * Minimal message-queue facade used by application code, so that callers
 * only deal with topics, message bodies and callbacks rather than with a
 * specific broker client.
 */
public interface MQSupport {

	/**
	 * Declares a topic. Noted by the original author as having no use for
	 * RocketMQ.
	 *
	 * @param topic name of the topic
	 */
	void declareTopic(String topic);

	/**
	 * Deletes a topic.
	 *
	 * @param topic name of the topic
	 */
	void deleteTopic(String topic);

	/**
	 * Sends a message to a topic without any tag.
	 *
	 * @param topic destination topic
	 * @param body  message payload
	 * @return whether the message was sent successfully
	 */
	boolean sendMessage(String topic, String body);

	/**
	 * Sends a tagged message to a topic.
	 *
	 * @param topic destination topic
	 * @param body  message payload
	 * @param tags  tag attached to the message
	 * @return whether the message was sent successfully
	 */
	boolean sendMessage(String topic, String body, String tags);

	/**
	 * Consumes messages such that a message is not consumed repeatedly.
	 *
	 * @param topic    topic to consume from
	 * @param tags     tag filter
	 * @param callback invoked with the body of each received message
	 */
	void consume(String topic, String tags, Callback<String> callback);

	/**
	 * Subscribes to messages; messages are consumed repeatedly (by every
	 * subscriber).
	 *
	 * @param topic    topic to subscribe to
	 * @param tags     tag filter
	 * @param callback invoked with the body of each received message
	 */
	void subscribe(String topic, String tags, Callback<String> callback);
}

上面我們定義了發送消息的方法:

1. sendMessage(String topic, String body);

2. sendMessage(String topic, String body, String tags);

和消費消息的方法:

1. consume(String topic, String tags, Callback<String> callback);

2. subscribe(String topic, String tags, Callback<String> callback);
  • RocketMQ 實現這個接口
package com.pangugle.framework.mq.impl;

import java.util.List;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import com.pangugle.framework.conf.MyConfiguration;
import com.pangugle.framework.log.Log;
import com.pangugle.framework.log.LogFactory;
import com.pangugle.framework.mq.MQSupport;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.FastJsonHelper;
import com.pangugle.framework.utils.StringUtils;

public class RocketMQImpl implements MQSupport {

	private static Log LOG = LogFactory.getLog(RocketMQImpl.class);

	private static String DEFAULT_GROUP = "pangule_default_group";

	private static int DEFAULT_CONSMER_THREAD_SIZE = 5;

	private static String mServer = null;

	private static ClientConfig mClientConfig = new ClientConfig();
	private static DefaultMQProducer mProducer;

	public RocketMQImpl() {
		synchronized (RocketMQImpl.class) {
			if (mProducer == null) {
				initClientConfig();
				initProducer();
			}
		}
	}

	@Override
	public void declareTopic(String queue) {
	}

	@Override
	public void deleteTopic(String queue) {
	}

	@Override
	public boolean sendMessage(String queue, String body, String tags) {
		try {
			if(StringUtils.isEmpty(tags))
			{
				tags = StringUtils.getEmpty();
			}
			Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));

			// Call send message to deliver message to one of brokers.
			SendResult sendResult = mProducer.send(msg);
			if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
				return true;
			}
			LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
		} catch (Exception e) {
			LOG.error("send queue error:", e);
		}
		return false;
	}

	@Override
	public boolean sendMessage(String queue, String body) {
		return sendMessage(queue, body, null);
	}

	private static void initClientConfig() {
		mServer = MyConfiguration.getInstance().getString("mq.rocket.server");
		LOG.info("rocketmq.server = " + mServer);

		// 客戶端本機(jī) IP 地址,某些機(jī)器會(huì)發(fā)生無法識(shí)別客戶端IP地址情況,需要應(yīng)用在代碼中強(qiáng)制指定
		// Name Server 地址列表,多個(gè) NameServer 地址用分號(hào) 隔開
		mClientConfig.setNamesrvAddr(mServer);
		// 客戶端實(shí)例名稱,客戶端創(chuàng)建的多個(gè) Producer、 Consumer 實(shí)際是共用一個(gè)內(nèi)部實(shí)例(這個(gè)實(shí)例包含
		// 網(wǎng)絡(luò)連接、線程資源等),默認(rèn)值:DEFAULT
		mClientConfig.setInstanceName("DEFAULT");
		// 通信層異步回調(diào)線程數(shù),默認(rèn)值4
		mClientConfig.setClientCallbackExecutorThreads(10);
		// 輪詢 Name Server 間隔時(shí)間,單位毫秒,默認(rèn):30000
		// mClientConfig.setPollNameServerInterval(30000);
		// 向 Broker 發(fā)送心跳間隔時(shí)間,單位毫秒,默認(rèn):30000
		mClientConfig.setHeartbeatBrokerInterval(30000);
		// 持久化 Consumer 消費(fèi)進(jìn)度間隔時(shí)間,單位毫秒,默認(rèn):5000
		mClientConfig.setPersistConsumerOffsetInterval(5000);
	}

	private static void initProducer() {
		try {
			mProducer = new DefaultMQProducer();

			mProducer.resetClientConfig(mClientConfig);
			// 在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù) 默認(rèn)值 4
			mProducer.setDefaultTopicQueueNums(4);
			// 發(fā)送消息超時(shí)時(shí)間,單位毫秒 : 默認(rèn)值 10000
			mProducer.setSendMsgTimeout(10000);
			// 消息Body超過多大開始壓縮(Consumer收到消息會(huì)自動(dòng)解壓縮),單位字節(jié) 默認(rèn)值 4096
			mProducer.setCompressMsgBodyOverHowmuch(4096);
			// 如果發(fā)送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發(fā)送 默認(rèn)值 FALSE
			mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);

			mProducer.setProducerGroup(DEFAULT_GROUP);
//			mProducer.setRetryTimesWhenSendAsyncFailed(3);
			mProducer.start();
		} catch (Exception e) {
			LOG.error("init producer error:", e);
		}
	}

	@Override
	public void consume(String topic, String tags, Callback<String> callback) {
		try {
			if(StringUtils.isEmpty(tags))
			{
				tags = StringUtils.getEmpty();
			}
			DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
			consumer.setMessageModel(MessageModel.CLUSTERING);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
					// TODO Auto-generated method stub
					for(MessageExt ext : msgs)
			    	{
			    		try {
			    			String body =  new String(ext.getBody());
			    			callback.execute(body);
						} catch (Exception e) {
						}
			    	}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
		} catch (MQClientException e) {
			LOG.error("consume error:", e);
		}
	}

	@Override
	public void subscribe(String topic, String tags, Callback<String> callback) {
		try {
			if(StringUtils.isEmpty(tags))
			{
				tags = StringUtils.getEmpty();
			}
			DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
			consumer.setMessageModel(MessageModel.BROADCASTING);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
					// TODO Auto-generated method stub
					for(MessageExt ext : msgs)
			    	{
			    		try {
			    			String body =  new String(ext.getBody());
			    			callback.execute(body);
						} catch (Exception e) {
						}
			    	}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
		} catch (MQClientException e) {
			LOG.error("subscrebe error:", e);
		}
	}

	private static DefaultMQPushConsumer getConsumerInstance(String topic, String tags) throws MQClientException
	{
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
		consumer.resetClientConfig(mClientConfig);
		consumer.setConsumerGroup(topic + tags);
		consumer.setConsumeThreadMin(DEFAULT_CONSMER_THREAD_SIZE);
		consumer.setConsumeThreadMax(DEFAULT_CONSMER_THREAD_SIZE);
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		// mConsumer.subscribe(queue, "TagA || TagC || TagD");
		consumer.subscribe(topic, tags);
		return consumer;
	}

}
  • 簡化調用:
package com.pangugle.framework.mq;

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.Maps;
import com.pangugle.framework.mq.impl.RedisMQImpl;
import com.pangugle.framework.mq.impl.RocketMQImpl;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.ThreadUtils;

/**
 * Singleton entry point that hands out the configured {@link MQSupport}
 * implementation. Only the RocketMQ implementation is registered.
 */
public class MQManager {

	/** Registered implementations, keyed by the {@link MQImpl} constant name. */
	Map<String, MQSupport> maps = Maps.newConcurrentMap();

	/** Holder type; its constant is created when {@link #getInstance()} first reads it. */
	private static final class Holder {
		static final MQManager INSTANCE = new MQManager();
	}

	/**
	 * @return the single {@code MQManager} instance
	 */
	public static MQManager getInstance() {
		return Holder.INSTANCE;
	}

	/** Registers the available implementations. */
	private MQManager() {
		// The Redis implementation is currently not registered:
		// maps.put(MQImpl.REDIS.name(), new RedisMQImpl());
		final String rocketKey = MQImpl.ROCKETMQ.name();
		maps.put(rocketKey, new RocketMQImpl());
	}

	/**
	 * @return the implementation registered under {@link MQImpl#ROCKETMQ}
	 */
	public MQSupport getMQ() {
		final String rocketKey = MQImpl.ROCKETMQ.name();
		return maps.get(rocketKey);
	}

	/** Known message-queue back ends. */
	public enum MQImpl {
		/** redis */
		REDIS,
		/** rocketmq */
		ROCKETMQ
	}

	/**
	 * Demo: subscribes to a test topic, then sends 1000 messages to it,
	 * pausing via {@code ThreadUtils.sleep(1000)} after each, and finally
	 * blocks on standard input.
	 */
	public static void main(String[] args) throws InterruptedException, IOException {
		// Topic (queue) used by the demo.
		final String topicName = "pangugle_test";

		// No tag is used.
		final String tagFilter = null;

		final MQSupport client = MQManager.getInstance().getMQ();

		// Subscription-style consumption.
		Callback<String> printer = new Callback<String>() {
			public void execute(String o) {
				System.out.println("consuemr 1 " + o);
			}
		};
		client.subscribe(topicName, tagFilter, printer);

		// Cluster-style consumption would use consume(...) with the same
		// arguments instead of subscribe(...).

		int counter = 0;
		while (counter < 1000) {
			client.sendMessage(topicName, "i = " + counter, tagFilter);
			ThreadUtils.sleep(1000);
			counter++;
		}

		// Wait for input before returning.
		System.in.read();
	}
}

注意上面測試:

要先到控制臺創建 Topic(隊列名稱),也就是 pangugle_test 這個名稱!

好了搞定了

現在我們使用消息隊列就非常簡單了:

  • 初始化消息隊列 MQSupport mq = MQManager.getInstance().getMQ();
  • 發送消息 sendMessage
  • 消費消息 subscribe / consume


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號