经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » InfluxDB » 查看文章
Springboot使用influxDB时序数据库的实现
来源:jb51  时间:2021/8/4 11:26:55  对本文有异议

项目中需要存放大量设备日志,且需要对其进行简单的数据分析,信息提取工作.

结合众多考量因素,项目决定使用时序数据库中的领头羊InfluxDB.

引入依赖

项目中使用influxdb-java,在pom文件中添加如下依赖(github地址:https://github.com/influxdata/influxdb-java):

  1. <dependency>
  2. <groupId>org.influxdb</groupId>
  3. <artifactId>influxdb-java</artifactId>
  4. <version>2.15</version>
  5. </dependency>

application.yaml文件配置如下所示(请按照实际情况填写):

  1. spring:
  2. influx:
  3. url: *
  4. password: admin
  5. user: 123
  6. database: log_management

配置

(1) 创建配置类

  1. @Configuration
  2. public class InfluxDbConfig {
  3.  
  4. @Value("${spring.influx.url:''}")
  5. private String influxDBUrl;
  6.  
  7. @Value("${spring.influx.user:''}")
  8. private String userName;
  9.  
  10. @Value("${spring.influx.password:''}")
  11. private String password;
  12.  
  13. @Value("${spring.influx.database:''}")
  14. private String database;
  15.  
  16. @Bean
  17. public InfluxDbUtils influxDbUtils() {
  18. return new InfluxDbUtils(userName, password, influxDBUrl, database, "");
  19. }
  20. }
  1. @Data
  2. public class InfluxDbUtils {
  3. private String userName;
  4. private String password;
  5. private String url;
  6. public String database;
  7. private String retentionPolicy;
  8. // InfluxDB实例
  9. private InfluxDB influxDB;
  10.  
  11. // 数据保存策略
  12. public static String policyNamePix = "logRetentionPolicy_";
  13.  
  14. public InfluxDbUtils(String userName, String password, String url, String database,
  15. String retentionPolicy) {
  16. this.userName = userName;
  17. this.password = password;
  18. this.url = url;
  19. this.database = database;
  20. this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
  21. this.influxDB = influxDbBuild();
  22. }
  23.  
  24. /**
  25. * 连接数据库 ,若不存在则创建
  26. *
  27. * @return influxDb实例
  28. */
  29. private InfluxDB influxDbBuild() {
  30. if (influxDB == null) {
  31. influxDB = InfluxDBFactory.connect(url, userName, password);
  32. }
  33. try {
  34. createDB(database);
  35. influxDB.setDatabase(database);
  36. } catch (Exception e) {
  37. log.error("create influx db failed, error: {}", e.getMessage());
  38. } finally {
  39. influxDB.setRetentionPolicy(retentionPolicy);
  40. }
  41. influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
  42. return influxDB;
  43. }
  44. }

构建实体类

InfluxDB中,measurement对应于传统关系型数据库中的table(database为配置文件中的log_management).
InfluxDB里存储的数据称为时间序列数据,时序数据有零个或多个数据点.
数据点包括time(一个时间戳),measurement(例如logInfo),零个或多个tag,其对应于level,module,device_id),至少一个field(即日志内容,msg=something error).
InfluxDB会根据tag数值建立时间序列(因此tag数值不能选取诸如UUID作为特征值,易导致时间序列过多,导致InfluxDB崩溃),并建立相应索引,以便优化诸如查询速度.

  1. @Builder
  2. @Data
  3. @Measurement(name = "logInfo")
  4. public class LogInfo {
  5.  
  6. // Column中的name为measurement中的列名
  7. // 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
  8. @Column(name = "time")
  9. private String time;
  10. // 注解中添加tag = true,表示当前字段内容为tag内容
  11. @Column(name = "module", tag = true)
  12. private String module;
  13. @Column(name = "level", tag = true)
  14. private String level;
  15. @Column(name = "device_id", tag = true)
  16. private String deviceId;
  17. @Column(name = "msg")
  18. private String msg;
  19. }
  20.  

保存数据

以下代码为单条日志保存,influxdb-java亦支持批量保存(因为与InfluxDB通讯均是通过http,因此建议批量保存以减少性能损耗).

  1. LogInfo logInfo = LogInfo.builder()
  2. .level(jsonObject.getString("level"))
  3. .module(module)
  4. .deviceId(deviceId)
  5. .msg(jsonObject.getString("msg"))
  6. .build();
  7. Point point = Point.measurementByPOJO(logInfo.getClass())
  8. .addFieldsFromPOJO(logInfo)
  9. .time(jsonObject.getLong("time"), TimeUnit.MILLISECONDS)
  10. .build();
  11. // 出于业务考量,设备可以设置不同的保存策略(策略名为固定前缀+设备ID)
  12. influxDB.write(influxDBUtils.database, InfluxDbUtils.policyNamePix + deviceId, point);

查询数据

因为代码与业务耦合比较厉害,因此此处仅截选做概要示范.

  1. // InfluxDB支持分页查询,因此可以设置分页查询条件
  2. String pageQuery = " LIMIT " + request.getPageSize() + " OFFSET " + ((request.getPageNum() - 1) * request.getPageSize());
  3. // 此处查询所有内容,如果
  4. String queryCmd = "SELECT * FROM "
  5. // 查询指定设备下的日志信息
  6. // 要指定从 RetentionPolicyName(保存策略前缀+设备ID).measurement(logInfo) 中查询指定数据)
  7. + InfluxDbUtils.policyNamePix + request.getDeviceId() + "." + "logInfo"
  8. // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
  9. + queryCondition
  10. // 查询结果需要按照时间排序
  11. + " ORDER BY time DESC"
  12. // 添加分页查询条件
  13. + pageQuery;

选择时序数据库,不建议使用删除以及更新操作,因此不做介绍.

可以通过创建或者RetentionPolicy,来添加或者更新数据的删除时间.

到此这篇关于Springboot使用influxDB时序数据库的实现的文章就介绍到这了,更多相关Springboot使用influxDB内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号