经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
Spark 整合ElasticSearch
来源:cnblogs  作者:hapjin  时间:2018/9/25 19:17:11  对本文有异议

Spark 整合ElasticSearch

因为做用户资料搜索用到了ElasticSearch,最近又了解一下 Spark ML,先来演示一个Spark 读取/写入 ElasticSearch 简单示例。(spark 读取ElasticSearch中数据)

环境:IDEA2016,JDK8,windows10,安装的 ElasticSearch6.3.2 和 spark-2.3.1-bin-hadoop2.7,使用mvn package 将程序打成jar包,采用spark-submit提交给spark执行。

先在ElasticSearch中创建一个索引用来演示。因为是文本数据,因此采用ik分词。可参考:https://github.com/medcl/elasticsearch-analysis-ik

  • 创建索引:PUT /index_ik_test

  • 设置mapping 及相应的分词器,这里指定 content 字段为 ElasticSearch 的text 类型,并使用ik_max_word 分词模式

    POST index_ik_test/fulltext/_mapping
    {
    "properties": {
    "content":{
    "type": "text",
    "analyzer": "ik_max_word",
    "search_analyzer": "ik_max_word"
    }
    }
    }

  • 存几篇文档到ElasticSearch中

    POST index_ik_test/fulltext/1
    {"content":"其中有两个人受伤了"}

  • ik 分词器有两种分词模式:ik_max_wordik_smart。可通过如下方式查看一下这两者的区别:

    GET index_ik_test/_analyze
    {
    "text": ["其中国家投资了500万"],
    "tokenizer": "ik_smart"
    }

    分词结果:其中、国家、投资、了、500万

    ?

    GET index_ik_test/_analyze
    {
    "text": ["其中国家投资了500万"],
    "tokenizer": "ik_max_word"
    }

    分词结果:其中、中国、国家、投资、了、500、万

  • 使用GET index_ik_test/_mapping可查看索引的配置信息

    {
    "index_ik_test": {
    "mappings": {
    "fulltext": {
    "properties": {
    "content": {
    "type": "text",
    "analyzer": "ik_max_word"
    }
    }
    }
    }
    }
    }

好,现在ElasticSearch中有数据了,现在看怎么基于Spark读取ElasticSearch中的数据。

IDEA2016中新建一个Maven工程,当然也可以用SpringBoot工程,但是这里的是单纯的Maven Project。

ElasticSearch官方提供了elasticsearch-hadoop来供Spark访问ElasticSearch。具体可参考:官方文档es for spark

官方提供了elasticsearch-hadoopmaven 依赖,这个依赖包括了:ElasticSearch for Hadoop MR、ElasticSearch for Hadoop Hive、ElasticSearch for Hadoop Spark。如果只用到了Spark,也可以只添加ElasticSearch for spark依赖。具体可参考:这个链接

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-spark-20_2.10</artifactId>
  4. <version>6.3.2</version>
  5. </dependency>

创建spark运行上下文时需要spark-sql_2.11依赖,可参考:spark 官方文档quick start

To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.

在本文的示例中,添加了下面3个maven依赖:

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-hadoop</artifactId>
  4. <version>6.3.2</version>
  5. </dependency>
  6. <!-- Spark dependency -->
  7. <dependency>
  8. <groupId>org.apache.spark</groupId>
  9. <artifactId>spark-sql_2.11</artifactId>
  10. <version>2.3.1</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>com.google.guava</groupId>
  14. <artifactId>guava</artifactId>
  15. <version>22.0</version>
  16. </dependency>

下面来直接看示例代码:

向ElasticSearch中写入数据

  • spark配置连接ElasticSearch。可参考:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/cloud.html,我们采用的是:Configure the connector to run in WAN mode

    1. SparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]").set("es.index.auto.create", "true")
    2. .set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200").set("es.nodes.wan.only", "true");
  • 将数据写入到ElasticSearch

    1. JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
    2. JavaEsSpark.saveToEs(javaRDD, elasticIndex);

从ElasticSearch查询数据

  1. JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中国").values();
  2. for (Map<String, Object> item : searchRdd.collect()) {
  3. item.forEach((key, value)->{
  4. System.out.println("search key:" + key + ", search value:" + value);
  5. });
  6. }

使用?q=中国作为查询条件。整个完整示例代码如下:

  1. import com.google.common.collect.ImmutableList;
  2. import com.google.common.collect.ImmutableMap;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.sql.SparkSession;
  7. import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
  8. import java.util.Map;
  9. import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;
  10. /**
  11. * Created by Administrator on 2018/8/28.
  12. */
  13. public class EsSparkTest {
  14. public void writeEs() {
  15. String elasticIndex = "spark/docs";
  16. //https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-native
  17. SparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]").set("es.index.auto.create", "true")
  18. .set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200").set("es.nodes.wan.only", "true");
  19. SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
  20. JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter
  21. Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
  22. Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
  23. JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
  24. JavaEsSpark.saveToEs(javaRDD, elasticIndex);
  25. }
  26. public void readEs() {
  27. SparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]").set("es.index.auto.create", "true")
  28. .set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200").set("es.nodes.wan.only", "true");
  29. SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
  30. JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter
  31. JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中国").values();
  32. for (Map<String, Object> item : searchRdd.collect()) {
  33. item.forEach((key, value)->{
  34. System.out.println("search key:" + key + ", search value:" + value);
  35. });
  36. }
  37. sparkSession.stop();
  38. }
  39. }

DemoApplication.java 入口main类

  1. public class DemoApplication {
  2. public static void main(String[] args) {
  3. new EsSparkTest().readEs();
  4. }
  5. }

IDEA菜单栏:view ---> window tools --->maven projects 打开maven 侧边栏。直接双击package打包。

$rz -bey esdemo-1.0-SNAPSHOT.jar 将打成的jar包上传到部署spark服务器上,使用如下命令提交运行:

~/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class DemoApplication esdemo-1.0-SNAPSHOT.jar

--class 是类的全路径名。如果执行过程中抛出ClassNotFoundException异常,要看一下pom.xml中指定的依赖是否在Spark安装目录下的 jars/ 目录下(比如事先把Guava jar 和 elasticsearch-hadoop-6.3.2.jar 上传到 jars/目录下)。最终执行readEs()方法查询得到的文档如下:

因为 content 字段采用的是ik_max_word分词模式,因此文本其中国家投资了500万 分词结果中包含了 中国,从而使得这篇document被查询到了。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号