akka-typed(5) - cluster: monitoring cluster node status
Source: cnblogs  Author: 雪川大虫  Date: 2020/11/9 16:09:14

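akka-cluster publishes a corresponding event to the system message queue for every state change of every node. By subscribing to these node state change messages, an actor can learn the state of each node. This was covered in the earlier discussion of akka-cluster. Because akka-typed adopts a new message protocol, and publishing and subscribing to system messages is itself a form of message exchange, it is bound by that protocol as well. So I want to rewrite the ClusterMemberStatus example shown previously to explore how listening for node state change messages works under akka-typed.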

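We need an actor that subscribes to the node state change messages published by the system. This involves an exchange between two parties: the system and the actor. If we treat subscribing as sending a message to the system, then the node state change messages that come back are the system's responses. First, look at the message definitions in the actor: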

 

    object MonitorActor {
      sealed trait ClusterEvent
      private case class MemberStatus(event: MemberEvent) extends ClusterEvent
      private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent

      def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx =>
        val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus)
        val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus)
        Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter, classOf[MemberEvent])
        Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter, classOf[ReachabilityEvent])
        ...
      }
    }

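First, the responses come in two kinds: MemberEvent and ReachabilityEvent. The message type that MonitorActor handles is ClusterEvent. To handle the response types the system sends back, namely MemberEvent and ReachabilityEvent, we must provide conversions from these two types to ClusterEvent. Registering the conversions MemberEvent -> MemberStatus and ReachabilityEvent -> ReachStatus through ctx.messageAdapter lets MonitorActor receive the two messages MemberStatus and ReachStatus: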

    object MonitorActor {
      sealed trait ClusterEvent
      // Wrappers that bring the cluster's own event types into this actor's protocol.
      private case class MemberStatus(event: MemberEvent) extends ClusterEvent
      private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent

      def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx =>
        // Adapters convert incoming cluster events into ClusterEvent messages.
        val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus)
        val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus)
        Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter, classOf[MemberEvent])
        Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter, classOf[ReachabilityEvent])

        Behaviors.receiveMessage { event =>
          event match {
            case MemberStatus(status) =>
              status match {
                case MemberJoined(member) =>
                  ctx.log.info("**************** Member joined: [{}] ***************", member.address)
                case MemberUp(member) =>
                  ctx.log.info("**************** Member is Up: [{}] ***************", member.address)
                case MemberRemoved(member, previousStatus) =>
                  ctx.log.info("**************** Member is Removed: [{}] after {} ***************",
                    member.address, previousStatus)
                case MemberLeft(member) =>
                  ctx.log.info("**************** Member left: [{}] ***************", member.address)
                case MemberExited(member) =>
                  ctx.log.info("**************** Member exited: [{}] ***************", member.address)
                case _: MemberEvent => // ignore
              }
            case ReachStatus(status) =>
              status match {
                case UnreachableMember(member) =>
                  ctx.log.info("**************** Member detected as unreachable: [{}] ***************", member)
                case ReachableMember(member) =>
                  ctx.log.info("**************** Member back to reachable: [{}] ***************", member)
              }
          }
          Behaviors.same
        }
      }
    }
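The adapter registration used in MonitorActor can be modeled in plain Scala without any Akka dependency. The following is only a sketch of the idea, not Akka's implementation: `Ref`, `messageAdapter`, and the simplified `MemberEvent` and `ReachabilityEvent` case classes are hypothetical stand-ins introduced for illustration. It shows that an adapter is just a reference accepting the foreign type, which wraps each message into the actor's own protocol before forwarding it.

```scala
object AdapterSketch {
  // Hypothetical stand-ins for the Akka cluster event types (assumption, not the real classes).
  final case class MemberEvent(description: String)
  final case class ReachabilityEvent(description: String)

  // The protocol of the monitoring actor, mirroring the article's ClusterEvent.
  sealed trait ClusterEvent
  final case class MemberStatus(event: MemberEvent) extends ClusterEvent
  final case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent

  // Simplified stand-in for ActorRef[T]: something that accepts messages of type T.
  trait Ref[T] { def tell(msg: T): Unit }

  // Simplified stand-in for ctx.messageAdapter: wraps every incoming U into a T
  // and forwards the result to the owning actor's reference.
  def messageAdapter[U, T](self: Ref[T])(wrap: U => T): Ref[U] =
    new Ref[U] { def tell(msg: U): Unit = self.tell(wrap(msg)) }

  // Sends one event through each adapter and returns what the "actor" received.
  def demo(): List[ClusterEvent] = {
    val inbox = scala.collection.mutable.ListBuffer.empty[ClusterEvent]
    val self: Ref[ClusterEvent] = new Ref[ClusterEvent] {
      def tell(msg: ClusterEvent): Unit = { inbox += msg; () }
    }
    val memberAdapter: Ref[MemberEvent] =
      messageAdapter(self)((e: MemberEvent) => MemberStatus(e))
    val reachAdapter: Ref[ReachabilityEvent] =
      messageAdapter(self)((e: ReachabilityEvent) => ReachStatus(e))

    memberAdapter.tell(MemberEvent("up"))
    reachAdapter.tell(ReachabilityEvent("unreachable"))
    inbox.toList
  }
}
```

In this model the publisher only ever sees a `Ref[MemberEvent]` or a `Ref[ReachabilityEvent]`, while the receiving side only ever sees `ClusterEvent` values, which is the separation the typed protocol asks for.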

We also need one more actor that does nothing itself; it exists purely to create a MonitorActor:

    object RootActor {
      def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
        ctx.spawn(MonitorActor(), "listner")
        Behaviors.empty
      }
    }

Now let's see how main is implemented:

    object ClusterMemberStatus {
      import com.typesafe.config.ConfigFactory

      def main(args: Array[String]): Unit = {
        val ports =
          if (args.isEmpty)
            Seq(25251, 25252, 0)
          else
            args.toSeq.map(_.toInt)
        ports.foreach { port =>
          startup(port)
        }
      }

      def startup(port: Int): Unit = {
        // Override only the port; everything else comes from cluster.conf.
        val config = ConfigFactory.parseString(s"""
          akka.remote.artery.canonical.port=$port
          """).withFallback(ConfigFactory.load("cluster.conf"))
        ActorSystem[Nothing](RootActor(), "ClusterSystem", config)
      }
    }
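The port handling in main can be sketched on its own, without Akka or the config library. This is a simplified illustration under stated assumptions: `withFallback` below is a hypothetical Map-based stand-in for the config library's fallback merge, not its real implementation, and the key names are copied from cluster.conf. With no arguments, three nodes are started: two on the seed-node ports and one on port 0, which asks the operating system for a free port (the node on port 51081 in the test output).

```scala
object PortSelection {
  // Same defaulting rule as main: no arguments means the two seed-node ports plus 0.
  def resolvePorts(args: Array[String]): Seq[Int] =
    if (args.isEmpty) Seq(25251, 25252, 0) else args.toSeq.map(_.toInt)

  // Hypothetical stand-in for a fallback merge: keys present in `overrides` win,
  // keys missing from it are taken from `fallback`.
  def withFallback(overrides: Map[String, String],
                   fallback: Map[String, String]): Map[String, String] =
    fallback ++ overrides

  def main(args: Array[String]): Unit = {
    // Values mirroring the remote section of cluster.conf.
    val fallback = Map(
      "akka.remote.artery.canonical.hostname" -> "127.0.0.1",
      "akka.remote.artery.canonical.port" -> "0")

    resolvePorts(args).foreach { port =>
      val effective =
        withFallback(Map("akka.remote.artery.canonical.port" -> port.toString), fallback)
      println(effective)
    }
  }
}
```

Each node therefore shares the hostname and all other settings from the fallback and differs only in its port.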

Here is the output of a test run:

    22:14:52.755 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25251] ***************
    22:14:52.810 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:51081] - Received InitJoinAck message from [Actor[akka://ClusterSystem@127.0.0.1:25251/system/cluster/core/daemon#313431252]] to [akka://ClusterSystem@127.0.0.1:51081]
    22:14:52.825 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Node [akka://ClusterSystem@127.0.0.1:51081] is JOINING, roles [dc-default]
    22:14:52.825 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:51081] ***************
    22:14:52.829 [ClusterSystem-akka.actor.internal-dispatcher-7] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25251] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:51081,567025403336682144)]
    22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:51081] - Welcome from [akka://ClusterSystem@127.0.0.1:25251]
    22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25251] ***************
    22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:51081] ***************
    22:14:52.858 [ClusterSystem-akka.actor.internal-dispatcher-13] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:51081] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25251,6076326462170320177)]
    22:14:53.044 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Leader is moving node [akka://ClusterSystem@127.0.0.1:51081] to [Up]
    22:14:53.044 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:51081] ***************
    22:14:53.679 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:51081] ***************
    22:14:57.707 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Received InitJoin message from [Actor[akka://ClusterSystem@127.0.0.1:25252/system/cluster/core/daemon/joinSeedNodeProcess-1#1472023843]] to [akka://ClusterSystem@127.0.0.1:25251]
    22:14:57.707 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Sending InitJoinAck message from node [akka://ClusterSystem@127.0.0.1:25251] to [Actor[akka://ClusterSystem@127.0.0.1:25252/system/cluster/core/daemon/joinSeedNodeProcess-1#1472023843]] (version [2.6.5])
    22:14:57.732 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25252] - Received InitJoinAck message from [Actor[akka://ClusterSystem@127.0.0.1:25251/system/cluster/core/daemon#313431252]] to [akka://ClusterSystem@127.0.0.1:25252]
    22:14:57.734 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Node [akka://ClusterSystem@127.0.0.1:25252] is JOINING, roles [dc-default]
    22:14:57.735 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:25252] ***************
    22:14:57.735 [ClusterSystem-akka.actor.internal-dispatcher-26] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25251] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25252,-6913064885699273532)]
    22:14:57.737 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25252] - Welcome from [akka://ClusterSystem@127.0.0.1:25251]
    22:14:57.737 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25251] ***************
    22:14:57.738 [ClusterSystem-akka.actor.internal-dispatcher-30] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25252] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25251,6076326462170320177)]
    22:14:57.738 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:25252] ***************
    22:14:57.738 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:51081] ***************
    22:14:57.738 [ClusterSystem-akka.actor.internal-dispatcher-30] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25252] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:51081,567025403336682144)]
    22:14:57.740 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:25252] ***************
    22:14:57.740 [ClusterSystem-akka.actor.internal-dispatcher-16] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:51081] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25252,-6913064885699273532)]
    22:14:58.134 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Leader is moving node [akka://ClusterSystem@127.0.0.1:25252] to [Up]
    22:14:58.134 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25252] ***************
    22:14:58.755 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25252] ***************
    22:14:59.146 [ClusterSystem-akka.actor.default-dispatcher-14] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25252] ***************

Here is the complete source code for this demonstration:

build.sbt

    name := "learn-akka-typed"
    version := "0.1"
    scalaVersion := "2.13.1"
    scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
    javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")
    val AkkaVersion = "2.6.5"
    val AkkaPersistenceCassandraVersion = "1.0.0"
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
      "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
      "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )

cluster.conf

    akka {
      actor {
        provider = cluster
        serialization-bindings {
          "com.learn.akka.CborSerializable" = jackson-cbor
        }
      }
      remote {
        artery {
          canonical.hostname = "127.0.0.1"
          canonical.port = 0
        }
      }
      cluster {
        seed-nodes = [
          "akka://ClusterSystem@127.0.0.1:25251",
          "akka://ClusterSystem@127.0.0.1:25252"]
      }
    }

ClusterMemberStatus.scala

    package com.learn.akka

    import akka.actor.typed._
    import akka.actor.typed.scaladsl.Behaviors
    import akka.cluster.ClusterEvent._
    import akka.cluster.typed.{Cluster, Subscribe}

    object MonitorActor {
      sealed trait ClusterEvent
      // Wrappers that bring the cluster's own event types into this actor's protocol.
      private case class MemberStatus(event: MemberEvent) extends ClusterEvent
      private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent

      def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx =>
        // Adapters convert incoming cluster events into ClusterEvent messages.
        val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus)
        val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus)
        Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter, classOf[MemberEvent])
        Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter, classOf[ReachabilityEvent])

        Behaviors.receiveMessage { event =>
          event match {
            case MemberStatus(status) =>
              status match {
                case MemberJoined(member) =>
                  ctx.log.info("**************** Member joined: [{}] ***************", member.address)
                case MemberUp(member) =>
                  ctx.log.info("**************** Member is Up: [{}] ***************", member.address)
                case MemberRemoved(member, previousStatus) =>
                  ctx.log.info("**************** Member is Removed: [{}] after {} ***************",
                    member.address, previousStatus)
                case MemberLeft(member) =>
                  ctx.log.info("**************** Member left: [{}] ***************", member.address)
                case MemberExited(member) =>
                  ctx.log.info("**************** Member exited: [{}] ***************", member.address)
                case _: MemberEvent => // ignore
              }
            case ReachStatus(status) =>
              status match {
                case UnreachableMember(member) =>
                  ctx.log.info("**************** Member detected as unreachable: [{}] ***************", member)
                case ReachableMember(member) =>
                  ctx.log.info("**************** Member back to reachable: [{}] ***************", member)
              }
          }
          Behaviors.same
        }
      }
    }

    // Guardian actor: does nothing except spawn the MonitorActor.
    object RootActor {
      def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
        ctx.spawn(MonitorActor(), "listner")
        Behaviors.empty
      }
    }

    object ClusterMemberStatus {
      import com.typesafe.config.ConfigFactory

      def main(args: Array[String]): Unit = {
        val ports =
          if (args.isEmpty)
            Seq(25251, 25252, 0)
          else
            args.toSeq.map(_.toInt)
        ports.foreach { port =>
          startup(port)
        }
      }

      def startup(port: Int): Unit = {
        // Override only the port; everything else comes from cluster.conf.
        val config = ConfigFactory.parseString(s"""
          akka.remote.artery.canonical.port=$port
          """).withFallback(ConfigFactory.load("cluster.conf"))
        ActorSystem[Nothing](RootActor(), "ClusterSystem", config)
      }
    }

 

Original link: http://www.cnblogs.com/tiger-xc/p/13062642.html
