经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
akka-typed(2) - typed-actor交流方式和交流协议
来源:cnblogs  作者:雪川大虫  时间:2020/11/9 16:09:18  对本文有异议

   akka系统是一个分布式的消息驱动系统。akka应用由一群负责不同运算工作的actor组成,每个actor都是被动等待外界的某种消息来驱动自己的作业。所以,通俗点描述:akka应用就是一群actor相互之间发送消息的系统,每个actor接收到消息后开始自己负责的工作。对于akka-typed来说,typed-actor只能接收指定类型的消息,所以actor之间的消息交流需要按照消息类型来进行,即需要协议来规范消息交流机制。想想看,如果用户需要一个actor做某件事,他必须用这个actor明白的消息类型来发送消息,这就是一种交流协议。

所谓消息交流方式包括单向和双向两类。如果涉及两个actor之间的消息交换,消息发送方式可以是单向和双向的。但如果是从外界向一个actor发送消息,那么肯定只能是单向的发送方式了,因为消息发送两端只有一端是actor。

典型的单向消息发送fire-and-forget如下:

  1. import akka.actor.typed._
  2. import scaladsl._
  3. object Printer {
  4. case class PrintMe(message: String)
  5. // 只接收PrintMe类型message
  6. def apply(): Behavior[PrintMe] =
  7. Behaviors.receive {
  8. case (context, PrintMe(message)) =>
  9. context.log.info(message)
  10. Behaviors.same
  11. }
  12. }
  13. object FireAndGo extends App {
  14. // system就是一个root-actor
  15. val system: ActorRef[Printer.PrintMe] = ActorSystem(Printer(), "fire-and-forget-sample")
  16. val printer: ActorRef[Printer.PrintMe] = system
  17. // 单向消息发送,printMe类型的消息
  18. printer ! Printer.PrintMe("hello")
  19. printer ! Printer.PrintMe("world!")
  20. system.asInstanceOf[ActorSystem[Printer.PrintMe]].terminate()
  21. }

当然,在现实中通常我们要求actor去进行某些运算然后返回运算结果。这就涉及到actor之间双向信息交换了。第一种情况:两个actor之间的消息是任意无序的,这是一种典型的无顺序request-response模式。就是说一个response不一定是按照request的接收顺序返回的,只是它们之间能够交流而已。不过,在akka-typed中这种模式最基本的要求就是发送的消息类型必须符合接收方actor的类型。

好了,我们先对这个模式做个示范。所有actor的定义可以先从它的消息类型开始。对每个参加双向交流的actor来说,可以从request和response两种消息来反映它的功能:

  1. object FrontEnd {
  2. sealed trait FrontMessages
  3. case class SayHi(who: String) extends FrontMessages
  4. }
  5. object BackEnd {
  6. //先从这个actor的回应消息开始
  7. sealed trait Response
  8. case class HowAreU(msg: String) extends Response
  9. case object Unknown extends Response
  10. //可接收消息类型
  11. sealed trait BackMessages
  12. //这个replyTo应该是一个能处理Reponse类型消息的actor
  13. case class MakeHello(who: String, replyTo: ActorRef[Response]) extends BackMessages
  14. }

这个FrontEnd接收SayHi消息后开始工作,不过目前还没有定义返回的消息类型。BackEnd接到MakeHello类型消息后返回response类型消息。从这个角度来讲,返回的对方actor必须能够处理Response类型的消息。

我们试试实现这个FrontEnd actor:

  1. object FrontEnd {
  2. sealed trait FrontMessages
  3. case class SayHi(who: String) extends FrontMessages
  4. def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = {
  5. Behaviors.receive { (ctx,msg) => msg match {
  6. case SayHi(who) =>
  7. ctx.log.info("requested to say hi to {}", who)
  8. backEnd ! BackEnd.MakeHello(who, ???)
  9. }
  10. }
  11. }

MakeHello需要一个replyTo,应该是什么呢?不过它一定是可以处理Response类型消息的actor。但我们知道这个replyTo就是FrontEnd,不过FrontEnd只能处理FrontMessages类型消息,应该怎么办呢?可不可以把replyTo直接写成FrontEnd呢?虽然可以这么做,但这个MakeHello消息就只能跟FrontEnd绑死了。如果其它的actor也需要用到这个MakeHello的话就需要另外定义一个了。所以,最好的解决方案就是用某种类型转换方式来实现。如下:

  1. import akka.actor.typed._
  2. import scaladsl._
  3. object FrontEnd {
  4. sealed trait FrontMessages
  5. case class SayHi(who: String) extends FrontMessages
  6. case class WrappedBackEndResonse(res: BackEnd.Response) extends FrontMessages
  7. def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = {
  8. Behaviors.setup[FrontMessages] { ctx =>
  9. //ctx.messageAdapter(ref => WrappedBackEndResonse(ref))
  10. val backEndRef: ActorRef[BackEnd.Response] = ctx.messageAdapter(WrappedBackEndResonse)
  11. Behaviors.receive { (ctx, msg) =>
  12. msg match {
  13. case SayHi(who) =>
  14. ctx.log.info("requested to say hi to {}", who)
  15. backEnd ! BackEnd.MakeHello(who, backEndRef)
  16. Behaviors.same
  17. //messageAdapter将BackEnd.Response转换成WrappedBackEndResponse
  18. case WrappedBackEndResonse(msg) => msg match {
  19. case BackEnd.HowAreU(msg) =>
  20. ctx.log.info(msg)
  21. Behaviors.same
  22. case BackEnd.Unknown =>
  23. ctx.log.info("Unable to say hello")
  24. Behaviors.same
  25. }
  26. }
  27. }
  28. }
  29. }
  30. }

首先,我们用ctx.mesageAdapter产生了ActorRef[BackEnd.Response],正是我们需要提供给MakeHello消息的replyTo。看看这个messageAdapter函数:

  1. def messageAdapter[U: ClassTag](f: U => T): ActorRef[U]

如果我们进行类型替换U -> BackEnd.Response, T -> FrontMessage 那么:

  1. val backEndRef: ActorRef[BackEnd.Response] =
  2. ctx.messageAdapter((response: BackEnd.Response) => WrappedBackEndResonse(response))

实际上这个messageAdapter函数在本地ActorContext范围内登记了一个从BackEnd.Response类型到FrontMessages的转换。把接收到的BackEnd.Response立即转换成WrappedBackEndResponse(response)。

还有一种两个actor之间的双向交流模式是 1:1 request-response,即一对一模式。一对一的意思是发送方发送消息后等待回应消息。这就意味着收信方需要在完成运算任务后立即向发信方发送回应,否则造成发信方的超时异常。无法避免的是,这种模式依然会涉及消息类型的转换,如下:

  1. object FrontEnd {
  2. sealed trait FrontMessages
  3. case class SayHi(who: String) extends FrontMessages
  4. case class WrappedBackEndResonse(res: BackEnd.Response) extends FrontMessages
  5. case class ErrorResponse(errmsg: String) extends FrontMessages
  6. def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = {
  7. Behaviors.setup[FrontMessages] { ctx =>
  8. //ask需要超时上限
  9. import scala.concurrent.duration._
  10. import scala.util._
  11. implicit val timeOut: Timeout = 3.seconds
  12. Behaviors.receive[FrontMessages] { (ctx, msg) =>
  13. msg match {
  14. case SayHi(who) =>
  15. ctx.log.info("requested to say hi to {}", who)
  16. ctx.ask(backEnd,(backEndRef: ActorRef[BackEnd.Response]) => BackEnd.MakeHello(who,backEndRef) ){
  17. case Success(backResponse) => WrappedBackEndResonse(backResponse)
  18. case Failure(err) =>ErrorResponse(err.getLocalizedMessage)
  19. }
  20. Behaviors.same
  21. case WrappedBackEndResonse(msg) => msg match {
  22. case BackEnd.HowAreU(msg) =>
  23. ctx.log.info(msg)
  24. Behaviors.same
  25. case BackEnd.Unknown =>
  26. ctx.log.info("Unable to say hello")
  27. Behaviors.same
  28. }
  29. case ErrorResponse(errmsg) =>
  30. ctx.log.info("ask error: {}",errmsg)
  31. Behaviors.same
  32. }
  33. }
  34. }
  35. }
  36. }

似乎类型转换是在ask里实现的,看看这个函数:

  1. def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)(
  2. mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

req -> BackEnd.BackMessages, res -> BackEnd.Response, T -> FrontMessages。现在ask可以写成下面这样:

 

  1. ctx.ask[BackEnd.BackMessages,BackEnd.Response](backEnd,
  2. (backEndRef: ActorRef[BackEnd.Response]) => BackEnd.MakeHello(who,backEndRef) ){
  3. case Success(backResponse:BackEnd.Response) => WrappedBackEndResonse(backResponse)
  4. case Failure(err) =>ErrorResponse(err.getLocalizedMessage)
  5. }

 

这样看起来更明白点,也就是说ask把接收的BackEnd.Response转换成了FrontEnd处理的消息类型WrappedBackEndRespnse,也就是FrontMessages

还有一种ask模式是在actor之外进行的,如下:

  1. object AskDemo extends App {
  2. import akka.actor.typed.scaladsl.AskPattern._
  3. import scala.concurrent._
  4. import scala.concurrent.duration._
  5. import akka.util._
  6. import scala.util._
  7. implicit val system: ActorSystem[BackEnd.BackMessages] = ActorSystem(BackEnd(), "front-app")
  8. // asking someone requires a timeout if the timeout hits without response
  9. // the ask is failed with a TimeoutException
  10. implicit val timeout: Timeout = 3.seconds
  11. val result: Future[BackEnd.Response] =
  12. system.asInstanceOf[ActorRef[BackEnd.BackMessages]]
  13. .ask[BackEnd.Response]((ref: ActorRef[BackEnd.Response]) =>
  14. BackEnd.MakeHello("John", ref))
  15. // the response callback will be executed on this execution context
  16. implicit val ec = system.executionContext
  17. result.onComplete {
  18. case Success(res) => res match {
  19. case BackEnd.HowAreU(msg) =>
  20. println(msg)
  21. case BackEnd.Unknown =>
  22. println("Unable to say hello")
  23. }
  24. case Failure(ex) =>
  25. println(s"error: ${ex.getMessage}")
  26. }
  27. system.terminate()
  28. }

 这个ask是在akka.actor.typed.scaladsl.AskPattern包里。函数款式如下:

  1. def ask[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]

 

向ask传入一个函数ActorRef[BackEnd.Response] => BackEnd.BackMessages,然后返回Future[BackEnd.Response]。这个模式中接收回复方是在ActorContext之外,不存在消息截获机制,所以不涉及消息类型的转换。

另一种单actor双向消息交换模式,即自己ask自己。在ActorContext内向自己发送消息并提供回应消息的接收,如pipeToSelf:

 

  1. object PipeFutureTo {
  2. trait CustomerDataAccess {
  3. def update(value: Customer): Future[Done]
  4. }
  5. final case class Customer(id: String, version: Long, name: String, address: String)
  6. object CustomerRepository {
  7. sealed trait Command
  8. final case class Update(value: Customer, replyTo: ActorRef[UpdateResult]) extends Command
  9. sealed trait UpdateResult
  10. final case class UpdateSuccess(id: String) extends UpdateResult
  11. final case class UpdateFailure(id: String, reason: String) extends UpdateResult
  12. private final case class WrappedUpdateResult(result: UpdateResult, replyTo: ActorRef[UpdateResult])
  13. extends Command
  14. private val MaxOperationsInProgress = 10
  15. def apply(dataAccess: CustomerDataAccess): Behavior[Command] = {
  16. Behaviors.setup[Command] { ctx =>
  17. implicit val dispatcher = ctx.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher"))
  18. next(dataAccess, operationsInProgress = 0)
  19. }
  20. }
  21. private def next(dataAccess: CustomerDataAccess, operationsInProgress: Int)(implicit ec: ExecutionContextExecutor): Behavior[Command] = {
  22. Behaviors.receive { (context, command) =>
  23. command match {
  24. case Update(value, replyTo) =>
  25. if (operationsInProgress == MaxOperationsInProgress) {
  26. replyTo ! UpdateFailure(value.id, s"Max $MaxOperationsInProgress concurrent operations supported")
  27. Behaviors.same
  28. } else {
  29. val futureResult = dataAccess.update(value)
  30. context.pipeToSelf(futureResult) {
  31. // map the Future value to a message, handled by this actor
  32. case Success(_) => WrappedUpdateResult(UpdateSuccess(value.id), replyTo)
  33. case Failure(e) => WrappedUpdateResult(UpdateFailure(value.id, e.getMessage), replyTo)
  34. }
  35. // increase operationsInProgress counter
  36. next(dataAccess, operationsInProgress + 1)
  37. }
  38. case WrappedUpdateResult(result, replyTo) =>
  39. // send result to original requestor
  40. replyTo ! result
  41. // decrease operationsInProgress counter
  42. next(dataAccess, operationsInProgress - 1)
  43. }
  44. }
  45. }
  46. }
  47. }

 

原文链接:http://www.cnblogs.com/tiger-xc/p/12985478.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号