经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
akka-typed(1) - actor生命周期管理
来源:cnblogs  作者:雪川大虫  时间:2020/11/9 16:09:19  对本文有异议

   akka-typed的actor从创建、启用、状态转换、停用、监视等生命周期管理方式和akka-classic还是有一定的不同之处。这篇我们就介绍一下akka-typed的actor生命周期管理。

每一种actor都是通过定义它的行为属性behavior形成模版,然后由对上一层的父辈actor用spawn方法产生actor实例的。产生的actor实例加入一个系统的由上至下树形结构,直接在spawn产生自己的父辈之下。akka-typed的守护guardian-actor,即根部root-actor是通过在定义ActorSystem时指定并产生的。如下:

  1. val config = ConfigFactory.load("application.conf")
  2. val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config)
  3. man ! GreetStarter.RepeatedGreeting("Tiger",1.seconds)

在某种意义上,这个ActorSystem实例man就代表root-actor。我们可以向man发送消息然后由GreetStarter的behavior用自己的ActorContext进行spawn,stop,watch及分派计算任务等,其实就是一个程序的集线器:

  1. object GreetStarter {
  2. import Messages._
  3. def apply(): Behavior[SayHi] = {
  4. Behaviors.setup { ctx =>
  5. val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
  6. val helloActor = ctx.spawn(HelloActor(), "hello-actor",props)
  7. val greeter = ctx.spawn(Greeter(helloActor), "greeter")
  8. ctx.watch(greeter)
  9. ctx.watchWith(helloActor,StopWorker("something happend"))
  10. Behaviors.receiveMessage { who =>
  11. if (who.name == "stop") {
  12. ctx.stop(helloActor)
  13. ctx.stop(greeter)
  14. Behaviors.stopped
  15. } else {
  16. greeter ! who
  17. Behaviors.same
  18. }
  19. }
  20. }
  21. }
  22. }

但是,总有时候我们需要在root-actor的ActorContext之外来进行一些制造、使用actor的操作。下面这个官方文档上的例子是很好的示范:

  1. import akka.actor.typed.Behavior
  2. import akka.actor.typed.SpawnProtocol
  3. import akka.actor.typed.scaladsl.Behaviors
  4. import akka.actor.typed.scaladsl.LoggerOps
  5. object HelloWorldMain {
  6. def apply(): Behavior[SpawnProtocol.Command] =
  7. Behaviors.setup { context =>
  8. // Start initial tasks
  9. // context.spawn(...)
  10. SpawnProtocol()
  11. }
  12. }
  13. object Main extends App {
  14. implicit val system: ActorSystem[SpawnProtocol.Command] =
  15. ActorSystem(HelloWorldMain(), "hello")
  16. // needed in implicit scope for ask (?)
  17. import akka.actor.typed.scaladsl.AskPattern._
  18. implicit val ec: ExecutionContext = system.executionContext
  19. implicit val timeout: Timeout = Timeout(3.seconds)
  20. val greeter: Future[ActorRef[HelloWorld.Greet]] =
  21. system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))
  22. val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
  23. context.log.info2("Greeting for {} from {}", message.whom, message.from)
  24. Behaviors.stopped
  25. }
  26. val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
  27. system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))
  28. for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
  29. greeterRef ! HelloWorld.Greet("Akka", replyToRef)
  30. }
  31. ...
  32. }

可以看到所有操作都在actor框架之外进行的。这个SpawnProtocol本身就是一个actor,如下:

  1. object SpawnProtocol {
  2. ...
  3. final case class Spawn[T](behavior: Behavior[T], name: String, props: Props, replyTo: ActorRef[ActorRef[T]])
  4. extends Command
  5. ...
  6. def apply(): Behavior[Command] =
  7. Behaviors.receive { (ctx, msg) =>
  8. msg match {
  9. case Spawn(bhvr, name, props, replyTo) =>
  10. val ref =
  11. if (name == null || name.equals(""))
  12. ctx.spawnAnonymous(bhvr, props)
  13. else {
  14. @tailrec def spawnWithUniqueName(c: Int): ActorRef[Any] = {
  15. val nameSuggestion = if (c == 0) name else s"$name-$c"
  16. ctx.child(nameSuggestion) match {
  17. case Some(_) => spawnWithUniqueName(c + 1) // already taken, try next
  18. case None => ctx.spawn(bhvr, nameSuggestion, props)
  19. }
  20. }
  21. spawnWithUniqueName(0)
  22. }
  23. replyTo ! ref
  24. Behaviors.same
  25. }
  26. }
  27. }

外界通过发送Spawn消息来指定产生新的actor。

actor的状态切换就是从一种behavior转到另一种behavior。我们可以自定义behavior或者用现成的Behaviors.???。如果只是涉及内部变量变化,那么可以直接生成带着变量的当前behavior,如下:

  1. object HelloWorldBot {
  2. def apply(max: Int): Behavior[HelloWorld.Greeted] = {
  3. bot(0, max)
  4. }
  5. private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =
  6. Behaviors.receive { (context, message) =>
  7. val n = greetingCounter + 1
  8. context.log.info2("Greeting {} for {}", n, message.whom)
  9. if (n == max) {
  10. Behaviors.stopped
  11. } else {
  12. message.from ! HelloWorld.Greet(message.whom, context.self)
  13. bot(n, max)
  14. }
  15. }
  16. }

actor停用可以由直属父辈actor的ActorContext.stop或者自身的Behaviors.stopped来实现。Behaviors.stopped可以带入一个清理函数。在actor完全停止之前进行一些清理操作: 

  1. object MasterControlProgram {
  2. sealed trait Command
  3. final case class SpawnJob(name: String) extends Command
  4. case object GracefulShutdown extends Command
  5. // Predefined cleanup operation
  6. def cleanup(log: Logger): Unit = log.info("Cleaning up!")
  7. def apply(): Behavior[Command] = {
  8. Behaviors
  9. .receive[Command] { (context, message) =>
  10. message match {
  11. case SpawnJob(jobName) =>
  12. context.log.info("Spawning job {}!", jobName)
  13. context.spawn(Job(jobName), name = jobName)
  14. Behaviors.same
  15. case GracefulShutdown =>
  16. context.log.info("Initiating graceful shutdown...")
  17. // perform graceful stop, executing cleanup before final system termination
  18. // behavior executing cleanup is passed as a parameter to Actor.stopped
  19. Behaviors.stopped { () =>
  20. cleanup(context.system.log)
  21. }
  22. }
  23. }
  24. .receiveSignal {
  25. case (context, PostStop) =>
  26. context.log.info("Master Control Program stopped")
  27. Behaviors.same
  28. }
  29. }
  30. }

实际上一个actor转入停用stop状态可以在另一个作为监视actor的receiveSignal获取,如下:

  1. object GreetStarter {
  2. import Messages._
  3. def apply(): Behavior[SayHi] = {
  4. Behaviors.setup { ctx =>
  5. val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
  6. val helloActor = ctx.spawn(HelloActor(), "hello-actor",props)
  7. val greeter = ctx.spawn(Greeter(helloActor), "greeter")
  8. ctx.watch(greeter)
  9. ctx.watchWith(helloActor,StopWorker("something happend"))
  10. Behaviors.receiveMessage { who =>
  11. if (who.name == "stop") {
  12. ctx.stop(helloActor)
  13. ctx.stop(greeter)
  14. Behaviors.stopped
  15. } else {
  16. greeter ! who
  17. Behaviors.same
  18. }
  19. }.receiveSignal {
  20. case (context, Terminated(ref)) =>
  21. context.log.info("{} stopped!", ref.path.name)
  22. Behaviors.same
  23. }
  24. }
  25. }
  26. }

下面是.receiveSignal函数及其捕获的Signal消息:

  1. trait Receive[T] extends Behavior[T] {
  2. def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T]
  3. }
  4. trait Signal
  5. /**
  6. * Lifecycle signal that is fired upon restart of the Actor before replacing
  7. * the behavior with the fresh one (i.e. this signal is received within the
  8. * behavior that failed).
  9. */
  10. sealed abstract class PreRestart extends Signal
  11. case object PreRestart extends PreRestart {
  12. def instance: PreRestart = this
  13. }
  14. /**
  15. * Lifecycle signal that is fired after this actor and all its child actors
  16. * (transitively) have terminated. The [[Terminated]] signal is only sent to
  17. * registered watchers after this signal has been processed.
  18. */
  19. sealed abstract class PostStop extends Signal
  20. // comment copied onto object for better hints in IDEs
  21. /**
  22. * Lifecycle signal that is fired after this actor and all its child actors
  23. * (transitively) have terminated. The [[Terminated]] signal is only sent to
  24. * registered watchers after this signal has been processed.
  25. */
  26. case object PostStop extends PostStop {
  27. def instance: PostStop = this
  28. }
  29. object Terminated {
  30. def apply(ref: ActorRef[Nothing]): Terminated = new Terminated(ref)
  31. def unapply(t: Terminated): Option[ActorRef[Nothing]] = Some(t.ref)
  32. }

 

原文链接:http://www.cnblogs.com/tiger-xc/p/12976199.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号