经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
Spark SQL,如何将 DataFrame 转为 json 格式
来源:cnblogs  作者:zzzzMing  时间:2018/12/7 9:21:04  对本文有异议

今天主要介绍一下如何将 Spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。

用过 Spark SQL 应该知道,Spark dataframe 本身有提供一个 api 可以供我们将数据转成一个 JsonArray,我们可以在 spark-shell 里头举个栗子来看一下。

  1. import org.apache.spark.sql.SparkSession
  2. val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate();
  3. //提供隐式转换功能,比如将 Rdd 转为 dataframe
  4. import spark.implicits._
  5. val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF()
  6. df.show()
  7. /*-------------show -----------
  8. +---+---+
  9. | _1| _2|
  10. +---+---+
  11. |abc| 2|
  12. |efg| 4|
  13. +---+---+
  14. */
  15. //这里使用 dataframe Api 转换成 jsonArray
  16. val jsonStr:String = a.toJSON.collectAsList.toString
  17. /*--------------- json String-------------
  18. [{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
  19. */

可以发现,我们可以使用 dataframe 提供的 api 直接将 dataframe 转换成 jsonArray 的形式,但这样子却有些冗余。以上面的例子来说,很多时候我要的不是这样的形式。

  1. [{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]

而是下面这种形式。

  1. [{"abc":2}, {"efg":4}]

这才是我们通常会使用到的 json 格式。以 dataframe 的 api 转换而成的 json 明显太过冗余。为此,我们需要借助一些 json 处理的包,本着能懒则懒的原则,直接使用 scala 提供的 json 处理包。

  1. import org.apache.spark.sql.{DataFrame, SparkSession}
  2. import org.apache.spark.sql.SparkSession
  3. val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate();
  4. //提供隐式转换功能,比如将 Rdd 转为 dataframe
  5. import spark.implicits._
  6. val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF()
  7. df.show()
  8. /*-------------show -----------
  9. +---+---+
  10. | _1| _2|
  11. +---+---+
  12. |abc| 2|
  13. |efg| 4|
  14. +---+---+
  15. */
  16. //接下来不一样了
  17. val df2Array:Array[Tuple2[String,Int]] = df.collect().map{case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)}
  18. val jsonData:Array[JSONObject] = aM.map{ i =>
  19. new JSONObject(Map(i._1 -> i._2))
  20. }
  21. val jsonArray:JSONArray = new JSONArray(jsonData.toList)
  22. /*-----------jsonArray------------
  23. [{"abc" : 2}, {"efg" : 4}]
  24. */

大概说明一下上述的代码,首先我们要先将 df 变量进行 collect 操作,将它转换成 Array ,但是要生成 jsonObject 得是 Array[Tuple2[T,T]] 的格式,所以我们需要再进一步转换成对应格式。这里的 map 是函数式编程里面的 map 。

然后也是用 map 操作生成 Array[JSONObject],最后再转换成 JSONArray 就可以。

将数据转换成 json 的格式通常不能太大,一般用在 spark 跑出数据结果后写入到其他数据库的时候会用到,比如 Mysql 。

以上~~


欢迎关注公众号哈尔的数据城堡,里面有数据,代码,以及深度的思考。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号