经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
restapi(8)- restapi-sql:用户自主的服务
来源:cnblogs  作者:雪川大虫  时间:2019/10/29 10:29:18  对本文有异议

  学习函数式编程初衷是看到自己熟悉的oop编程语言和sql数据库在现代商业社会中前景暗淡,准备完全放弃windows技术栈转到分布式大数据技术领域的。但是在现实中理想总是不如人意,本来想在一个规模较小的公司展展拳脚,以为小公司会少点历史包袱,有利于全面技术改造。但现实是:即使是小公司,一旦有个成熟的产品,那么进行全面的技术更新基本上是不可能的了,因为公司要生存,开发人员很难新旧技术之间随时切换。除非有狂热的热情,员工怠慢甚至抵制情绪不容易解决。只能采取逐步切换方式:保留原有产品的后期维护不动,新产品开发用一些新的技术。在我们这里的情况就是:以前一堆c#、sqlserver的东西必须保留,新的功能比如大数据、ai、识别等必须用新的手段如scala、python、dart、akka、kafka、cassandra、mongodb来开发。好了,新旧两个开发平台之间的软件系统对接又变成了一个问题。

   现在我们这里有个需求:把在linux-ubuntu akka-cluster集群环境里mongodb里数据处理的结果传给windows server下SQLServer里。这是一种典型的异系统集成场景。我的解决方案是通过一个restapi服务作为两个系统的数据桥梁,这个restapi的最基本要求是:

1、支持任何操作系统前端:这个没什么问题,在http层上通过json交换数据

2、能读写mongodb:在前面讨论的restapi-mongo已经实现了这一功能

3、能读写windows server环境下的sqlserver:这个是本篇讨论的主题

4、用户能够比较方便的对平台数据库进行操作,最好免去前后双方每类操作都需要进行协定model这一过程,也就是能达到用户随意调用服务

前面曾经实现了一个jdbc-engine项目,基于scalikejdbc,不过只示范了slick-h2相关的功能。现在需要sqlserver-jdbc驱动,然后试试能不能在JVM里驱动windows下的sqlserver。maven里找不到sqlserver的驱动,但从微软官网可以下载mssql-jdbc-7.0.0.jre8.jar。这是个jar,在sbt里称作unmanagedjar,不能摆在build.sbt的dependency里。这个需要摆在项目根目录下的lib目录下即可(也可以在放在build.sbt里unmanagedBase :=?? 指定的路径下)。然后是数据库连接,下面是可以使用sqlserver的application.conf配置文件内容:

  1. # JDBC settings
  2. prod {
  3. db {
  4. h2 {
  5. driver = "org.h2.Driver"
  6. url = "jdbc:h2:tcp://localhost/~/slickdemo"
  7. user = ""
  8. password = ""
  9. poolFactoryName = "hikaricp"
  10. numThreads = 10
  11. maxConnections = 12
  12. minConnections = 4
  13. keepAliveConnection = true
  14. }
  15. mysql {
  16. driver = "com.mysql.cj.jdbc.Driver"
  17. url = "jdbc:mysql://localhost:3306/testdb"
  18. user = "root"
  19. password = "123"
  20. poolFactoryName = "hikaricp"
  21. numThreads = 10
  22. maxConnections = 12
  23. minConnections = 4
  24. keepAliveConnection = true
  25. }
  26. postgres {
  27. driver = "org.postgresql.Driver"
  28. url = "jdbc:postgresql://localhost:5432/testdb"
  29. user = "root"
  30. password = "123"
  31. poolFactoryName = "hikaricp"
  32. numThreads = 10
  33. maxConnections = 12
  34. minConnections = 4
  35. keepAliveConnection = true
  36. }
  37. mssql {
  38. driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  39. url = "jdbc:sqlserver://192.168.11.164:1433;integratedSecurity=false;Connect Timeout=3000"
  40. user = "sa"
  41. password = "Tiger2020"
  42. poolFactoryName = "hikaricp"
  43. numThreads = 10
  44. maxConnections = 12
  45. minConnections = 4
  46. keepAliveConnection = true
  47. connectionTimeout = 3000
  48. }
  49. termtxns {
  50. driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  51. url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=TERMTXNS;integratedSecurity=false;Connect Timeout=3000"
  52. user = "sa"
  53. password = "Tiger2020"
  54. poolFactoryName = "hikaricp"
  55. numThreads = 10
  56. maxConnections = 12
  57. minConnections = 4
  58. keepAliveConnection = true
  59. connectionTimeout = 3000
  60. }
  61. crmdb {
  62. driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  63. url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=CRMDB;integratedSecurity=false;Connect Timeout=3000"
  64. user = "sa"
  65. password = "Tiger2020"
  66. poolFactoryName = "hikaricp"
  67. numThreads = 10
  68. maxConnections = 12
  69. minConnections = 4
  70. keepAliveConnection = true
  71. connectionTimeout = 3000
  72. }
  73. }
  74. # scallikejdbc Global settings
  75. scalikejdbc.global.loggingSQLAndTime.enabled = true
  76. scalikejdbc.global.loggingSQLAndTime.logLevel = info
  77. scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  78. scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  79. scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  80. scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  81. scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  82. scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
  83. }

这个文件里的mssql,termtxns,crmdb段落都是给sqlserver的,它们都使用hikaricp线程池管理。

在jdbc-engine里启动数据库方式如下:

  1. ConfigDBsWithEnv("prod").setup('termtxns)
  2. ConfigDBsWithEnv("prod").setup('crmdb)
  3. ConfigDBsWithEnv("prod").loadGlobalSettings()

这段打开了在配置文件中用termtxns,crmdb注明的数据库。

下面是SqlHttpServer.scala的代码:

  1. package com.datatech.rest.sql
  2. import akka.http.scaladsl.Http
  3. import akka.http.scaladsl.server.Directives._
  4. import pdi.jwt._
  5. import AuthBase._
  6. import MockUserAuthService._
  7. import com.datatech.sdp.jdbc.config.ConfigDBsWithEnv
  8. import akka.actor.ActorSystem
  9. import akka.stream.ActorMaterializer
  10. import Repo._
  11. import SqlRoute._
  12. object SqlHttpServer extends App {
  13. implicit val httpSys = ActorSystem("sql-http-sys")
  14. implicit val httpMat = ActorMaterializer()
  15. implicit val httpEC = httpSys.dispatcher
  16. ConfigDBsWithEnv("prod").setup('termtxns)
  17. ConfigDBsWithEnv("prod").setup('crmdb)
  18. ConfigDBsWithEnv("prod").loadGlobalSettings()
  19. implicit val authenticator = new AuthBase()
  20. .withAlgorithm(JwtAlgorithm.HS256)
  21. .withSecretKey("OpenSesame")
  22. .withUserFunc(getValidUser)
  23. val route =
  24. path("auth") {
  25. authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
  26. post { complete(authenticator.issueJwt(userinfo))}
  27. }
  28. } ~
  29. pathPrefix("api") {
  30. authenticateOAuth2(realm = "api", authenticator.authenticateToken) { token =>
  31. new SqlRoute("sql", token)(new JDBCRepo)
  32. .route
  33. // ~ ...
  34. }
  35. }
  36. val (port, host) = (50081,"192.168.11.189")
  37. val bindingFuture = Http().bindAndHandle(route,host,port)
  38. println(s"Server running at $host $port. Press any key to exit ...")
  39. scala.io.StdIn.readLine()
  40. bindingFuture.flatMap(_.unbind())
  41. .onComplete(_ => httpSys.terminate())
  42. }

服务入口在http://mydemo.com/api/sql,服务包括get,post,put三类,参考这个SqlRoute:

  1. package com.datatech.rest.sql
  2. import akka.http.scaladsl.server.Directives
  3. import akka.stream.ActorMaterializer
  4. import akka.http.scaladsl.model._
  5. import akka.actor.ActorSystem
  6. import com.datatech.rest.sql.Repo.JDBCRepo
  7. import akka.http.scaladsl.common._
  8. import spray.json.DefaultJsonProtocol
  9. import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
  10. trait JsFormats extends SprayJsonSupport with DefaultJsonProtocol
  11. object JsConverters extends JsFormats {
  12. import SqlModels._
  13. implicit val brandFormat = jsonFormat2(Brand)
  14. implicit val customerFormat = jsonFormat6(Customer)
  15. }
  16. object SqlRoute {
  17. import JsConverters._
  18. implicit val jsonStreamingSupport = EntityStreamingSupport.json()
  19. .withParallelMarshalling(parallelism = 8, unordered = false)
  20. class SqlRoute(val pathName: String, val jwt: String)(repo: JDBCRepo)(
  21. implicit sys: ActorSystem, mat: ActorMaterializer) extends Directives with JsonConverter {
  22. val route = pathPrefix(pathName) {
  23. path(Segment / Remaining) { case (db, tbl) =>
  24. (get & parameter('sqltext)) { sql => {
  25. val rsc = new RSConverter
  26. val rows = repo.query[Map[String,Any]](db, sql, rsc.resultSet2Map)
  27. complete(rows.map(m => toJson(m)))
  28. }
  29. } ~ (post & parameter('sqltext)) { sql =>
  30. entity(as[String]){ json =>
  31. repo.batchInsert(db,tbl,sql,json)
  32. complete(StatusCodes.OK)
  33. }
  34. } ~ put {
  35. entity(as[Seq[String]]) { sqls =>
  36. repo.update(db, sqls)
  37. complete(StatusCodes.OK)
  38. }
  39. }
  40. }
  41. }
  42. }
  43. }

jdbc-engine的特点是可以用字符类型的sql语句来操作。所以我们可以通过传递字符串型的sql语句来实现服务调用,使用门槛低,方便通用。restapi-sql提供的是对服务器端sqlserver的普通操作,包括读get,写入post,更改put。这些sqlserver操作部分是在JDBCRepo里的:

  1. package com.datatech.rest.sql
  2. import com.datatech.sdp.jdbc.engine.JDBCEngine._
  3. import com.datatech.sdp.jdbc.engine.{JDBCQueryContext, JDBCUpdateContext}
  4. import scalikejdbc._
  5. import akka.stream.ActorMaterializer
  6. import com.datatech.sdp.result.DBOResult.DBOResult
  7. import akka.stream.scaladsl._
  8. import scala.concurrent._
  9. import SqlModels._
  10. object Repo {
  11. class JDBCRepo(implicit ec: ExecutionContextExecutor, mat: ActorMaterializer) {
  12. def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = {
  13. //construct the context
  14. val ctx = JDBCQueryContext(
  15. dbName = Symbol(db),
  16. statement = sqlText
  17. )
  18. jdbcAkkaStream(ctx,toRow)
  19. }
  20. def query(db: String, tbl: String, sqlText: String) = {
  21. //construct the context
  22. val ctx = JDBCQueryContext(
  23. dbName = Symbol(db),
  24. statement = sqlText
  25. )
  26. jdbcQueryResult[Vector,RS](ctx,getConverter(tbl)).toFuture[Vector[RS]]
  27. }
  28. def update(db: String, sqlTexts: Seq[String]): DBOResult[Seq[Long]] = {
  29. val ctx = JDBCUpdateContext(
  30. dbName = Symbol(db),
  31. statements = sqlTexts
  32. )
  33. jdbcTxUpdates(ctx)
  34. }
  35. def bulkInsert[P](db: String, sqlText: String, prepParams: P => Seq[Any], params: Source[P,_]) = {
  36. val insertAction = JDBCActionStream(
  37. dbName = Symbol(db),
  38. parallelism = 4,
  39. processInOrder = false,
  40. statement = sqlText,
  41. prepareParams = prepParams
  42. )
  43. params.via(insertAction.performOnRow).to(Sink.ignore).run()
  44. }
  45. def batchInsert(db: String, tbl: String, sqlText: String, jsonParams: String):DBOResult[Seq[Long]] = {
  46. val ctx = JDBCUpdateContext(
  47. dbName = Symbol(db),
  48. statements = Seq(sqlText),
  49. batch = true,
  50. parameters = getSeqParams(jsonParams,sqlText)
  51. )
  52. jdbcBatchUpdate[Seq](ctx)
  53. }
  54. }
  55. import monix.execution.Scheduler.Implicits.global
  56. implicit class DBResultToFuture(dbr: DBOResult[_]){
  57. def toFuture[R] = {
  58. dbr.value.value.runToFuture.map {
  59. eor =>
  60. eor match {
  61. case Right(or) => or match {
  62. case Some(r) => r.asInstanceOf[R]
  63. case None => throw new RuntimeException("Operation produced None result!")
  64. }
  65. case Left(err) => throw new RuntimeException(err)
  66. }
  67. }
  68. }
  69. }
  70. }

读query部分即 def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = {...} 这个函数返回Source[R,Any],下面我们好好谈谈这个R:R是读的结果,通常是某个类或model,比如读取Person记录返回一组Person类的实例。这里有一种强类型的感觉。一开始我也是随大流坚持建model后用toJson[E],fromJson[E]这样做线上数据转换。现在的问题是restapi-sql是一项公共服务,使用者知道sqlserver上有些什么表,然后希望通过sql语句来从这些表里读取数据。这些sql语句可能超出表的界限如sql join, union等,如果我们坚持每个返回结果都必须有个对应的model,那么显然就会牺牲这个服务的通用性。实际上,http线上数据交换本身就不可能是强类型的,因为经过了json转换。对于json转换来说,只要求字段名称、字段类型对称就行了。至于从什么类型转换成了另一个什么类型都没问题。所以,字段名+字段值的表现形式不就是Map[K,V]吗,我们就用Map[K,V]作为万能model就行了,没人知道。也就是说用户方通过sql语句指定返回的字段名称,它们可能是任何类型Any,具体类型自然会由数据库补上。服务方从数据库读取结果ResultSet后转成Map[K,V]然后再转成json返回给用户,用户可以用Map[String,Any]信息产生任何类型,这就是自主。好,就来看看如何将ResultSet转成Map[String,Any]:

  1. package com.datatech.rest.sql
  2. import scalikejdbc._
  3. import java.sql.ResultSetMetaData
  4. class RSConverter {
  5. import RSConverterUtil._
  6. var rsMeta: ResultSetMetaData = _
  7. var columnCount: Int = 0
  8. var rsFields: List[(String,String)] = List[(String,String)]()
  9. def getFieldsInfo:List[(String,String)] =
  10. ( 1 until columnCount).foldLeft(List[(String,String)]()) {
  11. case (cons,i) =>
  12. (rsMeta.getColumnLabel(i) -> rsMeta.getColumnTypeName(i)) :: cons
  13. }
  14. def resultSet2Map(rs: WrappedResultSet): Map[String,Any] = {
  15. if(columnCount == 0) {
  16. rsMeta = rs.underlying.getMetaData
  17. columnCount = rsMeta.getColumnCount
  18. rsFields = getFieldsInfo
  19. }
  20. rsFields.foldLeft(Map[String,Any]()) {
  21. case (m,(n,t)) =>
  22. m + (n -> rsFieldValue(n,t,rs))
  23. }
  24. }
  25. }
  26. object RSConverterUtil {
  27. import scala.collection.immutable.TreeMap
  28. def map2Params(stm: String, m: Map[String,Any]): Seq[Any] = {
  29. val sortedParams = m.foldLeft(TreeMap[Int,Any]()) {
  30. case (t,(k,v)) => t + (stm.indexOfSlice(k) -> v)
  31. }
  32. sortedParams.map(_._2).toSeq
  33. }
  34. def rsFieldValue(fldname: String, fldType: String, rs: WrappedResultSet): Any = fldType match {
  35. case "LONGVARCHAR" => rs.string(fldname)
  36. case "VARCHAR" => rs.string(fldname)
  37. case "CHAR" => rs.string(fldname)
  38. case "BIT" => rs.boolean(fldname)
  39. case "TIME" => rs.time(fldname)
  40. case "TIMESTAMP" => rs.timestamp(fldname)
  41. case "ARRAY" => rs.array(fldname)
  42. case "NUMERIC" => rs.bigDecimal(fldname)
  43. case "BLOB" => rs.blob(fldname)
  44. case "TINYINT" => rs.byte(fldname)
  45. case "VARBINARY" => rs.bytes(fldname)
  46. case "BINARY" => rs.bytes(fldname)
  47. case "CLOB" => rs.clob(fldname)
  48. case "DATE" => rs.date(fldname)
  49. case "DOUBLE" => rs.double(fldname)
  50. case "REAL" => rs.float(fldname)
  51. case "FLOAT" => rs.float(fldname)
  52. case "INTEGER" => rs.int(fldname)
  53. case "SMALLINT" => rs.int(fldname)
  54. case "Option[Int]" => rs.intOpt(fldname)
  55. case "BIGINT" => rs.long(fldname)
  56. }
  57. }

这段主要功能是将JDBC的ResultSet转换成Map[String,Any]。在前面讨论的restapi-mongo我们可以进行Document到Map[String,Any]的转换以实现同样的目的。

下面是个调用query服务的例子:

  1. val getAllRequest = HttpRequest(
  2. HttpMethods.GET,
  3. uri = "http://192.168.11.189:50081/api/sql/termtxns/brand?sqltext=SELECT%20*%20FROM%20BRAND",
  4. ).addHeader(authentication)
  5. (for {
  6. response <- Http().singleRequest(getAllRequest)
  7. json <- Unmarshal(response.entity).to[String]
  8. } yield message).andThen {
  9. case Success(msg) => println(s"Received json collection: $json")
  10. case Failure(err) => println(s"Error: ${err.getMessage}")
  11. }

特点是我只需要提供sql语句,服务就会返回一个json数组,然后我怎么把json转成任何类型就随我高兴了。

再看看post服务:在这里希望实现一种批次型插入表的功能,比如从一个数据表里把数据搬到另外一个表。一般来讲在jdbc操作里首先得提供一个模版,如:insert into person(fullname,code) values(?,?),然后通过提供一组参数值来实现批次插入。当然,为安全起见,我们还是需要确定正确的参数位置,这个可以从sql语句里获取:

  1. def map2Params(stm: String, m: Map[String,Any]): Seq[Any] = {
  2. val sortedParams = m.foldLeft(TreeMap[Int,Any]()) {
  3. case (t,(k,v)) => t + (stm.toUpperCase.indexOfSlice(k.toUpperCase) -> v)
  4. }
  5. sortedParams.map(_._2).toSeq
  6. }
  7. def getSeqParams(json: String, sql: String): Seq[Seq[Any]] = {
  8. val seqOfjson = fromJson[Seq[String]](json)
  9. val prs = seqOfjson.map(fromJson[Map[String,Any]])
  10. prs.map(RSConverterUtil.map2Params(sql,_))
  11. }

下面是个批次插入的示范代码:

  1. val encodedSelect = URLEncode.encode("select id as code, name as fullname from members")
  2. val encodedInsert = URLEncode.encode("insert into person(fullname,code) values(?,?)")
  3. val getMembers = HttpRequest(
  4. HttpMethods.GET,
  5. uri = "http://192.168.0.189:50081/api/sql/h2/members?sqltext="+encodedSelect
  6. ).addHeader(authentication)
  7. val postRequest = HttpRequest(
  8. HttpMethods.POST,
  9. uri = "http://192.168.0.189:50081/api/sql/h2/person?sqltext="+encodedInsert,
  10. ).addHeader(authentication)
  11. (for {
  12. _ <- update("http://192.168.0.189:50081/api/sql/h2/person",Seq(createCTX))
  13. respMembers <- Http().singleRequest(getMembers)
  14. message <- Unmarshal(respMembers.entity).to[String]
  15. reqEntity <- Marshal(message).to[RequestEntity]
  16. respInsert <- Http().singleRequest(postRequest.copy(entity = reqEntity))
  17. // HttpEntity(ContentTypes.`application/json`,ByteString(message))))
  18. } yield respInsert).onComplete {
  19. case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
  20. println("builk insert successful!")
  21. case Success(_) => println("builk insert failed!")
  22. case Failure(err) => println(s"Error: ${err.getMessage}")
  23. }

你看,我特别把参数值清单里字段位置和insert sql里字段先后位置颠倒了,但还是得到正确的结果。

最后是put:这是为批次型的事物处理设计的。接受一条或者多条无参数sql指令,多条指令会在一个事物中执行。具体使用方式如下:

  1. def update(url: String, cmds: Seq[String])(implicit token: Authorization): Future[HttpResponse] =
  2. for {
  3. reqEntity <- Marshal(cmds).to[RequestEntity]
  4. response <- Http().singleRequest(HttpRequest(
  5. method=HttpMethods.PUT,uri=url,entity=reqEntity)
  6. .addHeader(token))
  7. } yield response

在上面的讨论里介绍了基于sqlserver的rest服务,与前面讨论的restapi-mongo从原理上区别并不大,重点是实现了用户主导的数据库操作。

 

原文链接:http://www.cnblogs.com/tiger-xc/p/11754487.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号