经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RocketMQ » 查看文章
Redis+Hbase+RocketMQ 实际使用问题案例分享
来源:cnblogs  作者:王德发!  时间:2023/1/20 8:40:21  对本文有异议

需求

  1. 将Hbase数据,解析后推送到RocketMQ。
  2. redis使用list数据类型,存储了需要推送的数据的RowKey及表名。

简单画个流程图就是:

分析及确定方案

Redis

  1. 明确list中元素结构{"rowkey":rowkey,"table":table}解析出rowkey;
  2. 一次取多个元素加快效率;
  3. 取了之后放入重试队列,并删除原来的元素;
  4. 处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;
  5. 明确从list中取值所使用的redis命令;范围获取LRANGE;范围删除(留下指定范围的数据)LTRIM;判断list长度LLEN;加入listRPUSH;删除LREM等等;
  6. 从Hbase获取数据失败和发送到mq失败都令重试次数加一;
  7. 每次碰到重试次数不为0的数据都休眠1s;
  8. 设置最大重试次数,达到限制后丢弃;
  9. 考虑客户redis部署方式,单机、主从、集群、哨兵等;
  10. 选择合适的客户端,Jedis、Redisson、Lettuce等;
  11. 编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;

Hbase

  1. 基本理论知识学习(原来没接触过),rowkey是没条数据的主键,限定符是字段名,列族是多个限定名的集合等;当时看这个觉得不错https://juejin.cn/post/6844903797655863309
  2. 因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;
  3. 确定批量获取数据方式为批量Get,没用scan
  4. 了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的CellUtil.clone相关方法;
  5. 考虑所有都没数据时休眠10s;

RocketMQ

  1. 有现成的发送代码,公司封装好的;
  2. 调整发送的速度、太快了服务端会吃不消(获取Hbase数据速度太快了,最开始没限制一会儿就入了百万数据),设置超时时间(默认3s);
  3. 调整服务端的内存、线程数等参数;

实现

配置

  1. #server configuration
  2. server.port=8896
  3. #log config
  4. logging.file.path=./logs
  5. #redis-standalone
  6. redis.standalone.host=
  7. redis.standalone.port=6379
  8. redis.standalone.password=
  9. redis.standalone.enable=true
  10. #redis-cluster
  11. redis.cluster.nodes=
  12. redis.cluster.password=
  13. redis.cluster.timeout=30000
  14. redis.cluster.enable=false
  15. # Zookeeper 集群地址,逗号分隔
  16. hbase.zookeeper.quorum=
  17. # Zookeeper 端口
  18. hbase.zookeeper.property.clientPort=2181
  19. # 消息目的rocketmq地址
  20. rocketmq.server.host=
  21. # 发送消息间隔时间,防止发送过快mq受不了
  22. rocketmq.send.interval.millisec=10
  23. # 每次从redis读取数据量限制。
  24. data.access.redisDataSize=100
  25. # 失败数据重试次数,超过的直接丢弃
  26. data.access.retryNum=10
  27. # 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey
  28. data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
  29. data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back

部分代码

获取配置,其余的直接@Value("${}")

  1. @Setter
  2. @Getter
  3. @Configuration
  4. @ConfigurationProperties(prefix = "data.access")
  5. public class AccessRedisMqConfig {
  6. /**
  7. * key:topic; value:redis的key
  8. */
  9. private Map<String, String> topicKeyMap = new HashMap<>();
  10. /**
  11. * 一次从redis中读取数据量限制
  12. */
  13. private long redisDataSize = 50;
  14. /**
  15. * 失败数据重试次数
  16. */
  17. private int retryNum = 10;
  18. }

开启接入:

  1. @Component
  2. public class AdapterRunner implements ApplicationRunner {
  3. @Resource
  4. private DataAccessService dataAccessService;
  5. @Override
  6. public void run(ApplicationArguments args) {
  7. System.out.println("项目已启动,开始接入数据到RocketMQ……");
  8. dataAccessService.accessData2Mq();
  9. }
  10. }

其他代码其实也在分析里了。

踩坑

  1. mq发送问题
  1. org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
  2. at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
  3. at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
  4. at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
  5. at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
  6. at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
  7. at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
  8. at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
  9. at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  10. at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  11. at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  12. at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  13. at java.base/java.lang.Thread.run(Thread.java:834)

上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。

总结

程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。

原文链接:https://www.cnblogs.com/letscrazy/p/case.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号