经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RocketMQ » 查看文章
RocketMQ的简单使用
来源:cnblogs  作者:Leo哥coding~  时间:2023/5/4 9:20:13  对本文有异议

大家好,我是Leo!今天来和大家分享RocketMQ的一些用法。

领域模型介绍

Producer: 用于生产消息的运行实体。

Topic: 主题,用于消息传输和存储的分组容器。

MessageQueue: 消息传输和存储的实际单元容器。

Message: 消息传输的最小单元。

ConsumerGroup: 消费者组。

Consumer: 消费者。

Subscription: 订阅关系,发布订阅模式中消息过滤、重试、消费进度的规则配置。

MQ的优势

MQ的明显优势有3个。

应用解耦: 以多服务为例,用户下单,需要通知订单服务和库存服务,我们可以通过MQ消息来解除下单和库存系统的耦合。

异步提速: 以秒杀为例,我们可以先返回秒杀结果,后续再通过MQ异步消息去插入记录和扣减库存等,减少调用的链路长度。

削峰填谷: 将某一时间内的请求量分摊到更多时间处理,比如系统A一秒只能处理10000个请求,但是我有100000个请求需要处理,我可以将请求发到MQ中,再分成10秒去消费这些请求。

当然MQ也有劣势系统可用性降低系统复杂度提高一致性问题

RocketMQ的主要角色

主要包括Producer、Broker、Consumer、NameServer Cluster。

一对多

可以通过设置不同的消费者组

不同组通过不同的消费者组既可以实现同时收到一样数量的消息,那同一个消费者组需要怎样才能收到同样数量的消息呢?

  1. // 消费者消费模式
  2. consumer.setMessageModel(MessageModel.BROADCASTING);

默认是集群模式CLUSTERING,设置成广播模式

既可以实现一对多的发送。

同步消息(普通消息)

同步消息需要阻塞等待消息发送结果的返回

  1. public class ProducerDemo {
  2. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  3. DefaultMQProducer producer = new DefaultMQProducer("group1");
  4. producer.setNamesrvAddr("localhost:9876");
  5. producer.start();
  6. Message message = new Message();
  7. message.setTopic("MQLearn");
  8. message.setTags("1.0.0");
  9. message.setBody("Hello MQ!".getBytes(StandardCharsets.UTF_8));
  10. SendResult result = producer.send(message);
  11. if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
  12. System.out.println(result);
  13. System.out.println("发送成功:" + message);
  14. }
  15. producer.shutdown();
  16. }
  17. }

异步消息

异步消息需要实现发送成功和失败的回调函数。

  1. public class Producer {
  2. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  3. DefaultMQProducer producer = new DefaultMQProducer("group1");
  4. producer.setNamesrvAddr("192.168.246.140:9876");
  5. producer.start();
  6. // 异步消息
  7. for (int i = 0; i < 10; i++) {
  8. Message message = new Message();
  9. message.setTopic("topic7");
  10. message.setTags("1.0.0");
  11. message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
  12. producer.send(message, new SendCallback() {
  13. @Override
  14. public void onSuccess(SendResult sendResult) {
  15. // 发送成功的回调方法
  16. System.out.println(sendResult);
  17. }
  18. @Override
  19. public void onException(Throwable e) {
  20. // 发送失败的回调方法
  21. System.out.println(e);
  22. }
  23. });
  24. }
  25. TimeUnit.SECONDS.sleep(10);
  26. System.out.println("异步发送完成!");
  27. }
  28. }

单向消息

单向消息就类似UDP,只顾单向发送,不管是否发送成功,常用于日志收集等场景。

  1. public class SingleDirectionProducer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQProducer producer = new DefaultMQProducer("group1");
  4. producer.setNamesrvAddr("localhost:9876");
  5. producer.start();
  6. // 单向消息
  7. for (int i = 0; i < 10; i++) {
  8. Message message = new Message();
  9. message.setTopic("topic8");
  10. message.setTags("1.0.0");
  11. message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
  12. producer.sendOneway(message);
  13. }
  14. System.out.println("带向发送完成!");
  15. }
  16. }

延时(定时)消息

RocketMQ提供的定时消息并不能指定在什么时间点去投递消息。而是根据设定的等待时间,起到延时到达的缓冲作用在RocketMQ中,延时消息的delayTimeLevel支持以下级别:

1 1s 2 5s 3 10s 4 30s 5 1m 6 2m 7 3m 8 4m 9 5m 10 6m 11 7m 12 8m 13 9m 14 10m 15 20m 16 30m 17 1h 18 2h

  1. // 设置消息延时级别
  2. message.setDelayTimeLevel(3);

批量消息

批量消息支持一次发送多条消息。

注意:

  • 批量消息需要有相同的topic

  • 不能是延时消息

  • 消息内容不能超过4M,可以通过producer.setMaxMessageSize()和broker进行设置设置(可以通过拆分多次发送)

  1. public class BatchProducer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQProducer producer = new DefaultMQProducer("group1");
  4. producer.setNamesrvAddr("localhost:9876");
  5. producer.start();
  6. List<Message> list = new ArrayList<>();
  7. // 批量消息
  8. for (int i = 0; i < 10; i++) {
  9. Message message = new Message();
  10. message.setTopic("topic10");
  11. message.setTags("1.0.0");
  12. message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
  13. list.add(message);
  14. }
  15. SendResult result = producer.send(list);
  16. System.out.println(result);
  17. TimeUnit.SECONDS.sleep(2);
  18. System.out.println("发送完成!");
  19. }
  20. }

顺序消息

顺序消息支持按照消息的发送消息先后获取消息。

比如:我的一笔订单有多个流程需要处理,比如创建->付款->推送->完成。

通过同一笔订单放到一个队列中,这样就可以解决消费的无序问题。

通过实现MessageQueueSelector来选择一个队列。

  1. public class Producer {
  2. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  3. DefaultMQProducer producer = new DefaultMQProducer("group1");
  4. producer.setNamesrvAddr("localhost:9876");
  5. producer.start();
  6. Message message = new Message();
  7. // 模拟业务ID
  8. int step = 10;
  9. message.setTopic("topic12");
  10. message.setTags("1.0.0");
  11. message.setBody(("Hello World !").getBytes(StandardCharsets.UTF_8));
  12. producer.send(message, new MessageQueueSelector() {
  13. @Override
  14. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  15. // 队列数
  16. int size = mqs.size();
  17. // 取模
  18. int orderId = step;
  19. return mqs.get(orderId % size);
  20. }
  21. }, null);
  22. System.out.println("发送完成!");
  23. }
  24. }
  1. public class Consumer {
  2. public static void main(String[] args) throws Exception{
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
  4. consumer.setConsumerGroup("group1");
  5. consumer.setNamesrvAddr("localhost:9876");
  6. consumer.subscribe("topic12", "*");
  7. // 消费者,起一个顺序监听,一个线程,只监听一个队列
  8. consumer.registerMessageListener(new MessageListenerOrderly() {
  9. @Override
  10. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  11. for (MessageExt msg : msgs) {
  12. System.out.println(msg);
  13. byte[] body = msg.getBody();
  14. System.out.println(new String(body));
  15. }
  16. return ConsumeOrderlyStatus.SUCCESS;
  17. }
  18. });
  19. consumer.start();
  20. System.out.println("消费者启动了!");
  21. }
  22. }

事务消息

RocketMQ中的事务消息支持在分布式场景下消息生产和本地事务的最终一致性。

大致流程为,

  1. 生产者先将消息发送至RocketMQ。

  2. RocketMQBroker将消息持久化成功后,向生产者返回ACK消息确认已经返回成功,消息状态为暂时不能投递状态。

  3. 执行本地事务逻辑。

  4. 生产者根据事务执行结果向Broker提交commit或者rollback结果。

  5. 如果在断网或者重启情况下,未收到4的结果,或者返回Unknown未知状态,在固定时间对消息进行回查。

  6. 生产者收到消息回查后,需要本地事务执行的最终结果。

  7. 生产者对本地事务状态进行二次提交或确认。

  1. public class Producer {
  2. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  3. TransactionMQProducer producer = new TransactionMQProducer("group1");
  4. producer.setNamesrvAddr("localhost:9876");
  5. // 设置事务监听
  6. producer.setTransactionListener(new TransactionListener() {
  7. // 正常事务监听
  8. @Override
  9. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  10. // 把消息保存到mysql数据库
  11. boolean ok = false;
  12. if (ok) {
  13. System.out.println("正常执行事务过程");
  14. return LocalTransactionState.COMMIT_MESSAGE;
  15. } else {
  16. System.out.println("事务补偿过程");
  17. return LocalTransactionState.UNKNOW;
  18. //return LocalTransactionState.ROLLBACK_MESSAGE;
  19. }
  20. }
  21. // 事务补偿事务
  22. @Override
  23. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  24. System.out.println("事务补偿过程");
  25. // sql select
  26. if (true) {
  27. } else {
  28. }
  29. return LocalTransactionState.COMMIT_MESSAGE;
  30. }
  31. });
  32. producer.start();
  33. String msg = "Hello Transaction Message!";
  34. Message message = new Message("topic13", "tag", msg.getBytes(StandardCharsets.UTF_8));
  35. TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
  36. TimeUnit.SECONDS.sleep(2);
  37. System.out.println(transactionSendResult);
  38. System.out.println("发送完成!");
  39. }
  40. }

消息的过滤

在RocketMQ中的消息过滤功能能通过生产者和消费者对消息的属性和Tag进行定义,在消费端可以根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

支持两种方式:Tag标签过滤和SQL属性过滤。

  1. Message message = new Message();
  2. message.setTopic("topic11");
  3. message.setTags("tag");
  4. message.setBody(("Hello World !" + "tag").getBytes(StandardCharsets.UTF_8));
  5. message.putUserProperty("name", "zhangsan");
  6. message.putUserProperty("age", "16");
  7. SendResult result = producer.send(message);

subscribe方法subExpression参数也支持Tag过滤

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
  2. consumer.setConsumerGroup("group1");
  3. consumer.setNamesrvAddr("112.74.125.184:9876");
  4. consumer.subscribe("topic11", MessageSelector.bySql("age > 16"));

SpringBoot整合RocketMQ的使用

在SpringBoot项目中主要通过RocketMQTemplate进行消息的发送。

  1. // 普通消息
  2. rocketMQTemplate.convertAndSend("topic10", user);
  3. rocketMQTemplate.send("topic10", MessageBuilder.withPayload(user).
  4. SendResult result = rocketMQTemplate.syncSend("topic10", user);
  5. // 异步消息
  6. rocketMQTemplate.asyncSend("topic10", user, new SendCallback() {
  7. @Override
  8. public void onSuccess(SendResult sendResult) {
  9. System.out.println("成功!");
  10. }
  11. @Override
  12. public void onException(Throwable e) {
  13. System.out.println(e);
  14. }
  15. }, 1000L);
  16. // 单向消息
  17. rocketMQTemplate.sendOneWay("topic10", user);
  18. // 延时消息
  19. rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(user).build(), 2000L, 3);
  20. // 批量消息
  21. rocketMQTemplate.syncSend("topic10", list, 1000);

消费者:在注解中可以实现根据Tag和SQL进行属性的过滤。

  1. @Service
  2. //@RocketMQMessageListener(
  3. // consumerGroup = "group1",
  4. // topic = "topic10",
  5. // selectorExpression = "tag1 || tag2"
  6. //)
  7. @RocketMQMessageListener(
  8. consumerGroup = "group1",
  9. topic = "topic10",
  10. selectorType = SelectorType.SQL92,
  11. selectorExpression = "age > 16",
  12. messageModel = MessageModel.BROADCASTING
  13. )
  14. public class UserConsumer implements RocketMQListener<User> {
  15. @Override
  16. public void onMessage(User message) {
  17. }
  18. }

总结

今天主要分享了一下RocketMQ的一些基础使用,包括各种类型的消息的使用,偏向于代码实现部分,对于原理篇没有过多涉及。

原文链接:https://www.cnblogs.com/Leo-coding/p/17367503.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号