akka-typed(6) - cluster:group router, cluster-load-balancing
Source: cnblogs  Author: 雪川大虫  Date: 2020/11/9 16:09:13

Let's start with akka-typed's router actors. Routers come in two flavours: pool routers and group routers. First, a usage example of a pool router:

val pool = Routers.pool(poolSize = 4)(
  // make sure the workers are restarted if they fail
  Behaviors.supervise(WorkerRoutee()).onFailure[Exception](SupervisorStrategy.restart))
val router = ctx.spawn(pool, "worker-pool")

(0 to 10).foreach { n =>
  router ! WorkerRoutee.DoLog(s"msg $n")
}

The pool in the example above is a pool router, i.e. a pool of 4 routees. Every routee is built from WorkerRoutee(), so the pool contains only one kind of actor. A pool router spawns all of its routees locally (in the same JVM) from the given factory behavior, which means every routee is a child actor of the router.
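The two router snippets here refer to a WorkerRoutee behavior that the article never shows. A minimal sketch of what such a routee might look like follows; the object name, the DoLog message and the logging body are assumptions for illustration only (and are unrelated to the WorkerRoutee used in the load-balancing example further down):

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical routee assumed by the pool/group router snippets above:
// it simply logs whatever text it receives.
object WorkerRoutee {
  sealed trait Command
  final case class DoLog(text: String) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { ctx =>
    ctx.log.info("Starting worker routee")
    Behaviors.receiveMessage {
      case DoLog(text) =>
        ctx.log.info("Got message {}", text)
        Behaviors.same
    }
  }
}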

Next, a group-router example:

val serviceKey = ServiceKey[WorkerRoutee.Command]("log-worker")

// this would likely happen elsewhere - if we create it locally we
// can just as well use a pool
val workerRoutee = ctx.spawn(WorkerRoutee(), "worker-route")
ctx.system.receptionist ! Receptionist.Register(serviceKey, workerRoutee)

val group = Routers.group(serviceKey)
val router = ctx.spawn(group, "worker-group")

// the group router will stash messages until it sees the first listing of registered
// services from the receptionist, so it is safe to send messages right away
(0 to 10).foreach { n =>
  router ! WorkerRoutee.DoLog(s"msg $n")
}

A group router differs from a pool router in several ways:

1. The routees are created outside the router: the router uses a ServiceKey to obtain, through the Receptionist, the list of actors registered under that key, and this list forms the routee group.

2. The Receptionist is cluster-wide: an actor on any node can register itself by sending a registration message to the Receptionist.

3. There is no size limit: any actor becomes a routee, managed by the router, as soon as it registers with the Receptionist.

It is fair to say that if the goal is to spread computation over the nodes of a cluster and run it in parallel, i.e. to get a load-balancing effect, the group router is the best choice. How many routees a particular workload needs, however, is now up to the user to decide; it is not as convenient as akka-classic, where cluster-metrics could automatically add or remove routee instances based on node load.

Receptionist: since it has come up, let's look at the Receptionist a bit more closely. As mentioned above, the Receptionist is cluster-wide: an actor on any node can register with the Receptionist, producing a list of actors that live on different nodes of the cluster. If the Receptionist hands that list to a user, the user can distribute work to those nodes, which gives a form of distributed computation. The Receptionist is used like this: send a message to the local node's Receptionist to register an ActorRef, then subscribe to the registration-change messages the Receptionist publishes to obtain the up-to-date list of ActorRefs:

val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")

ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)

ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)

Registration and listing retrieval on the Receptionist are correlated by ServiceKey. A retrieved listing therefore contains actors of a single type: their addresses may span nodes, but they all perform the same kind of computation. Seen from the other side, one task is computed in parallel by actors distributed over different nodes.

As mentioned in the previous post: when a publish-subscribe mechanism runs between two actors, both actors have to work within an agreed message protocol: we must pay attention to the message types and provide the necessary message-type conversions. Here is an example of registering with the Receptionist:

object Worker {
  val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")

  sealed trait Command
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { ctx =>
      // each worker registers themselves with the receptionist
      ctx.log.info("Registering myself with receptionist")
      ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)
      Behaviors.receiveMessage {
        case TransformText(text, replyTo) =>
          replyTo ! TextTransformed(text.toUpperCase)
          Behaviors.same
      }
    }
}

Registration with the Receptionist is straightforward: the registering actor does not need a confirmation from the Receptionist, so ctx.self is simply passed as the ActorRef being registered. Note the replyTo: ActorRef[TextTransformed] field of TransformText: it means the sender must be an actor able to handle TextTransformed messages. In practice, on the sender side, ctx.ask provides the conversion of TextTransformed into the sender's own message type.

Receptionist.Subscribe expects the Receptionist to return actor listings, so it follows the request/response pattern. The replyTo inside the message sent to the Receptionist must therefore be of a type the subscriber can handle, as below:

def apply(): Behavior[Event] = Behaviors.setup { ctx =>
  Behaviors.withTimers { timers =>
    // subscribe to available workers
    val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
      case Worker.WorkerServiceKey.Listing(workers) =>
        WorkersUpdated(workers)
    }
    ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)
    ...
  }

ctx.messageAdapter registers a conversion from the Receptionist.Listing reply type to the WorkersUpdated type: every Listing the Receptionist sends back is converted into a WorkersUpdated message, which is then handled as follows:

...
Behaviors.receiveMessage {
  case WorkersUpdated(newWorkers) =>
    ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
    ...

The TextTransformed conversion mentioned above looks like this:

ctx.ask[Worker.TransformText, Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
  case Success(transformedText) => TransformCompleted(transformedText.text, text)
  case Failure(ex) => JobFailed("Processing timed out", text)
}

ctx.ask converts the TextTransformed reply into a TransformCompleted message. The complete Behavior definition:

object Frontend {

  sealed trait Event
  private case object Tick extends Event
  private final case class WorkersUpdated(newWorkers: Set[ActorRef[Worker.TransformText]]) extends Event
  private final case class TransformCompleted(originalText: String, transformedText: String) extends Event
  private final case class JobFailed(why: String, text: String) extends Event

  def apply(): Behavior[Event] = Behaviors.setup { ctx =>
    Behaviors.withTimers { timers =>
      // subscribe to available workers
      val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
        case Worker.WorkerServiceKey.Listing(workers) =>
          WorkersUpdated(workers)
      }
      ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)
      timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)
      running(ctx, IndexedSeq.empty, jobCounter = 0)
    }
  }

  private def running(ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
    Behaviors.receiveMessage {
      case WorkersUpdated(newWorkers) =>
        ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
        running(ctx, newWorkers.toIndexedSeq, jobCounter)
      case Tick =>
        if (workers.isEmpty) {
          ctx.log.warn("Got tick request but no workers available, not sending any work")
          Behaviors.same
        } else {
          // how much time can pass before we consider a request failed
          implicit val timeout: Timeout = 5.seconds
          val selectedWorker = workers(jobCounter % workers.size)
          ctx.log.info("Sending work for processing to {}", selectedWorker)
          val text = s"hello-$jobCounter"
          ctx.ask[Worker.TransformText, Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
            case Success(transformedText) => TransformCompleted(transformedText.text, text)
            case Failure(ex) => JobFailed("Processing timed out", text)
          }
          running(ctx, workers, jobCounter + 1)
        }
      case TransformCompleted(originalText, transformedText) =>
        ctx.log.info("Got completed transform of {}: {}", originalText, transformedText)
        Behaviors.same
      case JobFailed(why, text) =>
        ctx.log.warn("Transformation of text {} failed. Because: {}", text, why)
        Behaviors.same
    }
}

Now we can demonstrate cross-node distributed computation with a group router. Because a group router manages its routees through the Receptionist, and the Receptionist is cluster-wide, spawning routees on each node and registering them with the Receptionist produces a routee ActorRef list that spans nodes. Distributing work to the routees on that list should then balance the load across the cluster's nodes. Below we build such a load balancer. The flow is simple: in an entry point (the service actor) we create a workersRouter and spawn three workerRoutee instances that register with the Receptionist; an incoming task is split into subtasks that are sent one by one to the workersRouter. Each workerRoutee sends its result to an aggregator (Aggregator); once the Aggregator has confirmed that it has received the results from all workerRoutees, it sends the aggregated result back to the service actor. First, the service actor:

object Service {
  val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")

  sealed trait Command extends CborSerializable
  case class ProcessText(text: String) extends Command {
    require(text.nonEmpty)
  }
  case class WrappedResult(res: Aggregator.Response) extends Command

  def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val aggregator = ctx.spawn(Aggregator(), "aggregator")
    val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
    Behaviors.receiveMessage {
      case ProcessText(text) =>
        ctx.log.info("******************** received ProcessText command: {} ****************", text)
        val words = text.split(' ').toIndexedSeq
        aggregator ! Aggregator.CountText(words.size, aggregatorRef)
        words.foreach { word =>
          workersRouter ! WorkerRoutee.Count(word, aggregator)
        }
        Behaviors.same
      case WrappedResult(msg) =>
        msg match {
          case Aggregator.Result(res) =>
            ctx.log.info("************** mean length of words = {} **********", res)
        }
        Behaviors.same
    }
  }

  def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
    val singletonSettings = ClusterSingletonSettings(ctx.system)
      .withRole("front")
    SingletonActor(
      Behaviors.supervise(
        serviceBehavior(workersRouter)
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ),
      "singletonActor"
    ).withSettings(singletonSettings)
  }

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val cluster = Cluster(ctx.system)
    val workersRouter = ctx.spawn(
      Routers.group(routerServiceKey)
        .withRoundRobinRouting(),
      "workersRouter"
    )
    (0 until 3).foreach { n =>
      val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
      ctx.system.receptionist ! Receptionist.Register(routerServiceKey, routee)
    }
    val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
    Behaviors.receiveMessage {
      case job @ ProcessText(text) =>
        singletonActor ! job
        Behaviors.same
    }
  }
}

The group router and the routees are constructed in apply(), which also forwards incoming tasks to singletonActor. singletonActor is an actor built around serviceBehavior. Inside serviceBehavior the received task is split up and the pieces are sent to the workersRouter. Note that serviceBehavior expects replies from the Aggregator; since the two communicate in a request/response pattern, a conversion from Aggregator.Response to WrappedResult is needed. Also, the subtasks reach the individual workerRoutees through the workersRouter, and each workerRoutee must send its result back to the Aggregator, so the message sent to the workersRouter carries the Aggregator's ActorRef, as in: workersRouter ! WorkerRoutee.Count(word, aggregator).

The Aggregator is a persistent actor (EventSourcedBehavior), shown below:

 

object Aggregator {
  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
  case class MarkLength(word: String, len: Int) extends Command
  case class TextCounted(cnt: Int) extends Event
  case class LengthMarked(word: String, len: Int) extends Event
  case class Result(meanWordLength: Double) extends Response

  case class State(expectedNum: Int = 0, lens: List[Int] = Nil)

  var replyTo: ActorRef[Response] = _

  def commandHandler: (State, Command) => Effect[Event, State] = (st, cmd) => {
    cmd match {
      case CountText(cnt, ref) =>
        replyTo = ref
        Effect.persist(TextCounted(cnt))
      case MarkLength(word, len) =>
        Effect.persist(LengthMarked(word, len))
    }
  }

  def eventHandler: (State, Event) => State = (st, ev) => {
    ev match {
      case TextCounted(cnt) =>
        st.copy(expectedNum = cnt, lens = Nil)
      case LengthMarked(word, len) =>
        val state = st.copy(lens = len :: st.lens)
        if (state.lens.size >= state.expectedNum) {
          val meanWordLength = state.lens.sum.toDouble / state.lens.size
          replyTo ! Result(meanWordLength)
          State()
        } else state
    }
  }

  val takeSnapShot: (State, Event, Long) => Boolean = (st, ev, seq) => {
    if (st.lens.isEmpty) {
      if (ev.isInstanceOf[LengthMarked])
        true
      else
        false
    } else
      false
  }

  def apply(): Behavior[Command] = Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior(
        persistenceId = PersistenceId("33", "2333"),
        emptyState = State(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************", state)
        case (state, SnapshotCompleted(meta)) =>
          ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************", state, meta.persistenceId, meta.sequenceNr)
        case (state, RecoveryFailed(err)) =>
          ctx.log.error("*************recovery failed with: {}***************", err.getMessage)
        case (state, SnapshotFailed(meta, err)) =>
          ctx.log.error("***************snapshoting failed with: {}*************", err.getMessage)
      }.snapshotWhen(takeSnapShot)
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
}

Note the takeSnapShot function: it is passed to EventSourcedBehavior.snapshotWhen(takeSnapShot). Its parameters are (State, Event, sequenceNr); we examine the current State and Event and return true whenever a snapshot should be taken.
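For illustration of the snapshotWhen signature only (this predicate is not part of the original example), the same (State, Event, Long) => Boolean shape could just as well express a sequence-number based policy, e.g. a snapshot every 100 persisted events:

// Hypothetical alternative predicate, assuming the Aggregator types above:
// ignore the current State and Event and snapshot every 100 persisted events.
val snapshotEvery100: (Aggregator.State, Aggregator.Event, Long) => Boolean =
  (_, _, seqNr) => seqNr % 100 == 0

// it would be wired in the same way: .snapshotWhen(snapshotEvery100)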

A glance at part of the output shows that the work has indeed been distributed to routees on several nodes:

20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.WorkerRoutee$ - ************** processing [this] on akka://ClusterSystem@127.0.0.1:51182 ***********
20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [text] on akka://ClusterSystem@127.0.0.1:51182 ***********
20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-36] INFO com.learn.akka.WorkerRoutee$ - ************** processing [be] on akka://ClusterSystem@127.0.0.1:51182 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.learn.akka.WorkerRoutee$ - ************** processing [will] on akka://ClusterSystem@127.0.0.1:51173 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-26] INFO com.learn.akka.WorkerRoutee$ - ************** processing [is] on akka://ClusterSystem@127.0.0.1:25251 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-13] INFO com.learn.akka.WorkerRoutee$ - ************** processing [the] on akka://ClusterSystem@127.0.0.1:51173 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [that] on akka://ClusterSystem@127.0.0.1:25251 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [analyzed] on akka://ClusterSystem@127.0.0.1:25251 ***********

The full source code of the example:

package com.learn.akka

import akka.actor.typed._
import akka.persistence.typed._
import akka.persistence.typed.scaladsl._
import scala.concurrent.duration._
import akka.actor.typed.receptionist._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl._
import akka.cluster.typed.Cluster
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.ClusterSingletonSettings
import akka.cluster.typed.SingletonActor
import com.typesafe.config.ConfigFactory

// marker trait bound to jackson-cbor in cluster-persistence.conf
// (kept in its own source file in the original project)
trait CborSerializable

object WorkerRoutee {
  sealed trait Command extends CborSerializable
  case class Count(word: String, replyTo: ActorRef[Aggregator.Command]) extends Command

  def apply(nodeAddress: String): Behavior[Command] = Behaviors.setup { ctx =>
    Behaviors.receiveMessage[Command] {
      case Count(word, replyTo) =>
        ctx.log.info("************** processing [{}] on {} ***********", word, nodeAddress)
        replyTo ! Aggregator.MarkLength(word, word.length)
        Behaviors.same
    }
  }
}

object Aggregator {
  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
  case class MarkLength(word: String, len: Int) extends Command
  case class TextCounted(cnt: Int) extends Event
  case class LengthMarked(word: String, len: Int) extends Event
  case class Result(meanWordLength: Double) extends Response

  case class State(expectedNum: Int = 0, lens: List[Int] = Nil)

  var replyTo: ActorRef[Response] = _

  def commandHandler: (State, Command) => Effect[Event, State] = (st, cmd) => {
    cmd match {
      case CountText(cnt, ref) =>
        replyTo = ref
        Effect.persist(TextCounted(cnt))
      case MarkLength(word, len) =>
        Effect.persist(LengthMarked(word, len))
    }
  }

  def eventHandler: (State, Event) => State = (st, ev) => {
    ev match {
      case TextCounted(cnt) =>
        st.copy(expectedNum = cnt, lens = Nil)
      case LengthMarked(word, len) =>
        val state = st.copy(lens = len :: st.lens)
        if (state.lens.size >= state.expectedNum) {
          val meanWordLength = state.lens.sum.toDouble / state.lens.size
          replyTo ! Result(meanWordLength)
          State()
        } else state
    }
  }

  val takeSnapShot: (State, Event, Long) => Boolean = (st, ev, seq) => {
    if (st.lens.isEmpty) {
      if (ev.isInstanceOf[LengthMarked])
        true
      else
        false
    } else
      false
  }

  def apply(): Behavior[Command] = Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior(
        persistenceId = PersistenceId("33", "2333"),
        emptyState = State(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************", state)
        case (state, SnapshotCompleted(meta)) =>
          ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************", state, meta.persistenceId, meta.sequenceNr)
        case (state, RecoveryFailed(err)) =>
          ctx.log.error("*************recovery failed with: {}***************", err.getMessage)
        case (state, SnapshotFailed(meta, err)) =>
          ctx.log.error("***************snapshoting failed with: {}*************", err.getMessage)
      }.snapshotWhen(takeSnapShot)
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
}

object Service {
  val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")

  sealed trait Command extends CborSerializable
  case class ProcessText(text: String) extends Command {
    require(text.nonEmpty)
  }
  case class WrappedResult(res: Aggregator.Response) extends Command

  def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val aggregator = ctx.spawn(Aggregator(), "aggregator")
    val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
    Behaviors.receiveMessage {
      case ProcessText(text) =>
        ctx.log.info("******************** received ProcessText command: {} ****************", text)
        val words = text.split(' ').toIndexedSeq
        aggregator ! Aggregator.CountText(words.size, aggregatorRef)
        words.foreach { word =>
          workersRouter ! WorkerRoutee.Count(word, aggregator)
        }
        Behaviors.same
      case WrappedResult(msg) =>
        msg match {
          case Aggregator.Result(res) =>
            ctx.log.info("************** mean length of words = {} **********", res)
        }
        Behaviors.same
    }
  }

  def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
    val singletonSettings = ClusterSingletonSettings(ctx.system)
      .withRole("front")
    SingletonActor(
      Behaviors.supervise(
        serviceBehavior(workersRouter)
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ),
      "singletonActor"
    ).withSettings(singletonSettings)
  }

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val cluster = Cluster(ctx.system)
    val workersRouter = ctx.spawn(
      Routers.group(routerServiceKey)
        .withRoundRobinRouting(),
      "workersRouter"
    )
    (0 until 3).foreach { n =>
      val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
      ctx.system.receptionist ! Receptionist.Register(routerServiceKey, routee)
    }
    val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
    Behaviors.receiveMessage {
      case job @ ProcessText(text) =>
        singletonActor ! job
        Behaviors.same
    }
  }
}

object LoadBalance {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startup("compute", 25251)
      startup("compute", 25252)
      startup("compute", 25253)
      startup("front", 25254)
    } else {
      require(args.size == 2, "Usage: role port")
      startup(args(0), args(1).toInt)
    }
  }

  def startup(role: String, port: Int): Unit = {
    // Override the configuration of the port when specified as program argument
    val config = ConfigFactory
      .parseString(s"""
        akka.remote.artery.canonical.port=$port
        akka.cluster.roles = [$role]
        """)
      .withFallback(ConfigFactory.load("cluster-persistence"))

    val frontEnd = ActorSystem[Service.Command](Service(), "ClusterSystem", config)
    if (role == "front") {
      println("*************** sending ProcessText command ************")
      frontEnd ! Service.ProcessText("this is the text that will be analyzed")
    }
  }
}

cluster-persistence.conf

akka.actor.allow-java-serialization = on

akka {
  loglevel = INFO
  actor {
    provider = cluster
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:25251",
      "akka://ClusterSystem@127.0.0.1:25252"]
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }
}

akka.persistence.cassandra {
  # don't use autocreate in production
  journal.keyspace = "poc"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}

datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

build.sbt

name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"

scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)

 

Original article: http://www.cnblogs.com/tiger-xc/p/13091012.html
