Implementing a Geographic Location and City Search Service with Elasticsearch
Source: cnblogs | Author: 下午喝什么茶 | Date: 2024/7/29 9:31:01

Recently I needed a few simple geographic lookup APIs: take the user's current location, resolve it to an administrative region (province / city / district), and then use that information to query the ...... in the current area and provide a service.

So I looked into GIS a bit. As a programmer, could I build this feature myself? Of course I could. Time to get started.

The idea: find the data, load it into a data store, and build on Elasticsearch's powerful search capabilities and rich GIS data-processing features.

For GIS background reading (found on Baidu, reasonably professional): 程序员GIS入门|前后端都要懂一点的GIS知识 (a GIS primer for programmers).

After some searching, persistence paid off: I found a fairly complete dataset online from 锐多宝. It downloads in Shapefile format.

Step 1: Download the data from 锐多宝.

Step 2: Write a Python script to preprocess the data, converting the Shapefile to GeoJSON, since Elasticsearch handles GeoJSON well.

    import geopandas as gpd

    # Read the Shapefile
    shapefile_path = 'D:/data/gis/2023年_CTAmap_1.12版/2023年省级/2023年省级.shp'
    gdf = gpd.read_file(shapefile_path)

    # Inspect the GeoDataFrame
    print(gdf.head())

    # Preprocess the data here if needed, e.g. filter rows or select specific columns
    # gdf = gdf[['column1', 'column2', 'geometry']]

    # Convert the GeoDataFrame to a plain Pandas DataFrame (only if needed)
    df = gdf.drop('geometry', axis=1).join(gdf['geometry'].apply(lambda x: gpd.GeoSeries(x).to_json()))

    # Export the Pandas DataFrame as a JSON file
    output_json_path = 'D:/data/gis/2023-province-GeoJSON.gesjson'
    # df.to_json(output_json_path, orient='records')

    # To keep the GeoJSON format, save the GeoDataFrame directly
    gdf.to_file(output_json_path, driver='GeoJSON')
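
Before indexing, it is worth sanity-checking the generated file. A minimal sketch, assuming the same output path as above; it just counts features and geometry types:

    import json
    from collections import Counter

    # Load the GeoJSON produced above and report basic stats
    with open('D:/data/gis/2023-province-GeoJSON.gesjson', 'r', encoding='utf-8') as f:
        geojson = json.load(f)

    features = geojson.get('features', [])
    print(f"{len(features)} features")
    # Polygon vs MultiPolygon both map onto the geo_shape field type in Elasticsearch
    print(Counter(feature['geometry']['type'] for feature in features))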

Step 3: Write the GeoJSON into Elasticsearch with a Python script

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    import json

    # Connect to Elasticsearch
    es = Elasticsearch("http://localhost:9200")

    # Check the connection
    if not es.ping():
        raise ValueError("Connection failed")

    # Delete the old index if it exists
    if es.indices.exists(index="province2023_geoshape_index_001"):
        es.indices.delete(index="province2023_geoshape_index_001")

    # Create the index and define its mapping
    mapping = {
        "mappings": {
            "properties": {
                "location": {
                    "type": "geo_shape"
                },
                "name": {
                    "type": "text"
                }
            }
        }
    }

    # Create the index
    es.indices.create(index="province2023_geoshape_index_001", body=mapping)

    # Read the GeoJSON file
    with open("D:/data/gis/2023-province-GeoJSON.gesjson", "r", encoding="utf-8") as file:
        geojson_data = json.load(file)

    # Extract the GeoJSON feature collection
    features = geojson_data.get("features", [])

    # Prepare the documents to import
    documents = []
    for feature in features:
        doc = {
            "location": {
                "type": feature["geometry"]["type"],
                "coordinates": feature["geometry"]["coordinates"]
            }
        }
        if "properties" in feature:
            doc.update(feature["properties"])
        documents.append(doc)

    # Number of documents per bulk request
    batch_size = 100

    # Build the bulk actions
    def generate_actions(documents):
        for doc in documents:
            yield {
                "_index": "province2023_geoshape_index_001",
                "_source": doc
            }

    # Run the bulk import batch by batch
    for i in range(0, len(documents), batch_size):
        end = min(i + batch_size, len(documents))
        success, _ = bulk(es, generate_actions(documents[i:end]))
        print(f"Bulk {i}-{end} completed, {success} documents indexed.")

    print("All data indexed.")
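
Once the import finishes, a quick count plus a sample geo_shape query will confirm the documents landed. A minimal sketch; the query point (roughly Beijing) is arbitrary, and the 省 property name comes from the shapefile attributes:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # How many province documents were indexed?
    print(es.count(index="province2023_geoshape_index_001")["count"])

    # Which province polygon contains a given point? (GeoJSON coordinates are [lon, lat])
    query = {
        "query": {
            "geo_shape": {
                "location": {
                    "shape": {"type": "point", "coordinates": [116.4, 39.9]},
                    "relation": "intersects"
                }
            }
        }
    }
    resp = es.search(index="province2023_geoshape_index_001", body=query)
    for hit in resp["hits"]["hits"]:
        print(hit["_source"].get("省"))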

Step 4: Compute the centroid of each region (an extension; the raw data only contains polygon geometries)

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    import json

    # Connect to Elasticsearch
    es = Elasticsearch("http://localhost:9200")

    # Check the connection
    if not es.ping():
        raise ValueError("Connection failed")

    # Delete the old index if it exists
    if es.indices.exists(index="province2023_centroid_geoshape_index_001"):
        es.indices.delete(index="province2023_centroid_geoshape_index_001")

    # Create the index and define its mapping
    mapping = {
        "mappings": {
            "properties": {
                "location": {
                    "type": "geo_shape"
                },
                "centroid": {  # new field
                    "type": "geo_point"
                },
                "name": {
                    "type": "text"
                }
            }
        }
    }

    # Create the index
    es.indices.create(index="province2023_centroid_geoshape_index_001", body=mapping)

    # Read the GeoJSON file
    with open("D:/data/gis/2023-province-GeoJSON.gesjson", "r", encoding="utf-8") as file:
        geojson_data = json.load(file)

    # Extract the GeoJSON feature collection
    features = geojson_data.get("features", [])

    def calculate_centroid(polygons):
        # Area-weighted centroid over all polygons of a geometry
        total_area = 0.0
        total_x = 0.0
        total_y = 0.0

        for polygon in polygons:
            # Each polygon is one list of coordinates (or rings)
            centroid = calculate_simple_polygon_centroid(polygon)
            area = calculate_polygon_area(polygon)
            total_area += area
            total_x += centroid[0] * area
            total_y += centroid[1] * area
        if total_area == 0:
            # If the total area is zero, fall back to the origin
            return [0, 0]
        else:
            return [total_x / total_area, total_y / total_area]

    # is_coordinates_list:
    # returns True for a list of coordinate lists (rings), e.g.
    # [
    #     [[x1, y1], [x2, y2], [x3, y3], ...],
    #     [[x1, y1], [x2, y2], [x3, y3], ...]  # inner holes, if any
    # ]
    # returns False for a flat list of single coordinates, e.g.
    # [
    #     [x1, y1],
    #     [x2, y2],
    #     [x3, y3],
    #     ...
    # ]
    def is_coordinate(coord):
        return (
            isinstance(coord, (list, tuple)) and
            len(coord) == 2 and
            all(isinstance(c, (int, float)) for c in coord)
        )

    def is_coordinates_list(coords):
        # Check whether coords is a list of coordinate lists
        if isinstance(coords, list):
            if all(isinstance(c, list) and all(is_coordinate(coord) for coord in c) for c in coords):
                return True
        return False

    def calculate_simple_polygon_centroid(polygon):
        # Determine the structure of polygon
        if is_coordinates_list(polygon):
            # polygon is a list of coordinate lists (outer ring first)
            x_sum = sum(coord[0] for coord in polygon[0])
            y_sum = sum(coord[1] for coord in polygon[0])
            num_points = len(polygon[0])
        else:
            # polygon is a flat list of coordinates
            x_sum = sum(coord[0] for coord in polygon)
            y_sum = sum(coord[1] for coord in polygon)
            num_points = len(polygon)
        # Average the coordinates
        centroid_x = x_sum / num_points
        centroid_y = y_sum / num_points
        return [centroid_x, centroid_y]

    def calculate_polygon_area(polygon):
        # Shoelace formula for the area of a simple polygon
        area = 0.0
        if is_coordinates_list(polygon):  # polygon is a list of coordinate lists
            num_points = len(polygon[0])
            for i in range(num_points):
                j = (i + 1) % num_points
                area += polygon[0][i][0] * polygon[0][j][1]
                area -= polygon[0][j][0] * polygon[0][i][1]
        else:  # polygon is a flat list of coordinates
            num_points = len(polygon)
            for i in range(num_points):
                j = (i + 1) % num_points
                area += polygon[i][0] * polygon[j][1]
                area -= polygon[j][0] * polygon[i][1]
        return abs(area) / 2.0

    # Prepare the documents to import
    documents = []
    for feature in features:
        coordinates = feature["geometry"]["coordinates"]
        centroid = calculate_centroid(coordinates)
        doc = {
            "location": {
                "type": feature["geometry"]["type"],
                "coordinates": coordinates
            },
            "centroid": centroid,  # add the centroid
        }
        if "properties" in feature:
            doc.update(feature["properties"])
        documents.append(doc)

    # Number of documents per bulk request
    batch_size = 100

    # Build the bulk actions
    def generate_actions(documents):
        for doc in documents:
            yield {
                "_index": "province2023_centroid_geoshape_index_001",
                "_source": doc
            }

    # Run the bulk import batch by batch
    for i in range(0, len(documents), batch_size):
        end = min(i + batch_size, len(documents))
        success, errors = bulk(es, generate_actions(documents[i:end]))
        if errors:
            print(f"Bulk {i}-{end} completed, {success} documents indexed, but {len(errors)} documents failed.")
            for error in errors:
                print(error)
        else:
            print(f"Bulk {i}-{end} completed, {success} documents indexed.")

    print("All data indexed.")
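
Note that the hand-rolled centroid above only uses each polygon's outer ring and approximates it with area-weighted vertex averages. Since geopandas is already installed, its shapely dependency can compute a proper centroid; a minimal sketch of that alternative (not what the script above uses):

    from shapely.geometry import shape

    def shapely_centroid(geometry):
        # Return the [lon, lat] centroid of a GeoJSON geometry dict;
        # shape() handles Polygon and MultiPolygon, holes included
        geom = shape(geometry)
        c = geom.centroid
        return [c.x, c.y]

    # Usage: centroid = shapely_centroid(feature["geometry"])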

Step 5: Preprocess the data with Elasticsearch's ingest pipeline and reindex capabilities

    # The geo_centroid aggregation is an advanced aggregation that computes the center point
    # of a set of geo values. In Elasticsearch it is an advanced feature, normally only
    # available with an X-Pack license (a 30-day trial is available).
    POST /province2023_geoshape_index_001/_search
    {
      "size": 0,
      "aggs": {
        "centroid": {
          "geo_centroid": {
            "field": "location"
          }
        }
      }
    }

    POST province2023_centroid_geoshape_index_001/_search
    {
      "query": {
        "term": {
          "省.keyword": {
            "value": "陕西省"
          }
        }
      }
    }
    # First attempt: copy the value of one field to another with a set processor
    PUT _ingest/pipeline/copy_field_pipeline
    {
      "description": "Copy the value of one field to another",
      "processors": [
        {
          "set": {
            "field": "province_name",
            "copy_from": "省"
          }
        }
      ]
    }

    GET province2023_centroid_geoshape_index_001/_mapping
    PUT _ingest/pipeline/province_multiple_copy_fields_pipeline
    {
      "description": "Copy multiple fields to new fields and rename fields to new fields",
      "processors": [
        {
          "set": {
            "field": "province_name",
            "value": "{{{省}}}"
          }
        },
        {
          "remove": {
            "field": "省"
          }
        },
        {
          "rename": {
            "field": "省级码",
            "target_field": "province_code"
          }
        },
        {
          "rename": {
            "field": "省类型",
            "target_field": "province_type"
          }
        },
        {
          "rename": {
            "field": "VAR_NAME",
            "target_field": "var_name"
          }
        },
        {
          "rename": {
            "field": "ENG_NAME",
            "target_field": "eng_name"
          }
        },
        {
          "rename": {
            "field": "FIRST_GID",
            "target_field": "first_gid"
          }
        },
        {
          "rename": {
            "field": "FIRST_TYPE",
            "target_field": "first_type"
          }
        }
      ]
    }
    GET province2023_centroid_geoshape_index_002/_count
    GET province2023_centroid_geoshape_index_002/_mapping

    DELETE province2023_centroid_geoshape_index_002

    PUT province2023_centroid_geoshape_index_002
    {
      "mappings": {
        "properties": {
          "eng_name": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "first_gid": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "first_type": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "var_name": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "centroid": {
            "type": "geo_point"
          },
          "location": {
            "type": "geo_shape"
          },
          "name": {
            "type": "text"
          },
          "year": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      }
    }

    POST _reindex
    {
      "source": {
        "index": "province2023_centroid_geoshape_index_001"
      },
      "dest": {
        "index": "province2023_centroid_geoshape_index_002",
        "pipeline": "province_multiple_copy_fields_pipeline"
      }
    }

    GET province2023_centroid_geoshape_index_002/_search
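
Before the reindex, the pipeline can be dry-run against a sample document with the _simulate API. A minimal sketch using the Python client; the sample field values here are made up for illustration:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Run the pipeline on a fabricated document and print the transformed _source
    sample = {
        "docs": [
            {"_source": {"省": "陕西省", "省级码": "610000", "省类型": "省",
                         "VAR_NAME": "Shaanxi", "ENG_NAME": "Shaanxi",
                         "FIRST_GID": "0", "FIRST_TYPE": "Province"}}
        ]
    }
    result = es.ingest.simulate(id="province_multiple_copy_fields_pipeline", body=sample)
    print(result["docs"][0]["doc"]["_source"])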

Step 6: Query the data with geo_distance

    # The centroid field is of type geo_point; its lat/lon is stored as an array (geopoint as an array).
    # geo_bounding_box finds all geo points inside a bounding box.
    POST province2023_centroid_geoshape_index_002/_search
    {
      "query": {
        "geo_bounding_box": {
          "centroid": {
            "top_left": {
              "lat": 42,
              "lon": -72
            },
            "bottom_right": {
              "lat": 40,
              "lon": -74
            }
          }
        }
      }
    }

    # geo_distance finds points within a given distance of a location
    # (a bare number such as 100 is interpreted as meters)
    POST province2023_centroid_geoshape_index_002/_search
    {
      "query": {
        "geo_distance": {
          "distance": 100,
          "centroid": {
            "lat": 40.09937484066758,
            "lon": 116.41960604340115
          }
        }
      }
    }

    # Combine a full-text match with a geo_distance filter on the centroid (geo_point) field
    POST province2023_centroid_geoshape_index_002/_search
    {
      "query": {
        "bool": {
          "must": {
            "match": {
              "province_name": "xx市"
            }
          },
          "filter": {
            "geo_distance": {
              "distance": "2km",
              "centroid": {
                "lat": 40.09937484066758,
                "lon": 116.41960604340115
              }
            }
          }
        }
      }
    }

    # The same pattern with the filter applied to the location (geo_shape) field
    POST province2023_centroid_geoshape_index_002/_search
    {
      "query": {
        "bool": {
          "must": {
            "match": {
              "province_name": "xx市"
            }
          },
          "filter": {
            "geo_distance": {
              "distance": "200km",
              "location": {
                "lat": 40.09937484066758,
                "lon": 116.41960604340115
              }
            }
          }
        }
      }
    }
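
The same filtered geo_distance query can of course be issued from application code. A minimal sketch in Python; the helper name and the example coordinates are just for illustration:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    def provinces_near(lat, lon, distance="200km"):
        # Return province names whose boundary (geo_shape) lies within `distance` of the point
        query = {
            "query": {
                "bool": {
                    "filter": {
                        "geo_distance": {
                            "distance": distance,
                            "location": {"lat": lat, "lon": lon}
                        }
                    }
                }
            }
        }
        resp = es.search(index="province2023_centroid_geoshape_index_002", body=query)
        return [hit["_source"].get("province_name") for hit in resp["hits"]["hits"]]

    print(provinces_near(40.09937484066758, 116.41960604340115))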

 

Original article: https://www.cnblogs.com/hbuuid/p/18327500
