经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
restapi(9)- caching, akka-http 缓存
来源:cnblogs  作者:雪川大虫  时间:2019/11/12 8:47:15  对本文有异议

restapi作为前后端交互的枢纽:面对大批量的前端请求,需要确保回复的及时性。使用缓存是一项有效工具。我们可以把多数前端请求的回复response存入缓存,特别是一些需要大量计算才能获取的回复值,更可以大大提高后端的反应速度。值得庆幸的是akka-http已经提供了对缓存的支持,是基于java8 caffein的一套缓存操作工具包的。下面就介绍一下akka-http的caching。

akka-http caching 有个依赖:

 

  1. "com.typesafe.akka" %% "akka-http-caching" % akkaHttpVersion,

 

先从缓存存储结构开始,看看下面的一段缓存结构定义:

  1. import akka.http.scaladsl.util.FastFuture
  2. import akka.http.caching.scaladsl.Cache
  3. import akka.http.caching.scaladsl.CachingSettings
  4. import akka.http.caching.LfuCache
  5. val defaultCachingSettings = CachingSettings(sys)
  6. val lfuCacheSettings = //最少使用排除算法缓存
  7. defaultCachingSettings.lfuCacheSettings
  8. .withInitialCapacity(128) //起始单位
  9. .withMaxCapacity(1024) //最大单位
  10. .withTimeToLive(1.hour) //最长存留时间
  11. .withTimeToIdle(30.minutes) //最长未使用时间
  12. val cachingSettings =
  13. defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings)
  14. //key -> String
  15. val lfuCache: Cache[String, Option[Map[String, Any]]] = LfuCache(cachingSettings)

lfuCache是一种基于使用频率算法的缓存管理系统,这个我们就不必多说了。最好能拿个例子来示范解释:刚好手头有个获取用户信息的http请求样板:

  1. val route = pathPrefix(pathName) {
  2. pathPrefix("getuserinfo") {
  3. (get & parameter('userid)) { userid => {
  4. val userinfo = posRepo.getUserInfo(userid)
  5. userinfo match {
  6. case Some(ui) => complete(toJson(ui))
  7. case None => complete(toJson(Map[String, Any](("TERMINALID" -> ""))))
  8. }
  9. }
  10. }
  11. }
  12. def getUserInfo(userid: String): Option[UserInfo] = {
  13. val sql = "SELECT CUSTOMERS.SHOPID AS SHOPID, TERMINALID, DEVICEID, IMPSVCURL FROM CUSTOMERS INNER JOIN TERMINALS " +
  14. " ON CUSTOMERS.SHOPID=TERMINALS.SHOPID " +
  15. " WHERE (CUSTOMERS.DISABLED=0 AND TERMINALS.DISABLED=0) " +
  16. " AND (CUSTOMERS.EXPDATE > GETDATE() AND TERMINALS.EXPDATE > GETDATE()) AND TERMINALID='" + userid + "'"
  17. val rows = query[Map[String, Any]]("mpos", sql, rsc.resultSet2Map)
  18. val futUI: Future[Option[Map[String, Any]]] = rows.runWith(Sink.lastOption)
  19. Await.result(futUI, 3 seconds)
  20. }

当收到前端 http://mycom.com/pos/getuserinfo?userid=1234 这样的请求时需要从数据库里读取用户信息数据及进行一些转换处理。这个请求调用得频率较高、数据库读取也比较耗时,是个实在的例子。我们来看看如何实现缓存管理:

在akka-http里可以用两种方式来实现缓存管理:1、直接用cache工具,2、用akka-http提供的Directive: cache, alwaysCache

我们先看看如何直接使用cache操作,先看看Cache的构建:

  1. abstract class Cache[K, V] extends akka.http.caching.javadsl.Cache[K, V] {
  2. cache =>
  3. /**
  4. * Returns either the cached Future for the given key or evaluates the given value generating
  5. * function producing a `Future[V]`.
  6. */
  7. def apply(key: K, genValue: () => Future[V]): Future[V]

Cache[K,V]是以K为键,一个()=> Future[V]为值的结构,也就是说我们需要把一个获取Future值的函数存在缓存里:

  1. pathPrefix("getuserinfo") {
  2. (get & parameter('userid)) { userid => {
  3. val userinfo = lfuCache.getOrLoad(userid, _ => posRepo.futureUserInfo(userid))
  4. onComplete(userinfo) {
  5. _ match {
  6. case Success(oui) => oui match {
  7. case Some(ui) => complete(toJson(ui))
  8. case None => complete(toJson(Map[String, Any](("TERMINALID" -> ""))))
  9. }
  10. case Failure(_) => complete(toJson(Map[String, Any](("TERMINALID" -> ""))))
  11. }
  12. }
  13. }
  14. }
  15. def futureUserInfo(userid: String): Future[Option[Map[String, Any]]] = {
  16. val sql = "SELECT CUSTOMERS.SHOPID AS SHOPID, TERMINALID, DEVICEID, IMPSVCURL FROM CUSTOMERS INNER JOIN TERMINALS " +
  17. " ON CUSTOMERS.SHOPID=TERMINALS.SHOPID " +
  18. " WHERE (CUSTOMERS.DISABLED=0 AND TERMINALS.DISABLED=0) " +
  19. " AND (CUSTOMERS.EXPDATE > GETDATE() AND TERMINALS.EXPDATE > GETDATE()) AND TERMINALID='" + userid + "'"
  20. val rows = query[Map[String, Any]]("mpos", sql, rsc.resultSet2Map)
  21. rows.runWith(Sink.lastOption)
  22. }

首先我们需要把getUserInfo修改成futureUserInfo,然后传入cache.getOrLoad():

  1. /**
  2. * Returns either the cached Future for the given key, or applies the given value loading
  3. * function on the key, producing a `Future[V]`.
  4. */
  5. def getOrLoad(key: K, loadValue: K => Future[V]): Future[V]

跟着我们再试试用akka-http的Directive, cache和alwaysCache。这两个是同一个东西,只是cache多了个是否使用缓存这么个控制,是通过request-header Cache-Control来实现的,如:Cache-Control`(`no-cache`)。cache函数是这样定义的;

  1. def cache[K](cache: Cache[K, RouteResult], keyer: PartialFunction[RequestContext, K]): Directive0

这个函数返回Directive0, 可以直接对应 { ...   complete(...) },所以cache可以把一个route包嵌在里面如:

  1. cache(myCache, simpleKeyer) {
  2. complete {
  3. i += 1
  4. i.toString
  5. }
  6. }

simpleKeyer是个K对应函数:在我们这个例子里K -> Uri, Cache[Uri,RouteResult]。这里有个现成的构建器:routeCache[Uri]

  1. /**
  2. * Creates an [[LfuCache]] with default settings obtained from the system's configuration.
  3. */
  4. def routeCache[K](implicit s: ActorSystem): Cache[K, RouteResult] =
  5. LfuCache[K, RouteResult](s)

不过这个LfuCache使用了application.conf里面的cachingSettings. 我们想直接控制lfuCache构建,所以可以用:

  1. val lfuCache = LfuCache[Uri,RouteResult](cachingSettings)

alwaysCache的具体使用和上面的cache.getOrLoad相同:

  1. import akka.http.scaladsl.model.{HttpMethods, StatusCodes, Uri}
  2. import akka.http.scaladsl.util.FastFuture
  3. import akka.http.caching.scaladsl.Cache
  4. import akka.http.caching.scaladsl.CachingSettings
  5. import akka.http.caching.LfuCache
  6. import akka.http.scaladsl.server.RequestContext
  7. import akka.http.scaladsl.server.RouteResult
  8. import akka.http.scaladsl.server.directives.CachingDirectives._
  9. import scala.concurrent.duration._
  10. import scala.util._
  11. val defaultCachingSettings = CachingSettings(sys)
  12. val lfuCacheSettings = //最少使用排除算法缓存
  13. defaultCachingSettings.lfuCacheSettings
  14. .withInitialCapacity(128) //起始单位
  15. .withMaxCapacity(1024) //最大单位
  16. .withTimeToLive(1.hour) //最长存留时间
  17. .withTimeToIdle(30.minutes) //最长未使用时间
  18. val cachingSettings =
  19. defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings)
  20. //Uri->key, RouteResult -> value
  21. val lfuCache = LfuCache[Uri,RouteResult](cachingSettings)
  22. //Example keyer for non-authenticated GET requests
  23. val simpleKeyer: PartialFunction[RequestContext, Uri] = {
  24. val isGet: RequestContext => Boolean = _.request.method == HttpMethods.GET
  25. // val isAuthorized: RequestContext => Boolean =
  26. // _.request.headers.exists(_.is(Authorization.lowercaseName))
  27. val result: PartialFunction[RequestContext, Uri] = {
  28. case r: RequestContext if isGet(r) => r.request.uri
  29. }
  30. result
  31. }
  32. val route = pathPrefix(pathName) {
  33. pathPrefix("getuserinfo") {
  34. (get & parameter('userid)) { userid => {
  35. alwaysCache(lfuCache,simpleKeyer) {
  36. onComplete(posRepo.futureUserInfo(userid)) {
  37. _ match {
  38. case Success(oui) => oui match {
  39. case Some(ui) => complete(toJson(ui))
  40. case None => complete(toJson(Map[String, Any](("TERMINALID" -> ""))))
  41. }
  42. case Failure(_) => complete(toJson(Map[String, Any](("TERMINALID" -> ""))))
  43. }
  44. }
  45. }
  46. }
  47. }
  48. } ~

好了,我觉着可能直接调用cache.getOrLoad会更好些,因为akka-http还在不停的变,java8caffein应该不会再调整了吧。

 

原文链接:http://www.cnblogs.com/tiger-xc/p/11834484.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号