经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RocketMQ » 查看文章
RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息
来源:cnblogs  作者:随机的未知  时间:2024/2/5 10:13:11  对本文有异议

RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

发送同步消息

同步消息

生产者发送消息,mq进行确认,然后返回给生产者状态。这就是同步消息。

前文demo程序就是发送的同步消息。

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

代码如下:

  1. /**
  2. * 异步消息测试
  3. */
  4. @Test
  5. public void simpleAsyncProducer() throws Exception {
  6. //创建一个生产者,并指定一个组名
  7. DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
  8. //连接namesrv,参数是namesrv的ip地址:端口号
  9. producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  10. //启动
  11. producer.start();
  12. //指定topic,创建一个消息
  13. Message message = new Message("asyncTopic1", "这是一条异步消息".getBytes());
  14. //发送异步消息,并设置回调内容
  15. producer.send(message, new SendCallback() {
  16. @Override
  17. public void onSuccess(SendResult sendResult) {
  18. log.info("回调内容,发送成功");
  19. }
  20. @Override
  21. public void onException(Throwable throwable) {
  22. log.info("回调内容,发送失败");
  23. }
  24. });
  25. log.info("主线程执行中=========");
  26. System.in.read();
  27. }

运行结果

从运行结果可以看到是不同的线程输出的内容。

发送单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

代码如下:

  1. @Test
  2. public void oneWayMessageTest() throws Exception {
  3. //创建一个生产者,并指定一个组名
  4. DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
  5. //连接namesrv,参数是namesrv的ip地址:端口号
  6. producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  7. //启动
  8. producer.start();
  9. //指定topic,创建一个消息
  10. Message message = new Message("onewayTopic1", "这是一条单向消息".getBytes());
  11. //发送单向消息
  12. producer.sendOneway(message);
  13. producer.shutdown();
  14. }

发送延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费.

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

代码如下

  1. @Test
  2. public void msMessageTest() throws Exception{
  3. //创建一个生产者,并指定一个组名
  4. DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
  5. //连接namesrv,参数是namesrv的ip地址:端口号
  6. producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  7. //启动
  8. producer.start();
  9. //指定topic,创建一个消息
  10. Message message = new Message("msTopic1", "这是一条单向消息".getBytes());
  11. //给消息设置一个延迟时间
  12. message.setDelayTimeLevel(3);
  13. //发送延时消息
  14. producer.sendOneway(message);
  15. producer.shutdown();
  16. }

延时等级如下:

消息延时等级

发送批量消息

代码如下:

  1. @Test
  2. public void testBatchProducer() throws Exception {
  3. // 创建默认的生产者
  4. DefaultMQProducer producer = new DefaultMQProducer("test-batch-group");
  5. //连接namesrv,参数是namesrv的ip地址:端口号
  6. producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  7. // 启动实例
  8. producer.start();
  9. List<Message> msgs = Arrays.asList(
  10. new Message("batchTopicTest", "我是一组消息的A消息".getBytes()),
  11. new Message("batchTopicTest", "我是一组消息的B消息".getBytes()),
  12. new Message("batchTopicTest", "我是一组消息的C消息".getBytes())
  13. );
  14. SendResult send = producer.send(msgs);
  15. System.out.println(send);
  16. // 关闭实例
  17. producer.shutdown();
  18. }

这些消息会被放到同一个队列中。

发送顺序消息

可以想象一个场景,我们在网上购物时,需要先完成下订单操作,然后再去发短信,再进行发货,需要保证顺序的。

前文我们讲的都是并发消息,这种消息并不能完成上述的场景逻辑。比如一个topic里有10个消息,分别在4个队列中;

  • 如果消费者,同时有20个线程在消费,可能A线程拿到消息1了,B线程拿到消息2了,但是B线程可能完成的比A线程早,这就没办法上述场景的顺序了。
  • 如果消费者只有一个线程,轮询消费四个队列中的消息时,也不能保证是网购场景中的顺序的。

这就要引出顺序消息:把消费者变为单线程,把下订单消息、发短信消息、发货消息放到同一个队列就可以了。

代码

消息封装成实体类如下:

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class MessageModel {
  5. //订单id
  6. private String orderId;
  7. //用户id
  8. private String userId;
  9. //消息描述
  10. private String description;
  11. }

发送顺序消息的生产者代码如下:

  1. /**
  2. * 顺序消息
  3. */
  4. @Test
  5. public void testOrderlyProducer() throws Exception {
  6. List<MessageModel> messageModelList = Arrays.asList(
  7. //用户1的订单
  8. new MessageModel("order-111","user-1","下单"),
  9. new MessageModel("order-111","user-1","发短信"),
  10. new MessageModel("order-111","user-1","发货"),
  11. //用户2的订单
  12. new MessageModel("order-222","user-2","下单"),
  13. new MessageModel("order-222","user-2","发短信"),
  14. new MessageModel("order-222","user-2","发货")
  15. );
  16. // 创建默认的生产者
  17. DefaultMQProducer producer = new DefaultMQProducer("test-orderly-group");
  18. //连接namesrv,参数是namesrv的ip地址:端口号
  19. producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  20. // 启动实例
  21. producer.start();
  22. //发送顺序消息时 发送时相同用户的消息要保证有序,并且发到同一个队列里
  23. messageModelList.forEach(
  24. messageModel->{
  25. Message message = new Message("orderlyTopic", messageModel.toString().getBytes());
  26. try {
  27. //发送消息,相同订单号去相同队列
  28. producer.send(message, new MessageQueueSelector() {
  29. @Override
  30. public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
  31. //producer.send(message,selector,arg),第三个参数订单号会传给selector要实现的方法的arg
  32. //在这里选择队列
  33. int hashCode = Math.abs(arg.toString().hashCode());
  34. int index = hashCode % mqs.size();
  35. return mqs.get(index);
  36. }
  37. }, messageModel.getOrderId());
  38. } catch (Exception e) {
  39. log.error("有错误发生",e);
  40. }
  41. }
  42. );
  43. // 关闭实例
  44. producer.shutdown();
  45. log.info("发送完成");
  46. }

消费顺序消息的消费者代码如下:

  1. //消费者
  2. @Test
  3. public void orderlyConsumer() throws Exception {
  4. //创建一个消费者,并指定一个组名
  5. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-orderly-consumer-group");
  6. //连接namesrv,参数是namesrv的ip地址:端口号
  7. consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  8. //订阅一个主题 *号表示订阅这个主题中所有的消息
  9. consumer.subscribe("orderlyTopic","*");
  10. //设置一个监听器(一直监听,异步回调方式)
  11. consumer.registerMessageListener(new MessageListenerOrderly() {
  12. @Override
  13. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  14. log.info("线程id"+Thread.currentThread().getId());
  15. log.info("消息内容:"+new String(msgs.get(0).getBody()));
  16. return ConsumeOrderlyStatus.SUCCESS;
  17. }
  18. });
  19. //启动消费者
  20. consumer.start();
  21. //挂起当前jvm,防止主线程结束,让监听器一直监听
  22. System.in.read();
  23. }

运行结果如下:

运行结果

可以看到同一个订单是顺序消费的。

其他问题

如果我们的消息消费失败了怎么办?

如果是并发模式,消费失败会进行重试,重试16次后还会没消费成功,会被放到死信队列里。

如果是顺序模式,如果重试失败,会无限重试,是int的最大值。

发送带标签的消息,消息过滤

如果我们有衣服订单的消息、手机订单的消息,如果我们只使用topic进行区分,就要使用两个topic;但是它们都是订单,所以在同一个topic中会好一些,Rocketmq就提供了消息过滤功能,通过tag或者key进行区分。

生产者代码如下:

  1. @Test
  2. public void testTagProducer() throws Exception {
  3. // 创建默认的生产者
  4. DefaultMQProducer producer = new DefaultMQProducer("test-tag-group");
  5. //连接namesrv,参数是namesrv的ip地址:端口号
  6. producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  7. // 启动实例
  8. producer.start();
  9. Message messageTopic1 = new Message("tagTopic", "tag1", "这是topic1的消息".getBytes());
  10. Message messageTopic2 = new Message("tagTopic", "tag2", "这是topic2的消息".getBytes());
  11. producer.send(messageTopic1);
  12. producer.send(messageTopic2);
  13. // 关闭实例
  14. producer.shutdown();
  15. }

消费tag1的消费者

  1. //消费tag1的消费者
  2. @Test
  3. public void tagConsumer1() throws Exception {
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
  5. consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  6. consumer.subscribe("tagTopic", "tag1");
  7. consumer.registerMessageListener(new MessageListenerConcurrently() {
  8. @Override
  9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  10. System.out.println("我是tag1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  12. }
  13. });
  14. consumer.start();
  15. System.in.read();
  16. }

消费tag1和tag2的消费者

  1. //消费tag1和tag2的消费者
  2. @Test
  3. public void tagConsumer2() throws Exception {
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
  5. consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  6. consumer.subscribe("tagTopic", "tag1 || tag2");
  7. consumer.registerMessageListener(new MessageListenerConcurrently() {
  8. @Override
  9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  10. System.out.println("我是tag1和tag2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  12. }
  13. });
  14. consumer.start();
  15. System.in.read();
  16. }

带key的消息

消息都会有自己的MessageId的,如下图:

messageid

那我们能否指定id呢?

在发送消息时可以指定key:

  1. @Test
  2. public void testKeyProducer() throws Exception {
  3. // 创建默认的生产者
  4. DefaultMQProducer producer = new DefaultMQProducer("test-key-group");
  5. //连接namesrv,参数是namesrv的ip地址:端口号
  6. producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  7. String key = UUID.randomUUID().toString();
  8. // 启动实例
  9. producer.start();
  10. Message messageTopic1 = new Message("keyTopic", "tag1",key, "这是topic1的消息".getBytes());
  11. producer.send(messageTopic1);
  12. // 关闭实例
  13. producer.shutdown();
  14. }

消费者获取key:

  1. @Test
  2. public void testKeyConsumer() throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");
  4. consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
  5. consumer.subscribe("keyTopic","*");
  6. consumer.registerMessageListener(new MessageListenerConcurrently() {
  7. @Override
  8. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  9. System.out.println("我们设置的key:" + msgs.get(0).getKeys());
  10. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  11. }
  12. });
  13. consumer.start();
  14. System.in.read();
  15. }

输出如下:

输出

原文链接:https://www.cnblogs.com/nicaicai/p/18005528

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号