maven 工程添加庫
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加庫
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客戶端版本要和服務端版本的一致,或者會發(fā)生一些奇怪的問題:
我遇到過版本不一致會發(fā)生,消息無法確認消息消費,也就是說 客戶端已經(jīng)消費了,也提交成功了,但是服務端沒有同步到!
注意:
tag的使用!
要到控制臺創(chuàng)建 Topic 隊列名稱
官方過濾消息例子:
http://rocketmq.apache.org/docs/filter-by-sql92-example/
發(fā)送消息
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消費消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 消息處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
更多建議: