经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » MyBatis » 查看文章
一种轻量分表方案-MyBatis拦截器分表实践
来源:cnblogs  作者:京东云开发者  时间:2024/1/26 10:35:27  对本文有异议

背景

部门内有一些亿级别核心业务表增速非常快,增量日均100W,但线上业务只依赖近一周的数据。随着数据量的迅速增长,慢SQL频发,数据库性能下降,系统稳定性受到严重影响。本篇文章,将分享如何使用MyBatis拦截器低成本的提升数据库稳定性。

 

业界常见方案

针对冷数据多的大表,常用的策略有以2种:

1. 删除/归档旧数据。

2. 分表。

 

归档/删除旧数据

定期将冷数据移动到归档表或者冷存储中,或定期对表进行删除,以减少表的大小。此策略逻辑简单,只需要编写一个JOB定期执行SQL删除数据。我们开始也是用这种方案,但此方案也有一些副作用:

1.数据删除会影响数据库性能,引发慢sql,多张表并行删除,数据库压力会更大。
2.频繁删除数据,会产生数据库碎片,影响数据库性能,引发慢SQL。

综上,此方案有一定风险,为了规避这种风险,我们决定采用另一种方案:分表。

 

分表

我们决定按日期对表进行横向拆分,实现让系统每周生成一张周期表,表内只存近一周的数据,规避单表过大带来的风险。

 

分表方案选型

经调研,考虑2种分表方案:Sharding-JDBC、利用Mybatis自带的拦截器特性。

经过对比后,决定采用Mybatis拦截器来实现分表,原因如下:

1.JAVA生态中很常用的分表框架是Sharding-JDBC,虽然功能强大,但需要一定的接入成本,并且很多功能暂时用不上。
2.系统本身已经在使用Mybatis了,只需要添加一个mybaits拦截器,把SQL表名替换为新的周期表就可以了,没有接入新框架的成本,开发成本也不高。

 

 

简易架构图

 

分表具体实现代码

分表配置对象

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. import java.util.Date;
  5. @Data
  6. @AllArgsConstructor
  7. @NoArgsConstructor
  8. public class ShardingProperty {
  9. // 分表周期天数,配置7,就是一周一分
  10. private Integer days;
  11. // 分表开始日期,需要用这个日期计算周期表名
  12. private Date beginDate;
  13. // 需要分表的表名
  14. private String tableName;
  15. }

分表配置类

  1. import java.util.concurrent.ConcurrentHashMap;
  2. public class ShardingPropertyConfig {
  3. public static final ConcurrentHashMap<String, ShardingProperty> SHARDING_TABLE ();
  4. static {
  5. ShardingProperty orderInfoShardingConfig = new ShardingProperty(15, DateUtils.string2Date("20231117"), "order_info");
  6. ShardingProperty userInfoShardingConfig = new ShardingProperty(7, DateUtils.string2Date("20231117"), "user_info");
  7. SHARDING_TABLE.put(orderInfoShardingConfig.getTableName(), orderInfoShardingConfig);
  8. SHARDING_TABLE.put(userInfoShardingConfig.getTableName(), userInfoShardingConfig);
  9. }
  10. }

拦截器

  1. import lombok.extern.slf4j.Slf4j;
  2. import o2o.aspect.platform.function.template.service.TemplateMatchService;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.apache.ibatis.executor.statement.StatementHandler;
  5. import org.apache.ibatis.mapping.BoundSql;
  6. import org.apache.ibatis.mapping.MappedStatement;
  7. import org.apache.ibatis.plugin.*;
  8. import org.apache.ibatis.reflection.DefaultReflectorFactory;
  9. import org.apache.ibatis.reflection.MetaObject;
  10. import org.apache.ibatis.reflection.ReflectorFactory;
  11. import org.apache.ibatis.reflection.factory.DefaultObjectFactory;
  12. import org.apache.ibatis.reflection.factory.ObjectFactory;
  13. import org.apache.ibatis.reflection.wrapper.DefaultObjectWrapperFactory;
  14. import org.apache.ibatis.reflection.wrapper.ObjectWrapperFactory;
  15. import org.springframework.stereotype.Component;
  16. import java.sql.Connection;
  17. import java.time.LocalDateTime;
  18. import java.time.format.DateTimeFormatter;
  19. import java.util.Date;
  20. import java.util.Properties;
  21. @Slf4j
  22. @Component
  23. @Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
  24. public class ShardingTableInterceptor implements Interceptor {
  25. private static final ObjectFactory DEFAULT_OBJECT_FACTORY = new DefaultObjectFactory();
  26. private static final ObjectWrapperFactory DEFAULT_OBJECT_WRAPPER_FACTORY = new DefaultObjectWrapperFactory();
  27. private static final ReflectorFactory DEFAULT_REFLECTOR_FACTORY = new DefaultReflectorFactory();
  28. private static final String MAPPED_STATEMENT = "delegate.mappedStatement";
  29. private static final String BOUND_SQL = "delegate.boundSql";
  30. private static final String ORIGIN_BOUND_SQL = "delegate.boundSql.sql";
  31. private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
  32. private static final String SHARDING_MAPPER = "com.jd.o2o.inviter.promote.mapper.ShardingMapper";
  33. private ConfigUtils configUtils = SpringContextHolder.getBean(ConfigUtils.class);
  34. @Override
  35. public Object intercept(Invocation invocation) throws Throwable {
  36. boolean shardingSwitch = configUtils.getBool("sharding_switch", false);
  37. // 没开启分表 直接返回老数据
  38. if (!shardingSwitch) {
  39. return invocation.proceed();
  40. }
  41. StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
  42. MetaObject metaStatementHandler = MetaObject.forObject(statementHandler, DEFAULT_OBJECT_FACTORY, DEFAULT_OBJECT_WRAPPER_FACTORY, DEFAULT_REFLECTOR_FACTORY);
  43. MappedStatement mappedStatement = (MappedStatement) metaStatementHandler.getValue(MAPPED_STATEMENT);
  44. BoundSql boundSql = (BoundSql) metaStatementHandler.getValue(BOUND_SQL);
  45. String originSql = (String) metaStatementHandler.getValue(ORIGIN_BOUND_SQL);
  46. if (StringUtils.isBlank(originSql)) {
  47. return invocation.proceed();
  48. }
  49. // 获取表名
  50. String tableName = TemplateMatchService.matchTableName(boundSql.getSql().trim());
  51. ShardingProperty shardingProperty = ShardingPropertyConfig.SHARDING_TABLE.get(tableName);
  52. if (shardingProperty == null) {
  53. return invocation.proceed();
  54. }
  55. // 新表
  56. String shardingTable = getCurrentShardingTable(shardingProperty, new Date());
  57. String rebuildSql = boundSql.getSql().replace(shardingProperty.getTableName(), shardingTable);
  58. metaStatementHandler.setValue(ORIGIN_BOUND_SQL, rebuildSql);
  59. if (log.isDebugEnabled()) {
  60. log.info("rebuildSQL -> {}", rebuildSql);
  61. }
  62. return invocation.proceed();
  63. }
  64. @Override
  65. public Object plugin(Object target) {
  66. if (target instanceof StatementHandler) {
  67. return Plugin.wrap(target, this);
  68. }
  69. return target;
  70. }
  71. @Override
  72. public void setProperties(Properties properties) {}
  73. public static String getCurrentShardingTable(ShardingProperty shardingProperty, Date createTime) {
  74. String tableName = shardingProperty.getTableName();
  75. Integer days = shardingProperty.getDays();
  76. Date beginDate = shardingProperty.getBeginDate();
  77. Date date;
  78. if (createTime == null) {
  79. date = new Date();
  80. } else {
  81. date = createTime;
  82. }
  83. if (date.before(beginDate)) {
  84. return null;
  85. }
  86. LocalDateTime targetDate = SimpleDateFormatUtils.convertDateToLocalDateTime(date);
  87. LocalDateTime startDate = SimpleDateFormatUtils.convertDateToLocalDateTime(beginDate);
  88. LocalDateTime intervalStartDate = DateIntervalChecker.getIntervalStartDate(targetDate, startDate, days);
  89. LocalDateTime intervalEndDate = intervalStartDate.plusDays(days - 1);
  90. return tableName + "_" + intervalStartDate.format(FORMATTER) + "_" + intervalEndDate.format(FORMATTER);
  91. }
  92. }

临界点数据不连续问题

分表方案有1个难点需要解决:周期临界点数据不连续。举例:假设要对operate_log(操作日志表)大表进行横向分表,每周一张表,分表明细可看下面表格。

第一周(operate_log_20240107_20240108) 第二周(operate_log_20240108_20240114) 第三周(operate_log_20240115_20240121)
1月1号 ~ 1月7号的数据 1月8号 ~ 1月14号的数据 1月15号 ~ 1月21号的数据

1月8号就是分表临界点,8号需要切换到第二周的表,但8号0点刚切换的时候,表内没有任何数据,这时如果业务需要查近一周的操作日志是查不到的,这样就会引发线上问题。

我决定采用数据冗余的方式来解决这个痛点。每个周期表都冗余一份上个周期的数据,用双倍数据量实现数据滑动的效果,效果见下面表格。

第一周(operate_log_20240107_20240108) 第二周(operate_log_20240108_20240114) 第三周(operate_log_20240115_20240121)
12月25号 ~ 12月31号的数据 1月1号 ~ 1月7号的数据 1月8号 ~ 1月14号的数据
1月1号 ~ 1月7号的数据 1月8号 ~ 1月14号的数据 1月15号 ~ 1月21号的数据

注:表格内第一行数据就是冗余的上个周期表的数据。

思路有了,接下来就要考虑怎么实现双写(数据冗余到下个周期表),有2种方案:

1.在SQL执行完成返回结果前添加逻辑(可以用AspectJ 或 mybatis拦截器),如果SQL内的表名是当前周期表,就把表名替换为下个周期表,然后再次执行SQL。此方案对业务影响大,相当于串行执行了2次SQL,有性能损耗。
2.监听增量binlog,京东内部有现成的数据订阅中间件DRC,读者也可以使用cannal等开源中间件来代替DRC,原理大同小异,此方案对业务无影响。

方案对比后,选择了对业务性能损耗小的方案二。

 

监听binlog并双写流程图

 

 

监听binlog数据双写注意点

1.提前上线监听程序,提前把老表数据同步到新的周期表。分表前只监听老表binlog就可以,分表前只需要把老表数据同步到新表。
2.切换到新表的临界点,为了避免丢失积压的老表binlog,需要同时处理新表binlog和老表binlog,这样会出现死循环同步的问题,因为老表需要同步新表,新表又需要双写老表。为了打破循环,需要先把双写老表消费堵上让消息暂时积压,切换新表成功后,再打开双写消费。

 

监听binlog数据双写代码

注:下面代码不能直接用,只提供基本思路

  1. /**
  2. * 监听binlog ,分表双写,解决数据临界问题
  3. */
  4. @Slf4j
  5. @Component
  6. public class BinLogConsumer implements MessageListener {
  7. private MessageDeserialize deserialize = new JMQMessageDeserialize();
  8. private static final String TABLE_PLACEHOLDER = "%TABLE%";
  9. @Value("${mq.doubleWriteTopic.topic}")
  10. private String doubleWriteTopic;
  11. @Autowired
  12. private JmqProducerService jmqProducerService;
  13. @Override
  14. public void onMessage(List<Message> messages) throws Exception {
  15. if (messages == null || messages.isEmpty()) {
  16. return;
  17. }
  18. List<EntryMessage> entryMessages = deserialize.deserialize(messages);
  19. for (EntryMessage entryMessage : entryMessages) {
  20. try {
  21. syncData(entryMessage);
  22. } catch (Exception e) {
  23. log.error("sharding sync data error", e);
  24. throw e;
  25. }
  26. }
  27. }
  28. private void syncData(EntryMessage entryMessage) throws JMQException {
  29. // 根据binlog内的表名,获取需要同步的表
  30. // 3种情况:
  31. // 1、老表:需要同步当前周期表,和下个周期表。
  32. // 2、当前周期表:需要同步下个周期表,和老表。
  33. // 3、下个周期表:不需要同步。
  34. List<String> syncTables = getSyncTables(entryMessage.tableName, entryMessage.createTime);
  35. if (CollectionUtils.isEmpty(syncTables)) {
  36. log.info("table {} is not need sync", tableName);
  37. return;
  38. }
  39. if (entryMessage.getHeader().getEventType() == WaveEntry.EventType.INSERT) {
  40. String insertTableSqlTemplate = parseSqlForInsert(rowData);
  41. for (String syncTable : syncTables) {
  42. String insertSql = insertTableSqlTemplate.replaceAll(TABLE_PLACEHOLDER, syncTable);
  43. // 双写老表发Q,为了避免出现同步死循环问题
  44. if (ShardingPropertyConfig.SHARDING_TABLE.containsKey(syncTable)) {
  45. Long primaryKey = getPrimaryKey(rowData.getAfterColumnsList());
  46. sendDoubleWriteMsg(insertSql, primaryKey);
  47. continue;
  48. }
  49. mysqlConnection.executeSql(insertSql);
  50. }
  51. continue;
  52. }
  53. }

数据对比

为了保证新表和老表数据一致,需要编写对比程序,在上线前进行数据对比,保证binlog同步无问题。

具体实现代码不做展示,思路:新表查询一定量级数据,老表查询相同量级数据,都转换成JSON,equals对比。

作者:京东零售 张均杰

来源:京东云开发者社区 转载请注明来源

原文链接:https://www.cnblogs.com/Jcloud/p/17988807

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号