1、在配置文件里面设置好ROCKETMQ参数
[ROCKETMQ]
#是否开启服务功能
rq_enable=1
#ROCKETMQ服务地址,可集群,多个用分号隔开
rq_nameserver=localhost:9876
#客户端ID
rq_groupid=jkas1122
#验证用户名
rq_user=
#验证密码
rq_pass=
#重试底数
rq_retrytimes=4
#开启重试
rq_retryanother=1
#默认订阅主题
rq_subtopics=test2,test3
#收到消息回调类,必须继承于JkasRQBack
rq_callbackclass=test.z.rocketmq.Test
2、常用方法
protected String rqSendMsg(String topic,String msg)//发送消息到指定主题
protected String rqSendMsg(String topic,String tag,String msg)//发送消息到指定主题,设定tag
protected String rqSendMsg(String topic,String tag,String key,String msg)//发送消息到指定主题,设定tag和key
protected boolean rqSubTopic(String topic)//订阅主题
protected boolean rqSubTopic(String topic,String subExpression)//订阅主题,并指定subExpression
3、关于收到消息回调类
该类必须继承于JkasMQBack,并覆盖相应方法,如下示例:
package test.z.rocketmq;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.jkas.core.JkasRQBack;
public
class Test extends JkasRQBack
{
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context)
{//收到消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}