经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 移动开发 » Kotlin » 查看文章
使用Kotlin+RocketMQ实现延时消息的示例代码
来源:jb51  时间:2019/7/5 8:57:17  对本文有异议

一. 延时消息

延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

使用延时消息的典型场景,例如:

  • 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。
  • 在电商系统中,用户七天内没有评价商品,则默认好评。

这些场景对应的解决方案,包括:

  • 轮询遍历数据库记录
  • JDK 的 DelayQueue
  • ScheduledExecutorService
  • 基于 Quartz 的定时任务
  • 基于 Redis 的 zset 实现延时队列。

除此之外,还可以使用消息队列来实现延时消息,例如 RocketMQ。

二. RocketMQ

RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。

三. RocketMQ 实现延时消息

3.1 业务背景

我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。
当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。
例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。

3.2 生产者(Producer)

生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

首先,定义一个支持延时发送的 AbstractProducer。

  1. abstract class AbstractProducer :ProducerBean() {
  2. var producerId: String? = null
  3. var topic: String? = null
  4. var tag: String?=null
  5. var timeoutMillis: Int? = null
  6. var delaySendTimeMills: Long? = null
  7.  
  8. val log = LogFactory.getLog(this.javaClass)
  9.  
  10. open fun sendMessage(messageBody: Any, tag: String) {
  11. val msgBody = JSON.toJSONString(messageBody)
  12. val message = Message(topic, tag, msgBody.toByteArray())
  13.  
  14. if (delaySendTimeMills != null) {
  15. val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!
  16. message.startDeliverTime = startDeliverTime
  17. log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")
  18. }
  19. val logMessageId = buildLogMessageId(message)
  20. try {
  21. val sendResult = send(message)
  22. log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
  23. } catch (e: Exception) {
  24. log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
  25. }
  26.  
  27. }
  28.  
  29. fun buildLogMessageId(message: Message): String {
  30. return "topic: " + message.topic + "\n" +
  31. "producer: " + producerId + "\n" +
  32. "tag: " + message.tag + "\n" +
  33. "key: " + message.key + "\n"
  34. }
  35. }
  36.  

根据业务需要,增加一个支持重试机制的 Producer

  1. @Component
  2. @ConfigurationProperties("mqs.ons.producers.xxx-producer")
  3. @Configuration
  4. @Data
  5. class CleanReportPushEventProducer :AbstractProducer() {
  6.  
  7. lateinit var delaySecondList:List<Long>
  8.  
  9. fun sendMessage(messageBody: CleanReportPushEventMessage){
  10. //重试超过次数之后不再发事件
  11. if (delaySecondList!=null) {
  12.  
  13. if(messageBody.times>=delaySecondList.size){
  14. return
  15. }
  16. val msgBody = JSON.toJSONString(messageBody)
  17. val message = Message(topic, tag, msgBody.toByteArray())
  18. val delayTimeMills = delaySecondList[messageBody.times]*1000L
  19. message.startDeliverTime = System.currentTimeMillis() + delayTimeMills
  20. log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )
  21. val logMessageId = buildLogMessageId(message)
  22. try {
  23. val sendResult = send(message)
  24. log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
  25. } catch (e: Exception) {
  26. log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
  27. }
  28. }
  29. }
  30. }
  31.  

在 CleanReportPushEventProducer 中,超过了重试的次数就不会再发送消息了。

每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills 。

通过 System.currentTimeMillis() + delayTimeMills 可以设置 message 的 startDeliverTime。然后调用 send(message) 即可发送延时消息。

我们使用商用版的 RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ 只支持18个特定级别的延迟消息。:(

3.3 消费者(Consumer)

消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

定义 Push 类型的 AbstractConsumer:

  1. @Data
  2. abstract class AbstractConsumer ():MessageListener{
  3.  
  4. var consumerId: String? = null
  5.  
  6. lateinit var subscribeOptions: List<SubscribeOptions>
  7.  
  8. var threadNums: Int? = null
  9.  
  10. val log = LogFactory.getLog(this.javaClass)
  11.  
  12. override fun consume(message: Message, context: ConsumeContext): Action {
  13. val logMessageId = buildLogMessageId(message)
  14. val body = String(message.body)
  15. try {
  16. log.info(logMessageId + " body: " + body)
  17. val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
  18. log.info(logMessageId + " result: " + result.name)
  19. return result
  20. } catch (e: Exception) {
  21. if (message.reconsumeTimes >= 3) {
  22. log.error(logMessageId + " error: " + e.message, e)
  23. }
  24. return Action.ReconsumeLater
  25. }
  26.  
  27. }
  28.  
  29. abstract fun getMessageBodyType(tag: String): Type?
  30.  
  31. abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action
  32.  
  33. protected fun buildLogMessageId(message: Message): String {
  34. return "topic: " + message.topic + "\n" +
  35. "consumer: " + consumerId + "\n" +
  36. "tag: " + message.tag + "\n" +
  37. "key: " + message.key + "\n" +
  38. "MsgId:" + message.msgID + "\n" +
  39. "BornTimestamp" + message.bornTimestamp + "\n" +
  40. "StartDeliverTime:" + message.startDeliverTime + "\n" +
  41. "ReconsumeTimes:" + message.reconsumeTimes + "\n"
  42. }
  43. }
  44.  

再定义具体的消费者,并且在消费失败之后能够再发送一次消息。

  1. @Configuration
  2. @ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
  3. @Data
  4. class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {
  5.  
  6. val logger: Logger = LoggerFactory.getLogger(this.javaClass)
  7.  
  8. override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
  9. if(obj is CleanReportPushEventMessage){
  10. //清除事件
  11. logger.info("consumer clean-report event report_id:${obj.id} ")
  12.  
  13. //消费失败之后再发送一次消息
  14. if(!cleanReportService.sendCleanReportEvent(obj.id)){
  15. val times = obj.times+1
  16. eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
  17. }
  18. }
  19. return Action.CommitMessage
  20. }
  21.  
  22. override fun getMessageBodyType(tag: String): Type? {
  23. return CleanReportPushEventMessage::class.java
  24. }
  25. }
  26.  

其中,cleanReportService 的 sendCleanReportEvent() 会通过 http 的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了 eventProducer 的 sendMessage() 方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)

最后,定义 ConsumerFactory

  1. @Component
  2. class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {
  3.  
  4. val logger: Logger = LoggerFactory.getLogger(this.javaClass)
  5.  
  6.  
  7. @PostConstruct
  8. fun start() {
  9. CompletableFuture.runAsync{
  10. consumers.stream().forEach {
  11. val properties = buildProperties(it.consumerId!!, it.threadNums)
  12. val consumer = ONSFactory.createConsumer(properties)
  13. if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {
  14. for (options in it.subscribeOptions!!) {
  15. consumer.subscribe(options.topic, options.tag, it)
  16. }
  17. consumer.start()
  18. val message = "\n".plus(
  19. it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
  20. .collect(Collectors.toList<Any>()))
  21. logger.info(String.format("consumer: %s\n", message))
  22. }
  23. }
  24. }
  25. }
  26.  
  27. private fun buildProperties(consumerId: String,threadNums: Int?): Properties {
  28. val properties = Properties()
  29. properties.put(PropertyKeyConst.ConsumerId, consumerId)
  30. properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
  31. properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
  32. if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
  33. properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
  34. } else {
  35. // 测试环境接入RocketMQ
  36. properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
  37. }
  38. properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
  39. return properties
  40. }
  41. }
  42.  

四. 总结

正如本文开头曾介绍过,可以使用多种方式来实现延时消息。然而,我们的系统本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 实现延时消息不失为一种可靠而又方便的方式。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持w3xue。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号