经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
akka-typed(7) - cluster:sharding, 集群分片
来源:cnblogs  作者:雪川大虫  时间:2020/11/9 16:09:11  对本文有异议

  在使用akka-typed的过程中发现有很多地方都简化了不少,变得更方便了,包括:Supervision,只要用Behaviors.supervise()把Behavior包住,很容易就可以实现这个actor的SupervisorStrategy.restartWithBackoff策略了。然后集群化的group router使用起来也很方便,再就是集群分片cluster-sharding了。下面我们就通过一个例子来介绍cluster-sharding的具体使用方法。

首先,分片的意思是指在集群中多个节点上部署某种actor,即entity,的构建机制。entity的构建是动态的,ClusterSharding系统根据各节点的负载情况决定到底在哪个节点构建entity,然后返回ShardRegion:一个该类entity具体的构建工具及消息中介。也就是说我们可以把同样的一种运算通过entityId指定给任何一个entity,但具体这个entity生存在集群哪个节点上人工是无法确定的,完全靠ClusterSharding引导。先设计一个简单功能的actor,测试它作为一个entity的工作细节:

 

  1. object Counter {
  2. sealed trait Command extends CborSerializable
  3. case object Increment extends Command
  4. final case class GetValue(replyTo: ActorRef[Response]) extends Command
  5. case object StopCounter extends Command
  6. private case object Idle extends Command
  7. sealed trait Response extends CborSerializable
  8. case class SubTtl(entityId: String, ttl: Int) extends Response
  9. val TypeKey = EntityTypeKey[Command]("Counter")
  10. def apply(nodeAddress: String, entityContext: EntityContext[Command]): Behavior[Command] = {
  11. Behaviors.setup { ctx =>
  12. def updated(value: Int): Behavior[Command] = {
  13. Behaviors.receiveMessage[Command] {
  14. case Increment =>
  15. ctx.log.info("******************{} counting at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
  16. updated(value + 1)
  17. case GetValue(replyTo) =>
  18. ctx.log.info("******************{} get value at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
  19. replyTo ! SubTtl(entityContext.entityId,value)
  20. Behaviors.same
  21. case Idle =>
  22. entityContext.shard ! ClusterSharding.Passivate(ctx.self)
  23. Behaviors.same
  24. case StopCounter =>
  25. Behaviors.stopped(() => ctx.log.info("************{} stopping ... passivated for idling.", entityContext.entityId))
  26. }
  27. }
  28. ctx.setReceiveTimeout(30.seconds, Idle)
  29. updated(0)
  30. }
  31. }
  32. }

 

cluster-sharding的机制是这样的:在每个(或指定的)节点上构建部署一个某种EntityType的ShardRegion。这样系统可以在任何部署了ShardRegion的节点上构建这种entity。然后ClusterSharding系统会根据entityId来引导消息至正确的接收对象。我们再看看ShardRegion的部署是如何实现的吧:

 

  1. object EntityManager {
  2. sealed trait Command
  3. case class AddOne(counterId: String) extends Command
  4. case class GetSum(counterId: String ) extends Command
  5. case class WrappedTotal(res: Counter.Response) extends Command
  6. def apply(): Behavior[Command] = Behaviors.setup { ctx =>
  7. val cluster = Cluster(ctx.system)
  8. val sharding = ClusterSharding(ctx.system)
  9. val entityType = Entity(Counter.TypeKey) { entityContext =>
  10. Counter(cluster.selfMember.address.toString,entityContext)
  11. }.withStopMessage(Counter.StopCounter)
  12. sharding.init(entityType)
  13. val counterRef: ActorRef[Counter.Response] = ctx.messageAdapter(ref => WrappedTotal(ref))
  14. Behaviors.receiveMessage[Command] {
  15. case AddOne(cid) =>
  16. val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
  17. entityRef ! Counter.Increment
  18. Behaviors.same
  19. case GetSum(cid) =>
  20. val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
  21. entityRef ! Counter.GetValue(counterRef)
  22. Behaviors.same
  23. case WrappedTotal(ttl) => ttl match {
  24. case Counter.SubTtl(eid,subttl) =>
  25. ctx.log.info("***********************{} total: {} ",eid,subttl)
  26. }
  27. Behaviors.same
  28. }
  29. }
  30. }

太简单了, sharding.ini(entityType)一个函数完成了一个节点分片部署。系统通过sharding.init(entityType)来实现ShardRegion构建。这个entityType代表某种特殊actor模版,看看它的构建函数:

  1. object Entity {
  2. /**
  3. * Defines how the entity should be created. Used in [[ClusterSharding#init]]. More optional
  4. * settings can be defined using the `with` methods of the returned [[Entity]].
  5. *
  6. * @param typeKey A key that uniquely identifies the type of entity in this cluster
  7. * @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
  8. * @tparam M The type of message the entity accepts
  9. */
  10. def apply[M](typeKey: EntityTypeKey[M])(
  11. createBehavior: EntityContext[M] => Behavior[M]): Entity[M, ShardingEnvelope[M]] =
  12. new Entity(createBehavior, typeKey, None, Props.empty, None, None, None, None, None)
  13. }

这个函数需要一个EntityTyeKey和一个构建Behavior的函数createBehavior,产生一个Entity类型。Entity类型定义如下:

  1. final class Entity[M, E] private[akka] (
  2. val createBehavior: EntityContext[M] => Behavior[M],
  3. val typeKey: EntityTypeKey[M],
  4. val stopMessage: Option[M],
  5. val entityProps: Props,
  6. val settings: Option[ClusterShardingSettings],
  7. val messageExtractor: Option[ShardingMessageExtractor[E, M]],
  8. val allocationStrategy: Option[ShardAllocationStrategy],
  9. val role: Option[String],
  10. val dataCenter: Option[DataCenter]) {
  11. /**
  12. * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
  13. */
  14. def withEntityProps(newEntityProps: Props): Entity[M, E] =
  15. copy(entityProps = newEntityProps)
  16. /**
  17. * Additional settings, typically loaded from configuration.
  18. */
  19. def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
  20. copy(settings = Option(newSettings))
  21. /**
  22. * Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
  23. * If this is not defined it will be stopped automatically.
  24. * It can be useful to define a custom stop message if the entity needs to perform
  25. * some asynchronous cleanup or interactions before stopping.
  26. */
  27. def withStopMessage(newStopMessage: M): Entity[M, E] =
  28. copy(stopMessage = Option(newStopMessage))
  29. /**
  30. *
  31. * If a `messageExtractor` is not specified the messages are sent to the entities by wrapping
  32. * them in [[ShardingEnvelope]] with the entityId of the recipient actor. That envelope
  33. * is used by the [[HashCodeMessageExtractor]] for extracting entityId and shardId. The number of
  34. * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
  35. * is configured with `akka.cluster.sharding.number-of-shards`.
  36. */
  37. def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] =
  38. new Entity(
  39. createBehavior,
  40. typeKey,
  41. stopMessage,
  42. entityProps,
  43. settings,
  44. Option(newExtractor),
  45. allocationStrategy,
  46. role,
  47. dataCenter)
  48. /**
  49. * Allocation strategy which decides on which nodes to allocate new shards,
  50. * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
  51. */
  52. def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] =
  53. copy(allocationStrategy = Option(newAllocationStrategy))
  54. /**
  55. * Run the Entity actors on nodes with the given role.
  56. */
  57. def withRole(newRole: String): Entity[M, E] = copy(role = Some(newRole))
  58. /**
  59. * The data center of the cluster nodes where the cluster sharding is running.
  60. * If the dataCenter is not specified then the same data center as current node. If the given
  61. * dataCenter does not match the data center of the current node the `ShardRegion` will be started
  62. * in proxy mode.
  63. */
  64. def withDataCenter(newDataCenter: DataCenter): Entity[M, E] = copy(dataCenter = Some(newDataCenter))
  65. private def copy(
  66. createBehavior: EntityContext[M] => Behavior[M] = createBehavior,
  67. typeKey: EntityTypeKey[M] = typeKey,
  68. stopMessage: Option[M] = stopMessage,
  69. entityProps: Props = entityProps,
  70. settings: Option[ClusterShardingSettings] = settings,
  71. allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy,
  72. role: Option[String] = role,
  73. dataCenter: Option[DataCenter] = dataCenter): Entity[M, E] = {
  74. new Entity(
  75. createBehavior,
  76. typeKey,
  77. stopMessage,
  78. entityProps,
  79. settings,
  80. messageExtractor,
  81. allocationStrategy,
  82. role,
  83. dataCenter)
  84. }
  85. }

这里面有许多方法用来控制Entity的构建和作业。

然后我们把这个EntityManager当作RootBehavior部署到多个节点上去:

  1. object ClusterShardingApp {
  2. def main(args: Array[String]): Unit = {
  3. if (args.isEmpty) {
  4. startup("shard", 25251)
  5. startup("shard", 25252)
  6. startup("shard", 25253)
  7. startup("front", 25254)
  8. } else {
  9. require(args.size == 2, "Usage: role port")
  10. startup(args(0), args(1).toInt)
  11. }
  12. }
  13. def startup(role: String, port: Int): Unit = {
  14. // Override the configuration of the port when specified as program argument
  15. val config = ConfigFactory
  16. .parseString(s"""
  17. akka.remote.artery.canonical.port=$port
  18. akka.cluster.roles = [$role]
  19. """)
  20. .withFallback(ConfigFactory.load("cluster"))
  21. val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)
  22. ...
  23. }

一共设定了3个role=shard节点和1个front节点。

在front节点上对entityId分别为9013,9014,9015,9016几个entity发送消息:

  1. def startup(role: String, port: Int): Unit = {
  2. // Override the configuration of the port when specified as program argument
  3. val config = ConfigFactory
  4. .parseString(s"""
  5. akka.remote.artery.canonical.port=$port
  6. akka.cluster.roles = [$role]
  7. """)
  8. .withFallback(ConfigFactory.load("cluster"))
  9. val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)
  10. if (role == "front") {
  11. entityManager ! EntityManager.AddOne("9013")
  12. entityManager ! EntityManager.AddOne("9014")
  13. entityManager ! EntityManager.AddOne("9013")
  14. entityManager ! EntityManager.AddOne("9015")
  15. entityManager ! EntityManager.AddOne("9013")
  16. entityManager ! EntityManager.AddOne("9014")
  17. entityManager ! EntityManager.AddOne("9014")
  18. entityManager ! EntityManager.AddOne("9013")
  19. entityManager ! EntityManager.AddOne("9015")
  20. entityManager ! EntityManager.AddOne("9015")
  21. entityManager ! EntityManager.AddOne("9016")
  22. entityManager ! EntityManager.GetSum("9014")
  23. entityManager ! EntityManager.GetSum("9015")
  24. entityManager ! EntityManager.GetSum("9013")
  25. entityManager ! EntityManager.GetSum("9016")
  26. }

以下是部分运算结果显示:

  1. 15:12:10.073 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://ClusterSystem@127.0.0.1:25253,9014
  2. 15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://ClusterSystem@127.0.0.1:25253,9014
  3. 15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://ClusterSystem@127.0.0.1:25253,9014
  4. 15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
  5. 15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
  6. 15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
  7. 15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
  8. 15:12:10.109 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://ClusterSystem@127.0.0.1:25254,9015
  9. 15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://ClusterSystem@127.0.0.1:25254,9015
  10. 15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://ClusterSystem@127.0.0.1:25254,9015
  11. 15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 get value at akka://ClusterSystem@127.0.0.1:25254,9015
  12. 15:12:10.112 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9015 total: 3
  13. 15:12:10.149 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 get value at akka://ClusterSystem@127.0.0.1:25253,9014
  14. 15:12:10.149 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 get value at akka://ClusterSystem@127.0.0.1:25251,9013
  15. 15:12:10.169 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9014 total: 3
  16. 15:12:10.169 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9013 total: 4
  17. 15:12:10.171 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/788/9016 counting at akka://ClusterSystem@127.0.0.1:25251,9016
  18. 15:12:10.171 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/788/9016 get value at akka://ClusterSystem@127.0.0.1:25251,9016
  19. 15:12:10.172 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9016 total: 1
  20.  
  21. 15:19:32.176 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9013 stopping ... passivated for idling.
  22. 15:19:52.529 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9014 stopping ... passivated for idling.
  23. 15:19:52.658 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9016 stopping ... passivated for idling.
  24. 15:19:52.662 [ClusterSystem-akka.actor.default-dispatcher-14] INFO com.learn.akka.Counter$ - ************9015 stopping ... passivated for idling.

下面是本次示范的完整源代码:

ClusterSharding.scala

  1. package com.learn.akka
  2. import scala.concurrent.duration._
  3. import akka.actor.typed._
  4. import akka.actor.typed.scaladsl._
  5. import akka.cluster.sharding.typed.scaladsl.EntityContext
  6. import akka.cluster.sharding.typed.scaladsl.Entity
  7. import akka.persistence.typed.PersistenceId
  8. //#sharding-extension
  9. import akka.cluster.sharding.typed.ShardingEnvelope
  10. import akka.cluster.sharding.typed.scaladsl.ClusterSharding
  11. import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
  12. import akka.cluster.sharding.typed.scaladsl.EntityRef
  13. import com.typesafe.config.ConfigFactory
  14. import akka.cluster.typed.Cluster
  15. //#counter
  16. object Counter {
  17. sealed trait Command extends CborSerializable
  18. case object Increment extends Command
  19. final case class GetValue(replyTo: ActorRef[Response]) extends Command
  20. case object StopCounter extends Command
  21. private case object Idle extends Command
  22. sealed trait Response extends CborSerializable
  23. case class SubTtl(entityId: String, ttl: Int) extends Response
  24. val TypeKey = EntityTypeKey[Command]("Counter")
  25. def apply(nodeAddress: String, entityContext: EntityContext[Command]): Behavior[Command] = {
  26. Behaviors.setup { ctx =>
  27. def updated(value: Int): Behavior[Command] = {
  28. Behaviors.receiveMessage[Command] {
  29. case Increment =>
  30. ctx.log.info("******************{} counting at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
  31. updated(value + 1)
  32. case GetValue(replyTo) =>
  33. ctx.log.info("******************{} get value at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
  34. replyTo ! SubTtl(entityContext.entityId,value)
  35. Behaviors.same
  36. case Idle =>
  37. entityContext.shard ! ClusterSharding.Passivate(ctx.self)
  38. Behaviors.same
  39. case StopCounter =>
  40. Behaviors.stopped(() => ctx.log.info("************{} stopping ... passivated for idling.", entityContext.entityId))
  41. }
  42. }
  43. ctx.setReceiveTimeout(30.seconds, Idle)
  44. updated(0)
  45. }
  46. }
  47. }
  48. object EntityManager {
  49. sealed trait Command
  50. case class AddOne(counterId: String) extends Command
  51. case class GetSum(counterId: String ) extends Command
  52. case class WrappedTotal(res: Counter.Response) extends Command
  53. def apply(): Behavior[Command] = Behaviors.setup { ctx =>
  54. val cluster = Cluster(ctx.system)
  55. val sharding = ClusterSharding(ctx.system)
  56. val entityType = Entity(Counter.TypeKey) { entityContext =>
  57. Counter(cluster.selfMember.address.toString,entityContext)
  58. }.withStopMessage(Counter.StopCounter)
  59. sharding.init(entityType)
  60. val counterRef: ActorRef[Counter.Response] = ctx.messageAdapter(ref => WrappedTotal(ref))
  61. Behaviors.receiveMessage[Command] {
  62. case AddOne(cid) =>
  63. val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
  64. entityRef ! Counter.Increment
  65. Behaviors.same
  66. case GetSum(cid) =>
  67. val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
  68. entityRef ! Counter.GetValue(counterRef)
  69. Behaviors.same
  70. case WrappedTotal(ttl) => ttl match {
  71. case Counter.SubTtl(eid,subttl) =>
  72. ctx.log.info("***********************{} total: {} ",eid,subttl)
  73. }
  74. Behaviors.same
  75. }
  76. }
  77. }
  78. object ClusterShardingApp {
  79. def main(args: Array[String]): Unit = {
  80. if (args.isEmpty) {
  81. startup("shard", 25251)
  82. startup("shard", 25252)
  83. startup("shard", 25253)
  84. startup("front", 25254)
  85. } else {
  86. require(args.size == 2, "Usage: role port")
  87. startup(args(0), args(1).toInt)
  88. }
  89. }
  90. def startup(role: String, port: Int): Unit = {
  91. // Override the configuration of the port when specified as program argument
  92. val config = ConfigFactory
  93. .parseString(s"""
  94. akka.remote.artery.canonical.port=$port
  95. akka.cluster.roles = [$role]
  96. """)
  97. .withFallback(ConfigFactory.load("cluster"))
  98. val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)
  99. if (role == "front") {
  100. entityManager ! EntityManager.AddOne("9013")
  101. entityManager ! EntityManager.AddOne("9014")
  102. entityManager ! EntityManager.AddOne("9013")
  103. entityManager ! EntityManager.AddOne("9015")
  104. entityManager ! EntityManager.AddOne("9013")
  105. entityManager ! EntityManager.AddOne("9014")
  106. entityManager ! EntityManager.AddOne("9014")
  107. entityManager ! EntityManager.AddOne("9013")
  108. entityManager ! EntityManager.AddOne("9015")
  109. entityManager ! EntityManager.AddOne("9015")
  110. entityManager ! EntityManager.AddOne("9016")
  111. entityManager ! EntityManager.GetSum("9014")
  112. entityManager ! EntityManager.GetSum("9015")
  113. entityManager ! EntityManager.GetSum("9013")
  114. entityManager ! EntityManager.GetSum("9016")
  115. }
  116. }
  117. }

cluster.conf

  1. akka {
  2. actor {
  3. provider = cluster
  4. serialization-bindings {
  5. "com.learn.akka.CborSerializable" = jackson-cbor
  6. }
  7. }
  8. remote {
  9. artery {
  10. canonical.hostname = "127.0.0.1"
  11. canonical.port = 0
  12. }
  13. }
  14. cluster {
  15. seed-nodes = [
  16. "akka://ClusterSystem@127.0.0.1:25251",
  17. "akka://ClusterSystem@127.0.0.1:25252"]
  18. }
  19. }

 

原文链接:http://www.cnblogs.com/tiger-xc/p/13100185.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号