RocketMQ 的應用非常多,但是在實際代碼開發過程中,我們肯定不能直接把上面的代碼用在實際項目中,而是要把它們都封裝一下,由自己提供的 API 來調用 RocketMQ,這樣才能更方便!
對於消息隊列,我們關注的地方:
package com.pangugle.framework.mq;
import com.pangugle.framework.service.Callback;
/**
 * Minimal message-queue facade: topic management, sending, and two ways of receiving messages.
 */
public interface MQSupport {

    /**
     * Declares a topic. Has no effect for RocketMQ.
     *
     * @param topic name of the topic to declare
     */
    void declareTopic(String topic);

    /**
     * Deletes a topic.
     *
     * @param topic name of the topic to delete
     */
    void deleteTopic(String topic);

    /**
     * Sends a message without tags.
     *
     * @param topic topic the message is sent to
     * @param body  message payload
     * @return whether the message was sent
     */
    boolean sendMessage(String topic, String body);

    /**
     * Sends a message carrying the given tags.
     *
     * @param topic topic the message is sent to
     * @param body  message payload
     * @param tags  tags attached to the message
     * @return whether the message was sent
     */
    boolean sendMessage(String topic, String body, String tags);

    /**
     * Consumes messages; a message is not consumed repeatedly.
     *
     * @param topic    topic to read from
     * @param tags     tag filter
     * @param callback invoked with the body of each received message
     */
    void consume(String topic, String tags, Callback<String> callback);

    /**
     * Subscribes to messages; messages are consumed repeatedly.
     *
     * @param topic    topic to read from
     * @param tags     tag filter
     * @param callback invoked with the body of each received message
     */
    void subscribe(String topic, String tags, Callback<String> callback);
}
上面我們定義了兩個發送消息的方法:
1. sendMessage(String topic, String body);
2. sendMessage(String topic, String body, String tags);
和兩個消費消息的方法:
1. consume(String topic, String tags, Callback<String> callback);
2. subscribe(String topic, String tags, Callback<String> callback);
package com.pangugle.framework.mq.impl;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import com.pangugle.framework.conf.MyConfiguration;
import com.pangugle.framework.log.Log;
import com.pangugle.framework.log.LogFactory;
import com.pangugle.framework.mq.MQSupport;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.FastJsonHelper;
import com.pangugle.framework.utils.StringUtils;
public class RocketMQImpl implements MQSupport {
private static Log LOG = LogFactory.getLog(RocketMQImpl.class);
private static String DEFAULT_GROUP = "pangule_default_group";
private static int DEFAULT_CONSMER_THREAD_SIZE = 5;
private static String mServer = null;
private static ClientConfig mClientConfig = new ClientConfig();
private static DefaultMQProducer mProducer;
public RocketMQImpl() {
synchronized (RocketMQImpl.class) {
if (mProducer == null) {
initClientConfig();
initProducer();
}
}
}
@Override
public void declareTopic(String queue) {
}
@Override
public void deleteTopic(String queue) {
}
@Override
public boolean sendMessage(String queue, String body, String tags) {
try {
if(StringUtils.isEmpty(tags))
{
tags = StringUtils.getEmpty();
}
Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
// Call send message to deliver message to one of brokers.
SendResult sendResult = mProducer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
return true;
}
LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
} catch (Exception e) {
LOG.error("send queue error:", e);
}
return false;
}
@Override
public boolean sendMessage(String queue, String body) {
return sendMessage(queue, body, null);
}
private static void initClientConfig() {
mServer = MyConfiguration.getInstance().getString("mq.rocket.server");
LOG.info("rocketmq.server = " + mServer);
// 客戶端本機(jī) IP 地址,某些機(jī)器會(huì)發(fā)生無法識(shí)別客戶端IP地址情況,需要應(yīng)用在代碼中強(qiáng)制指定
// Name Server 地址列表,多個(gè) NameServer 地址用分號(hào) 隔開
mClientConfig.setNamesrvAddr(mServer);
// 客戶端實(shí)例名稱,客戶端創(chuàng)建的多個(gè) Producer、 Consumer 實(shí)際是共用一個(gè)內(nèi)部實(shí)例(這個(gè)實(shí)例包含
// 網(wǎng)絡(luò)連接、線程資源等),默認(rèn)值:DEFAULT
mClientConfig.setInstanceName("DEFAULT");
// 通信層異步回調(diào)線程數(shù),默認(rèn)值4
mClientConfig.setClientCallbackExecutorThreads(10);
// 輪詢 Name Server 間隔時(shí)間,單位毫秒,默認(rèn):30000
// mClientConfig.setPollNameServerInterval(30000);
// 向 Broker 發(fā)送心跳間隔時(shí)間,單位毫秒,默認(rèn):30000
mClientConfig.setHeartbeatBrokerInterval(30000);
// 持久化 Consumer 消費(fèi)進(jìn)度間隔時(shí)間,單位毫秒,默認(rèn):5000
mClientConfig.setPersistConsumerOffsetInterval(5000);
}
private static void initProducer() {
try {
mProducer = new DefaultMQProducer();
mProducer.resetClientConfig(mClientConfig);
// 在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù) 默認(rèn)值 4
mProducer.setDefaultTopicQueueNums(4);
// 發(fā)送消息超時(shí)時(shí)間,單位毫秒 : 默認(rèn)值 10000
mProducer.setSendMsgTimeout(10000);
// 消息Body超過多大開始壓縮(Consumer收到消息會(huì)自動(dòng)解壓縮),單位字節(jié) 默認(rèn)值 4096
mProducer.setCompressMsgBodyOverHowmuch(4096);
// 如果發(fā)送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發(fā)送 默認(rèn)值 FALSE
mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);
mProducer.setProducerGroup(DEFAULT_GROUP);
// mProducer.setRetryTimesWhenSendAsyncFailed(3);
mProducer.start();
} catch (Exception e) {
LOG.error("init producer error:", e);
}
}
@Override
public void consume(String topic, String tags, Callback<String> callback) {
try {
if(StringUtils.isEmpty(tags))
{
tags = StringUtils.getEmpty();
}
DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// TODO Auto-generated method stub
for(MessageExt ext : msgs)
{
try {
String body = new String(ext.getBody());
callback.execute(body);
} catch (Exception e) {
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (MQClientException e) {
LOG.error("consume error:", e);
}
}
@Override
public void subscribe(String topic, String tags, Callback<String> callback) {
try {
if(StringUtils.isEmpty(tags))
{
tags = StringUtils.getEmpty();
}
DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// TODO Auto-generated method stub
for(MessageExt ext : msgs)
{
try {
String body = new String(ext.getBody());
callback.execute(body);
} catch (Exception e) {
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (MQClientException e) {
LOG.error("subscrebe error:", e);
}
}
private static DefaultMQPushConsumer getConsumerInstance(String topic, String tags) throws MQClientException
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.resetClientConfig(mClientConfig);
consumer.setConsumerGroup(topic + tags);
consumer.setConsumeThreadMin(DEFAULT_CONSMER_THREAD_SIZE);
consumer.setConsumeThreadMax(DEFAULT_CONSMER_THREAD_SIZE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// mConsumer.subscribe(queue, "TagA || TagC || TagD");
consumer.subscribe(topic, tags);
return consumer;
}
}
package com.pangugle.framework.mq;
import java.io.IOException;
import java.util.Map;
import com.google.common.collect.Maps;
import com.pangugle.framework.mq.impl.RedisMQImpl;
import com.pangugle.framework.mq.impl.RocketMQImpl;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.ThreadUtils;
/**
 * Singleton that holds the available {@link MQSupport} implementations and exposes the one in use.
 */
public class MQManager {

    /** Registered implementations, keyed by the name of the matching {@link MQImpl} constant. */
    Map<String, MQSupport> maps = Maps.newConcurrentMap();

    /** Holder for the single manager instance; it is created when the holder is first used. */
    private static final class ManagerInternal {
        static final MQManager mgr = new MQManager();

        private ManagerInternal() {
        }
    }

    /**
     * Returns the shared manager instance.
     *
     * @return the single {@code MQManager}
     */
    public static MQManager getInstance() {
        return ManagerInternal.mgr;
    }

    /** Registers the RocketMQ implementation; the Redis implementation is not registered. */
    private MQManager() {
        maps.put(MQImpl.ROCKETMQ.name(), new RocketMQImpl());
    }

    /**
     * Returns the implementation registered under {@link MQImpl#ROCKETMQ}.
     *
     * @return the RocketMQ-backed {@link MQSupport}
     */
    public MQSupport getMQ() {
        String key = MQImpl.ROCKETMQ.name();
        return maps.get(key);
    }

    /** Identifiers of the known message-queue implementations. */
    public enum MQImpl {
        /** Redis-based implementation. */
        REDIS,
        /** RocketMQ-based implementation. */
        ROCKETMQ;
    }

    /**
     * Manual demo: subscribes to a test topic, then sends 1000 messages one second apart and
     * finally blocks on standard input.
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        // Name of the queue (topic) used by the demo.
        String topicName = "pangugle_test";
        // No tag filter.
        String tagFilter = null;

        MQSupport support = MQManager.getInstance().getMQ();

        // Receive via subscribe(); consume() could be used here instead for cluster consumption.
        Callback<String> printer = new Callback<String>() {
            public void execute(String o) {
                System.out.println("consuemr 1 " + o);
            }
        };
        support.subscribe(topicName, tagFilter, printer);

        int counter = 0;
        while (counter < 1000) {
            support.sendMessage(topicName, "i = " + counter, tagFilter);
            ThreadUtils.sleep(1000);
            counter++;
        }

        // Keep the process alive until a key is pressed.
        System.in.read();
    }
}
注意上面測試:
要先到控制臺創建 Topic 隊列名稱,也就是 pangugle_test 這個名稱!
好了搞定了
現在我們使用消息隊列就非常簡單了:
更多建議: