ROCKETMQ

1、在配置文件里面设置好ROCKETMQ参数

[ROCKETMQ]

#是否开启服务功能
rq_enable=1

#ROCKETMQ服务地址,可集群,多个用分号隔开
rq_nameserver=localhost:9876

#客户端ID
rq_groupid=jkas1122

#验证用户名
rq_user=

#验证密码
rq_pass=

#重试底数
rq_retrytimes=4

#开启重试
rq_retryanother=1

#默认订阅主题
rq_subtopics=test2,test3
#收到消息回调类,必须继承于JkasRQBack

rq_callbackclass=test.z.rocketmq.Test

 

2、常用方法

protected String rqSendMsg(String topic,String msg)//发送消息到指定主题

protected String rqSendMsg(String topic,String tag,String msg)//发送消息到指定主题,设定tag

protected String rqSendMsg(String topic,String tag,String key,String msg)//发送消息到指定主题,设定tag和key

protected boolean rqSubTopic(String topic)//订阅主题

protected boolean rqSubTopic(String topic,String subExpression)//订阅主题,并指定subExpression

 

    3、关于收到消息回调类

    该类必须继承于JkasMQBack,并覆盖相应方法,如下示例:

  package test.z.rocketmq;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  import org.apache.rocketmq.common.message.MessageExt;

  import org.jkas.core.JkasRQBack;
   public class Test extends JkasRQBack
  {
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {//收到消息
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }


  }