0,定义esHighClient
- 1 @Configuration
- 2 public class RestClientConfig {
- 3
- 4 //类似:200.188.22.20:9300,200.188.22.21:9300
- 5 @Value("${spring.elasticsearch.rest.hosts}")
- 6 private String hosts;
- 7
- 8
- 9 @Bean
- 10 public RestHighLevelClient esRestHighLevelClient() {
- 11
- 12 HttpHost[] httpHosts = Arrays.stream(hosts.split(",")).map(x -> {
- 13 String[] hostInfo = x.split(":");
- 14 return new HttpHost(hostInfo[0], Integer.parseInt(hostInfo[1]));
- 15 }).toArray(HttpHost[]::new);
- 16
- 17 RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).
- 18 setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(30000)
- 19 .setSocketTimeout(30000).setConnectionRequestTimeout(10000));
- 20
- 21 return new RestHighLevelClient(restClientBuilder);
- 22 }
- 23 }
-
1,定义涉及到nested类型的mapping
- 1 CreateIndexRequest request = new CreateIndexRequest("你定义的index名字");
- 2 //创建index
- 3 CreateIndexResponse response = esRestHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
- 4 PutMappingRequest request = new PutMappingRequest(index);
- 5 //由于es7以上版本无type的概念,本人学术不精,此字段强制设置type,只能用默认的_doc填入其中
- 6 request.type("_doc");
- 7 //定义刚刚创建的index的 mapping的json结构
- 8 request.source("{\"properties\":{\"deviceId\":{\"type\":\"keyword\"},\"indicatorId\":{\"type\":\"keyword\"},\"instanceName\":{\"type\":\"keyword\"},\"substance\":{\"type\":\"nested\",\"properties\":{\"value\":{\"type\":\"double\"},\"collectTime\":{\"type\":\"date\"},\"receiveTime\":{\"type\":\"date\"},\"dbTime\":{\"type\":\"date\"}}}}}", XContentType.JSON);
- 9 AcknowledgedResponse response = esRestHighLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
2,定义mapping对应的Java对象
- 1 @Data
- 2 public class NestSeries {
- 3
- 4 //设备ID
- 5 @Field(type = FieldType.Keyword)
- 6 // @JsonProperty("device_id")
- 7 private String deviceId;
- 8 //监控项ID
- 9 @Field(type = FieldType.Keyword)
- 10 private String indicatorId;
- 11
- 12 //监控实例,eg:C盘,D盘
- 13 @Field(type = FieldType.Text)
- 14 private String instanceName;
- 15
- 16
- 17 //监控项当前性能值
- 18 @Field(type = FieldType.Nested)
- 19 private List<SubStanceSeries> substance;
- 20
- 21 }
3,定义mapping中nested子对象
- @Data
- public class SubStanceSeries {
- //监控项当前性能值
- @Field(type = FieldType.Double)
- private Double value;
- //监控项采集前时间
- @Field(type = FieldType.Date)
- private long collectTime;
- //监控项采集后时间
- @Field(type = FieldType.Date)
- private long receiveTime;
- //监控项入库时间
- @Field(type = FieldType.Date)
- private long dbTime;
- }
4,定义对象转map的工具类(FasterJsonUtil同)
- 1 @Slf4j
- 2 public class JsonUtil {
- 3 private static ObjectMapper mapper = new ObjectMapper();
- 4 public static Map<String, Object> convertValueToMap(Object data) {
- 5 return mapper.convertValue(data, new TypeReference<Map<String, Object>>() {
- 6 });
- 7 }
- 8 }
5,判断此条数据是否存在,存在则更新,不存在则新增
- 1 private boolean existSeries(PerformanceDo p, String indexName) {
- 2
- 3 try {
- 4 BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
- 5 Es7Util.addOptionalMustQuery(queryBuilder, "deviceId", p.getDeviceId());
- 6 Es7Util.addOptionalMustQuery(queryBuilder, "indicatorId", p.getIndicatorId());
- 7 if (StringUtils.isNotBlank(p.getInstanceName())) {
- 8 Es7Util.addOptionalMustQuery(queryBuilder, "instanceName", p.getInstanceName());
- 9 }
- 10
- 11 //通过client方式去查询
- 12 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- 13 sourceBuilder.query(queryBuilder);
- 14 //索引 各种组合条件
- 15 SearchRequest rq = new SearchRequest().indices(indexName).source(sourceBuilder);
- 16
- 17 SearchResponse rp = esRestHighLevelClient.search(rq, RequestOptions.DEFAULT);
- 18
- 19 //通过client方式去查询
- 20 if (rp.status() == RestStatus.OK && rp.getHits().getHits().length > 0) {
- 21 //说明存在记录
- 22 return true;
- 23 }
- 24 } catch (Exception e) {
- 25 log.error("查询series是否存在当前nest异常:",e);
- 26 }
- 27
- 28 return false;
- 29 }
6,新增一个document(类似于mysql插入一行数据)
- 1 //新增性能数据存储 新增操作
- 2 private void addNestSeries(PerformanceDo p,String indexName) {
- 3
- 4 try {
- 5 NestSeries series = new NestSeries();
- 6 series.setDeviceId(p.getDeviceId());
- 7 series.setIndicatorId(p.getIndicatorId());
- 8 series.setInstanceName(p.getInstanceName());
- 9 List<SubStanceSeries> subStanceSeriesList = new ArrayList<>();
- 10 SubStanceSeries subStanceSeries = new SubStanceSeries();
- 11
- 12 if (StringUtils.isNotBlank(p.getValue())) {
- 13 subStanceSeries.setValue(Double.valueOf(p.getValue()));
- 14 }
- 15
- 16 subStanceSeries.setCollectTime(p.getCollectTime());
- 17 subStanceSeries.setReceiveTime(p.getReceiveTime());
- 18 subStanceSeries.setDbTime(p.getDbTime());
- 19 subStanceSeriesList.add(subStanceSeries);
- 20 series.setSubstance(subStanceSeriesList);
- 21
- 22 IndexRequest indexRequest = new IndexRequest(indexName, TYPE).source(FasterJsonUtil.convertValueToMap(series), XContentType.JSON);
- 23
- 24 esRestHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- 25 } catch (Exception e) {
- 26 log.error("新增series的nest异常:",e);
- 27 }
- 28
- 29 }
7,局部更新nested类型内部数据
- 1 //追加性能数据存储 更新操作
- 2 private void uptNestSeries(PerformanceDo p, String indexName) {
- 3
- 4 try {
- 5 UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
- 6
- 7 BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
- 8 Es7Util.addOptionalMustQuery(queryBuilder, "deviceId", p.getDeviceId());
- 9 Es7Util.addOptionalMustQuery(queryBuilder, "indicatorId", p.getIndicatorId());
- 10 if (StringUtils.isNotBlank(p.getInstanceName())) {
- 11 Es7Util.addOptionalMustQuery(queryBuilder, "instanceName", p.getInstanceName());
- 12 }
- 13
- 14 SubStanceSeries subStanceSeries = new SubStanceSeries();
- 15
- 16 if (StringUtils.isNotBlank(p.getValue())) {
- 17 subStanceSeries.setValue(Double.valueOf(p.getValue()));
- 18 }
- 19
- 20 subStanceSeries.setCollectTime(p.getCollectTime());
- 21 subStanceSeries.setReceiveTime(p.getReceiveTime());
- 22 subStanceSeries.setDbTime(p.getDbTime());
- 23
- 24
- 25 //通过查询 指定要更新的_id
- 26 updateByQueryRequest.setQuery(queryBuilder);
- 27 Map<String, Object> paramMap = new HashMap<>();
- 28 paramMap.put("detail", convertValueToMap(subStanceSeries));
- 29
- 30 //使用painless脚本 进行复杂结构的更新 我演示的是去更新嵌套类型comments 这里是添加comments
- 31 updateByQueryRequest.setScript(new Script(ScriptType.INLINE, "painless", "ctx._source.substance.add(params.detail)", paramMap));
- 32
- 33 esRestHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
- 34
- 35 } catch (Exception e) {
- 36 log.error("更新series的nest异常:",e);
- 37 }
- 38 }
对应dml如下:
- 1 POST /indexname20201118/_update_by_query
- 2 {
- 3 "script": {
- 4 "source": "ctx._source.substance.add(params.detail)",
- 5 "lang": "painless",
- 6 "params": {
- 7 "detail": {
- 8 "value": 4,
- 9 "collectTime": 4405691110000,
- 10 "receiveTime": 4405691110000,
- 11 "dbTime": 4405691110000
- 12
- 13 }
- 14 }
- 15 },
- 16 "query": {
- 17 "bool": {
- 18 "must": [
- 19 {
- 20 "match_phrase": {
- 21 "deviceId": "1212121-f796-4e74-8007-929631212121"
- 22 }
- 23 },
- 24 {
- 25 "match_phrase": {
- 26 "indicatorId": "12121212SYS_USAGE"
- 27 }
- 28 },
- 29 {
- 30 "match_phrase": {
- 31 "instanceName": "/dev/mapper/1212121"
- 32 }
- 33 }
- 34 ]
- 35 }
- 36 }
- 37 }
8,实现结果展示如下:
- 1 {
- 2 "took": 1,
- 3 "timed_out": false,
- 4 "_shards": {
- 5 "total": 1,
- 6 "successful": 1,
- 7 "skipped": 0,
- 8 "failed": 0
- 9 },
- 10 "hits": {
- 11 "total": {
- 12 "value": 1,
- 13 "relation": "eq"
- 14 },
- 15 "max_score": 9.95262,
- 16 "hits": [
- 17 {
- 18 "_index": "indexname20201119",
- 19 "_type": "_doc",
- 20 "_id": "izhV33UBQMSXiw121212",
- 21 "_score": 9.95262,
- 22 "_source": {
- 23 "indicatorId": "LINUX_PR1212S_1212121",
- 24 "instanceName": "test",
- 25 "substance": [
- 26 {
- 27 "receiveTime": 121215692836100,
- 28 "collectTime": 11521292835000,
- 29 "dbTime": 1169821207832,
- 30 "value": 4
- 31 },
- 32 {
- 33 "receiveTime": 121215692896125,
- 34 "collectTime": 1121292895000,
- 35 "dbTime": 112121769904778,
- 36 "value": 4
- 37 },
- 38 {
- 39 "receiveTime": 16121212956112,
- 40 "collectTime": 1605692955000,
- 41 "dbTime": 1605121913140,
- 42 "value": 5
- 43 },
- 44 {
- 45 "receiveTime": 12121212,
- 46 "collectTime": 1601211215001,
- 47 "dbTime": 1212121,
- 48 "value": 4
- 49 },
- 50 {
- 51 "receiveTime": 121212176098,
- 52 "collectTime": 161212125001,
- 53 "dbTime":11215769930788,
- 54 "value": 6
- 55 },
- 56 {
- 57 "receiveTime": 12121136104,
- 58 "collectTime": 1212693135000,
- 59 "dbTime": 11212769939631,
- 60 "value": 7
- 61 },
- 62 {
- 63 "receiveTime": 12121296103,
- 64 "collectTime":2121193195000,
- 65 "dbTime": 1121219948989,
- 66 "value": 4
- 67 },
- 68 {
- 69 "receiveTime": 121212156141,
- 70 "collectTime": 12121293255000,
- 71 "dbTime": 121212571214,
- 72 "value": 4
- 73 },
- 74 {
- 75 "receiveTime": 1605693316109,
- 76 "collectTime": 1605693315001,
- 77 "dbTime": 1605769965907,
- 78 "value": 4
- 79 },
- 80 {
- 81 "receiveTime": 12121212115,
- 82 "collectTime": 1121212375001,
- 83 "dbTime": 1121219974723,
- 84 "value": 8
- 85 },
- 86 {
- 87 "receiveTime": 121212113,
- 88 "collectTime": 160212121435001,
- 89 "dbTime": 16051212183159,
- 90 "value": 4
- 91 },
- 92 {
- 93 "receiveTime": 1605693496107,
- 94 "collectTime": 1605693495000,
- 95 "dbTime": 1605769992147,
- 96 "value": 4
- 97 },
- 98 {
- 99 "receiveTime": 1605693556103,
- 100 "collectTime": 1605693555000,
- 101 "dbTime": 1605770001828,
- 102 "value": 4
- 103 }
- 104 ],
- 105 "deviceId": "121212126-4e74-121212123f5"
- 106 }
- 107 }
- 108 ]
- 109 }
- 110 }