经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
akka-grpc - 应用案例
来源:cnblogs  作者:雪川大虫  时间:2020/11/9 16:09:05  对本文有异议

  上期说道:http/2还属于一种不算普及的技术协议,可能目前只适合用于内部系统集成,现在开始大面积介入可能为时尚早。不过有些项目需求不等人,需要使用这项技术,所以研究了一下akka-grpc,写了一篇介绍。本想到此为止,继续其它项目。想想这样做法有点不负责任,像是草草收场。毕竟用akka-grpc做了些事情,想想还是再写这篇跟大家分享使用kka-grpc的过程。

我说过,了解akka-grpc的主要目的还是在protobuf的应用上。这是一种高效率的序列化协议。刚好,公司有这么个项目,是一个图像处理平台:把很多图片拍摄终端的图像传上平台进行商品识别、OCR等图像处理。由于终端数量多、图像处理又特别消耗内存、CPU等计算资源、又要求快速响应,所以第一考虑就是使用akka-cluster把图像处理任务分割到多个节点上并行处理。这里就需要仔细考虑图片在终端到平台、然后集群节点与点actor间的传输效率了。如何在akka系统里使用protobuf格式的数据正是本篇讨论和示范的目的。

akka-grpc应用一般从IDL文件里消息类型和服务函数的定义开始,如下面这个.proto文件示范:

  1. syntax = "proto3";
  2. import "google/protobuf/wrappers.proto";
  3. import "google/protobuf/any.proto";
  4. import "scalapb/scalapb.proto";
  5. option (scalapb.options) = {
  6. // don't append file name to package
  7. flat_package: true
  8. // generate one Scala file for all messages (services still get their own file)
  9. single_file: true
  10. // add imports to generated file
  11. // useful when extending traits or using custom types
  12. // import: "io.ontherocks.hellogrpc.RockingMessage"
  13. // code to put at the top of generated file
  14. // works only with `single_file: true`
  15. //preamble: "sealed trait SomeSealedTrait"
  16. };
  17. package com.datatech.pos.abs;
  18. message UCredential {
  19. string userid = 1;
  20. string password = 2;
  21. }
  22. message JWToken {
  23. string jwt = 1;
  24. }
  25. message Picture {
  26. int32 num = 1;
  27. bytes blob = 2;
  28. }
  29. message Capture {
  30. string ean = 1;
  31. bytes cover1 = 2;
  32. bytes cover2 = 3;
  33. }
  34. message Book {
  35. string ean = 1;
  36. string ver = 2;
  37. string isbn = 3;
  38. string title = 4;
  39. string publisher = 5;
  40. double price = 6;
  41. bytes cover1 = 7;
  42. bytes cover2 = 8;
  43. }
  44. message QueryResult {
  45. int32 sts = 1;
  46. string msg = 2;
  47. Book bookinfo = 3;
  48. }
  49. service Services {
  50. rpc GetAuthToken(UCredential) returns (JWToken) {};
  51. rpc SavePicture(Picture) returns (QueryResult) {};
  52. rpc GetPicture(Picture) returns (Picture) {};
  53. // rpc SaveCapture(Capture) returns (QueryResult) {};
  54. // rpc GetCapture(Capture) returns (Capture) {};
  55. // rpc GetBookInfo(Capture) returns (QueryResult) {};
  56. }

因为这次示范针对的是protobuf的使用,所以就拣了SavePicture,GetPicture这两项服务函数。JWToken只是用户身份凭证,集群分片shard-entityId是以用户凭证为基础的,所以平台需要通过JWT进行跨节点任务指派以实现分布式图像处理运算。

下面就要在编译器插件自动产生的基础服务接口代码基础上进行具体的服务功能实现。这部分主要是对接口函数的实现(oveerride):

  1. class gRPCServices(trace: Boolean, system: ActorSystem, sharding: ClusterSharding)(
  2. implicit waitResponseTimeout: Timeout, authenticator: AuthBase) extends ServicesPowerApi with LogSupport {
  3. implicit val ec = system.dispatcher
  4. log.stepOn = trace
  5. override def getAuthToken(request: UCredential, meta: Metadata): Future[JWToken] = {
  6. val entityRef = sharding.entityRefFor(Authenticator.EntityKey, UUID.randomUUID.toString)
  7. val jwtResp = for {
  8. ui <- entityRef.ask[Authenticator.Response](Authenticator.GetUserInfo(request.userid, _))
  9. .map {
  10. case Authenticator.UserInfo(info) => info
  11. case _ => Map[String, Any]()
  12. }
  13. jwt <- entityRef.ask[Authenticator.Response](Authenticator.GetToken(ui, _))
  14. } yield jwt
  15. jwtResp.map {
  16. case Authenticator.JWToken(jwt) =>
  17. if (jwt.nonEmpty) JWToken(jwt)
  18. else throw new Exception("身份验证失败!无法提供凭证。")
  19. case _ => throw new Exception("身份验证失败!无法提供凭证。")
  20. }
  21. }
  22. override def savePicture(in: Picture, metadata: Metadata): Future[QueryResult] = {
  23. val jwt = getJwt(metadata).getOrElse("")
  24. val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))
  25. val (shopId, posId, termId, impurl,devId) = ids
  26. val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")
  27. val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.SaveImage(in, _))
  28. .map {
  29. case ImgProcessor.ValidImgPro(img) => QueryResult(sts = 0, msg = "picture saved.")
  30. case ImgProcessor.FailedImgPro(msg) => QueryResult(sts = -1, msg = msg)
  31. }
  32. futResp
  33. }
  34. override def getPicture(in: Picture, metadata: Metadata): Future[Picture] = {
  35. val jwt = getJwt(metadata).getOrElse("")
  36. val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))
  37. val (shopId, posId, termId, impurl,devId) = ids
  38. val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")
  39. val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.GetImage(in.num, _))
  40. .map {
  41. case ImgProcessor.ValidImgPro(img) => img
  42. case ImgProcessor.FailedImgPro(msg) => Picture(-1, ByteString.EMPTY)
  43. }
  44. futResp
  45. }
  46. def getJwt(metadata: Metadata): Option[String] = {
  47. metadata.getText("bearer")
  48. }
  49. }

由于是通过PowerApi模式产生的接口代码,所以接口函数都带有MetaData参数,代表HttpRequest header集合。可以看到:服务函数实现都是通过entityRef,一个分片调度器分配到集群某个节点ImgProcessor.EntityKey类型的entity-actor上进行的。shopId:posId就是代表为某用户构建的entityId,这个是通过用户在Request中提供的MetaData参数中jwt解析得出的。

可以看到,具体服务提供是通过集群的分片实现的。下面是这个分片的代码示范:

  1. log.step(s"initializing sharding for ${ImgProcessor.EntityKey} ...")(MachineId("",""))
  2. val imgEntityType = Entity(ImgProcessor.EntityKey) { entityContext =>
  3. ImgProcessor(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)
  4. }.withStopMessage(ImgProcessor.StopWorker)
  5. sharding.init(imgEntityType)

上面imgEntityType就是shard-entity类型,其实就是按用户提供的jwt在任意集群节点上实时构建的一个opencv图像处理器。下面是这个entity-actor的示范代码:

  1. object ImgProcessor extends LogSupport {
  2. sealed trait Command extends CborSerializable
  3. case class SaveImage(img: Picture, replyTo: ActorRef[Response]) extends Command
  4. case class GetImage(imgnum: Int,replyTo: ActorRef[Response]) extends Command
  5. sealed trait Response extends CborSerializable
  6. case class ValidImgPro(img: Picture) extends Response
  7. case class FailedImgPro(msg: String) extends Response
  8. def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {
  9. val (shopId,posId) = entityId.split(':').toList match {
  10. case sid::pid::Nil => (sid,pid) }
  11. implicit val loc = Messages.MachineId(shopId,posId)
  12. log.stepOn = trace
  13. Behaviors.setup[Command] { ctx =>
  14. implicit val ec = ctx.executionContext
  15. ctx.setReceiveTimeout(keepAlive, Idle)
  16. Behaviors.withTimers[Command] { timer =>
  17. Behaviors.receiveMessage[Command] {
  18. case SaveImage(img, replyTo) =>
  19. log.step(s"ImgProcessor: SaveImage(${img.num})")
  20. implicit val client = mongoClient(mgoHosts)
  21. maybeMgoClient = Some(client)
  22. ctx.pipeToSelf(savePicture(img)) {
  23. case Success(_) => {
  24. replyTo ! ValidImgPro(img)
  25. Done(loc.shopid, loc.posid, s"saved image #${img.num}.")
  26. }
  27. case Failure(err) =>
  28. log.error(s"ImgProcessor: SaveImage Error: ${err.getMessage}")
  29. replyTo ! FailedImgPro(err.getMessage)
  30. Done(loc.shopid, loc.posid, s"SaveImage with error: ${err.getMessage}")
  31. }
  32. Behaviors.same
  33. case GetImage(imgnum, replyTo) =>
  34. ...
  35. }
  36. }

整个图片传输是通过actor的消息实现的。akka消息支持多种序列化格式,包括protobuf, 在配置文件.conf里定义:

  1. akka {
  2. loglevel = INFO
  3. actor {
  4. provider = cluster
  5. serializers {
  6. jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
  7. proto = "akka.remote.serialization.ProtobufSerializer"
  8. }
  9. serialization-bindings {
  10. "com.datatech.pos.abs.CborSerializable" = jackson-cbor
  11. "scalapb.GeneratedMessage" = proto
  12. }
  13. }
  14. }

grpc server 基本上是个标准模块,不同的只是service参数:

  1. class gRPCServer(host: String, port: Int) extends LogSupport {
  2. def runServer(system: ActorSystem[_], service: gRPCServices): Future[Http.ServerBinding] = {
  3. implicit val classic = system.toClassic
  4. implicit val ec: ExecutionContext = system.executionContext
  5. // Create service handlers
  6. val serviceHandler: HttpRequest => Future[HttpResponse] =
  7. ServicesPowerApiHandler(service)
  8. // Bind service handler servers to localhost:8080/8081
  9. val binding = Http().bindAndHandleAsync(
  10. serviceHandler,
  11. interface = host,
  12. port = port,
  13. connectionContext = HttpConnectionContext())
  14. // report successful binding
  15. binding.foreach { binding => println(s"******* startup gRPC-server on: port = $port *******") }
  16. binding
  17. //#server
  18. }
  19. }

下面是客户端测试代码:

  1. object gRPCTestClient {
  2. def main(args: Array[String]): Unit = {
  3. val config_onenode = ConfigFactory.load("onenode")
  4. implicit val sys = ActorSystem("grpc-client", config_onenode)
  5. implicit val ec = sys.dispatcher
  6. val clientSettings = GrpcClientSettings.fromConfig(Services.name)
  7. // val clientSettings = GrpcClientSettings.connectToServiceAt("192.168.11.189", 50052);
  8. implicit val client = ServicesClient(clientSettings)
  9. val futJwt = client.getAuthToken(UCredential("9013", "123456"))
  10. val jwt = Await.result(futJwt, 5.seconds).jwt
  11. println(s"got jwt: ${jwt}")
  12. scala.io.StdIn.readLine()
  13. val bytes = FileStreaming.FileToByteArray("books/59c10d099b26e.jpg")
  14. val mat = bytesToMat(bytes)
  15. show(mat,"sent picture")
  16. scala.io.StdIn.readLine()
  17. val picture = Picture(111,marshal(bytes))
  18. val futQR = client.savePicture().addHeader("Bearer", jwt).invoke(Picture(111,marshal(bytes)))
  19. futQR.onComplete {
  20. case Success(qr) => println(s"Saving Success: ${qr.msg}")
  21. case Failure(err) => println(s"Saving Error: ${err.getMessage}")
  22. }
  23. scala.io.StdIn.readLine()
  24. val futPic = client.getPicture().addHeader("Bearer", jwt).invoke(Picture(111,ByteString.EMPTY))
  25. futPic.onComplete {
  26. case Success(pic) =>
  27. val image = bytesToMat(unmarshal(pic.blob))
  28. show(image, s"picture:${pic.num}")
  29. case Failure(err) => println(s"Reading Error: ${err.getMessage}")
  30. }
  31. scala.io.StdIn.readLine()
  32. sys.terminate()
  33. }
  34. }

基本流程是:先通过getAuthToken获取jwt;在调用服务时通过addHeader("bearer",jwt)把jwt随着函数调用一起提交给服务端。

客户端设置可以在配置文件中定义:

  1. akka {
  2. loglevel = INFO
  3. grpc.client {
  4. "com.datatech.pos.abs.Services" {
  5. host = 192.168.11.189
  6. port = 52051
  7. override-authority = foo.test.google.fr
  8. use-tls = false
  9. }
  10. }
  11. }

 

原文链接:http://www.cnblogs.com/tiger-xc/p/13581075.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号