akka-typed(4) - EventSourcedBehavior in action
Source: cnblogs  Author: 雪川大虫  Date: 2020/11/9 16:09:15

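  As mentioned earlier, one of the more important changes in akka-typed is the addition of EventSourcedBehavior, a kind of actor dedicated to the event-sourcing pattern. Together with the other kinds of actors, it makes a complete CQRS implementation possible. I still call this new actor a persistentActor: it is an actor that can maintain and preserve its running state. That is, the actor's internal state can be stored in a database and is transformed through a set of functions, which is what persistence means here. Of course, an actor with EventSourcedBehavior must still have everything an actor normally has: properties, methods, a message-handling protocol, supervision and so on. In this post we use an example and source code to show how EventSourcedBehavior maintains its internal state and how, as an actor, it should be used.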

We reuse the shopping-cart example from the previous post and add a response mechanism whose main job is to report the cart's state:

  object ItemInfo {
    case class Item(name: String, price: Double)
  }

  object MyCart {
    import ItemInfo._

    sealed trait Command
    sealed trait Event extends CborSerializable
    sealed trait Response

    // commands
    case class AddItem(item: Item) extends Command
    case object PayCart extends Command
    case class CountItems(replyTo: ActorRef[Response]) extends Command

    // events
    case class ItemAdded(item: Item) extends Event
    case object CartPaid extends Event

    // state
    case class CartLoad(load: List[Item] = Nil)

    // responses
    case class PickedItems(items: List[Item]) extends Response
    case object CartEmpty extends Response

    // Decides which events a command produces; it never changes the state itself.
    val commandHandler: (CartLoad, Command) => Effect[Event, CartLoad] = { (state, cmd) =>
      cmd match {
        case AddItem(item) =>
          Effect.persist(ItemAdded(item))
        case PayCart =>
          Effect.persist(CartPaid)
        case CountItems(replyTo) =>
          // Nothing to persist: just report the current cart content.
          Effect.none.thenRun { cart =>
            cart.load match {
              case Nil =>
                replyTo ! CartEmpty
              case listOfItems =>
                replyTo ! PickedItems(listOfItems)
            }
          }
      }
    }

    // Applies a persisted event to the current state and returns the new state.
    val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
      evt match {
        case ItemAdded(item) =>
          state.copy(load = item :: state.load)
        case CartPaid =>
          state.copy(load = Nil)
      }
    }

    def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, CartLoad](
      persistenceId = PersistenceId("10", "1013"),
      emptyState = CartLoad(),
      commandHandler = commandHandler,
      eventHandler = eventHandler
    )
  }

  object Shopper {
    import ItemInfo._

    sealed trait Command extends CborSerializable
    case class GetItem(item: Item) extends Command
    case object Settle extends Command
    case object GetCount extends Command
    case class WrappedResponse(res: MyCart.Response) extends Command

    def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
      val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
      // Adapts MyCart.Response messages into this actor's own protocol.
      val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
      Behaviors.receiveMessage { msg =>
        msg match {
          case GetItem(item) =>
            shoppingCart ! MyCart.AddItem(item)
          case Settle =>
            shoppingCart ! MyCart.PayCart
          case GetCount =>
            shoppingCart ! MyCart.CountItems(cartRef)
          case WrappedResponse(res) => res match {
            case MyCart.PickedItems(items) =>
              ctx.log.info("**************Current Items in Cart: {}*************", items)
            case MyCart.CartEmpty =>
              ctx.log.info("**************shopping cart is empty!***************")
          }
        }
        Behaviors.same
      }
    }
  }

  object ShoppingCart extends App {
    import ItemInfo._

    val shopper = ActorSystem(Shopper(), "shopper")
    shopper ! Shopper.GetItem(Item("banana", 11.20))
    shopper ! Shopper.GetItem(Item("watermelon", 4.70))
    shopper ! Shopper.GetCount
    shopper ! Shopper.Settle
    shopper ! Shopper.GetCount
    scala.io.StdIn.readLine()
    shopper.terminate()
  }

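EventSourcedBehavior also has a built-in reply mechanism. When the behavior is created with EventSourcedBehavior.withEnforcedReplies and the command handler returns ReplyEffect, every command must end with a reply to the sender (or an explicit Effect.noReply), otherwise the program does not compile. For example: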

  private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = {
    if (acc.canWithdraw(cmd.amount))
      Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed)
    else
      Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
  }
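As a rough sketch of how enforced replies could look for the cart, the variant below gives every command a replyTo and builds the behavior with withEnforcedReplies. The object name RepliedCart, the extra replyTo on AddItem and the id parameter are assumptions made for this illustration and are not part of the original example; serialization markers are left out for brevity, so it needs a serializer binding before it can run against a real journal.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplyEffect}

// Hypothetical variant of the cart protocol in which every command carries a replyTo.
object RepliedCart {
  final case class Item(name: String, price: Double)

  sealed trait Response
  final case class PickedItems(items: List[Item]) extends Response
  case object CartEmpty extends Response

  sealed trait Command
  final case class AddItem(item: Item, replyTo: ActorRef[Response]) extends Command
  final case class CountItems(replyTo: ActorRef[Response]) extends Command

  sealed trait Event
  final case class ItemAdded(item: Item) extends Event

  final case class CartLoad(load: List[Item] = Nil)

  // The handler returns ReplyEffect rather than Effect, so a branch that
  // forgets to reply is rejected by the compiler.
  val commandHandler: (CartLoad, Command) => ReplyEffect[Event, CartLoad] = { (state, cmd) =>
    cmd match {
      case AddItem(item, replyTo) =>
        // Build the reply from the state handed to the callback.
        Effect.persist(ItemAdded(item)).thenReply(replyTo)(cart => PickedItems(cart.load))
      case CountItems(replyTo) =>
        if (state.load.isEmpty) Effect.reply(replyTo)(CartEmpty)
        else Effect.reply(replyTo)(PickedItems(state.load))
    }
  }

  val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
    evt match {
      case ItemAdded(item) => state.copy(load = item :: state.load)
    }
  }

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior.withEnforcedReplies[Command, Event, CartLoad](
      persistenceId = PersistenceId.ofUniqueId(id),
      emptyState = CartLoad(),
      commandHandler = commandHandler,
      eventHandler = eventHandler
    )
}
```

Changing either branch to a plain Effect.persist(...) without thenReply should make the handler fail to type-check, which is the point of the enforced-reply variant.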

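This reply is a side effect chained onto the Effect. It is executed only after the event has been persisted successfully and the eventHandler has been applied, so the function passed to thenReply receives the updated state and the reply can reflect it.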

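Speaking of side effects, consider Effect.persist().thenRun(produceSideEffect): once the event has been persisted successfully, other operations can safely follow. For example, after an event that affects the stock count has been persisted, the stock can be deducted from the books right away. Keep in mind that such side effects run at most once and are not repeated when events are replayed during recovery.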

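In the ShoppingCart example above, MyCart contains no state-transition code such as Behaviors.same. That is because EventSourcedBehavior is a higher-level Behavior in which the state transition is built into the eventHandler. Recall the shape of that function, (State, Event) => State: the State it returns is the actor's new state.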
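To make the (State, Event) => State shape concrete, here is a minimal sketch in plain Scala, without Akka, that rebuilds a cart state by folding an event handler over a list of events. The object name ReplaySketch and the journal contents are made up for illustration; the real EventSourcedBehavior reads events from the configured journal plugin instead.

```scala
object ReplaySketch {
  final case class Item(name: String, price: Double)

  sealed trait Event
  final case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  final case class CartLoad(load: List[Item] = Nil)

  // Same shape as the article's eventHandler: (State, Event) => State.
  val eventHandler: (CartLoad, Event) => CartLoad = (state, evt) =>
    evt match {
      case ItemAdded(item) => state.copy(load = item :: state.load)
      case CartPaid        => state.copy(load = Nil)
    }

  // Hypothetical journal contents, in the order the events were persisted.
  val journal: List[Event] = List(
    ItemAdded(Item("banana", 11.20)),
    ItemAdded(Item("watermelon", 4.70)),
    CartPaid,
    ItemAdded(Item("apple", 3.00))
  )

  // Rebuilding the state amounts to folding the handler over the journal,
  // starting from the empty state.
  val recovered: CartLoad = journal.foldLeft(CartLoad())(eventHandler)

  def main(args: Array[String]): Unit =
    println(recovered)
}
```

Because the handler is a pure function, folding the same events always yields the same state, which is what makes replay-based recovery possible.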

Events are persisted to the journal. If the journal fails during a persist operation, EventSourcedBehavior has its own supervision strategy for that case:

  def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, CartLoad](
    persistenceId = PersistenceId("10", "1013"),
    emptyState = CartLoad(),
    commandHandler = commandHandler,
    eventHandler = eventHandler
  ).onPersistFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds))
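To get a feel for what minBackoff, maxBackoff and randomFactor mean, the sketch below computes restart delays with the usual exponential-backoff-with-jitter formula. This is an illustration written for this post and not Akka's source: the object BackoffSketch, the delay function and its jitter parameter (a stand-in for a random number in [0, 1)) are assumptions, so check the Akka implementation for the exact behavior.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Delay before restart number `restartCount` (0-based).
  // `jitter` stands in for a random value in [0, 1) so the result is reproducible.
  def delay(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      jitter: Double): FiniteDuration = {
    // Double the delay on every restart, but never exceed maxBackoff.
    val exponential = minBackoff.toMillis * math.pow(2, restartCount)
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    // Stretch the delay by up to randomFactor to avoid synchronized restarts.
    math.round(capped * (1.0 + jitter * randomFactor)).millis
  }

  def main(args: Array[String]): Unit =
    (0 to 3).foreach { n =>
      println(s"restart $n -> ${delay(n, 10.seconds, 60.seconds, 0.1, jitter = 0.0)}")
    }
}
```

With the values used in the article and no jitter, the delays grow from 10 seconds to 20 and 40 seconds and are then capped at 60 seconds.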

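Note that this strategy applies only to onPersistFailure(); wrapping the behavior from outside with Behaviors.supervise() does not handle persist failures. The actor as a whole still needs a backoff strategy, though, because the commandHandler and eventHandler inside the EventSourcedBehavior may also involve database operations, and a failed operation calls for some kind of backoff restart. We can add supervision to the actor as follows: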

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          // applies to journal failures while persisting events
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds))
      }
    ).onFailure(
      // applies to any other exception thrown by the actor
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

With this, MyCart can be considered a safe and resilient actor.

Since this is a persistentActor, managing persistence is one of its core functions. EventSourcedBehavior lets us monitor the persistence process by receiving signals, for example:

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************", state)
          case (state, SnapshotCompleted(meta)) =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************", state, meta.persistenceId, meta.sequenceNr)
          case (_, RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}", err.getMessage)
          case (_, SnapshotFailed(_, err)) =>
            ctx.log.error("snapshotting failed with: {}", err.getMessage)
        }
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

EventSourcedBehavior.receiveSignal takes a partial function, so only the signals of interest need to be matched:

  def receiveSignal(signalHandler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State]
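The following plain-Scala sketch shows what a partial-function handler means in practice: the handler is defined only for the cases it matches, and a caller can fall back for everything else. The Signal types here are simplified stand-ins declared locally so the snippet runs without Akka, and dispatch is a hypothetical helper; how Akka itself treats unmatched signals is not shown here.

```scala
object SignalSketch {
  // Hypothetical stand-ins for Akka's signal types, declared locally for this sketch.
  sealed trait Signal
  case object RecoveryCompleted extends Signal
  final case class RecoveryFailed(failure: Throwable) extends Signal
  final case class DeleteEventsCompleted(toSequenceNr: Long) extends Signal

  final case class CartLoad(load: List[String] = Nil)

  // Only two signals are matched; the handler is undefined for everything else.
  val signalHandler: PartialFunction[(CartLoad, Signal), String] = {
    case (state, RecoveryCompleted) => s"recovered with $state"
    case (_, RecoveryFailed(err))   => s"recovery failed: ${err.getMessage}"
  }

  // A caller can apply the handler and fall back when the signal is not covered.
  def dispatch(state: CartLoad, signal: Signal): String =
    signalHandler.applyOrElse((state, signal), (_: (CartLoad, Signal)) => "unhandled")

  def main(args: Array[String]): Unit = {
    println(dispatch(CartLoad(List("banana")), RecoveryCompleted))
    println(dispatch(CartLoad(), DeleteEventsCompleted(10L)))
  }
}
```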

Below is the list of EventSourcedBehavior signals:

  sealed trait EventSourcedSignal extends Signal

  @DoNotInherit sealed abstract class RecoveryCompleted extends EventSourcedSignal
  case object RecoveryCompleted extends RecoveryCompleted {
    def instance: RecoveryCompleted = this
  }

  final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal {
    def getFailure(): Throwable = failure
  }

  final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal {
    def getSnapshotMetadata(): SnapshotMetadata = metadata
  }

  final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) extends EventSourcedSignal {
    def getFailure(): Throwable = failure
    def getSnapshotMetadata(): SnapshotMetadata = metadata
  }

  object SnapshotMetadata {
    /**
     * @param persistenceId id of persistent actor from which the snapshot was taken.
     * @param sequenceNr sequence number at which the snapshot was taken.
     * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
     *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
     */
    def apply(persistenceId: String, sequenceNr: Long, timestamp: Long): SnapshotMetadata =
      new SnapshotMetadata(persistenceId, sequenceNr, timestamp)
  }

  /**
   * Snapshot metadata.
   *
   * @param persistenceId id of persistent actor from which the snapshot was taken.
   * @param sequenceNr sequence number at which the snapshot was taken.
   * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
   *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
   */
  final class SnapshotMetadata(val persistenceId: String, val sequenceNr: Long, val timestamp: Long) {
    override def toString: String =
      s"SnapshotMetadata($persistenceId,$sequenceNr,$timestamp)"
  }

  final case class DeleteSnapshotsCompleted(target: DeletionTarget) extends EventSourcedSignal {
    def getTarget(): DeletionTarget = target
  }

  final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
    def getFailure(): Throwable = failure
    def getTarget(): DeletionTarget = target
  }

  final case class DeleteEventsCompleted(toSequenceNr: Long) extends EventSourcedSignal {
    def getToSequenceNr(): Long = toSequenceNr
  }

  final case class DeleteEventsFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
    def getFailure(): Throwable = failure
    def getToSequenceNr(): Long = toSequenceNr
  }

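One reason EventSourcedBehavior can recover by itself is that it replays persisted events. Replaying the entire event history on every start quickly becomes impractical, so snapshots are used to condense the history: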

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************", state)
          case (state, SnapshotCompleted(meta)) =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************", state, meta.persistenceId, meta.sequenceNr)
          case (_, RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}", err.getMessage)
          case (_, SnapshotFailed(_, err)) =>
            ctx.log.error("snapshotting failed with: {}", err.getMessage)
        }.snapshotWhen {
          // take a snapshot every time the cart is paid
          case (state, CartPaid, seqnum) =>
            ctx.log.info("*****************snapshot taken at: {} with state: {}", seqnum, state)
            true
          case _ => false
        // additionally snapshot every 100 events and keep the 2 most recent snapshots
        }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )
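Why a snapshot shortens recovery can be sketched in plain Scala: starting from a saved state and replaying only the events after its sequence number gives the same result as replaying everything. The Snapshot case class, the journal contents and the replay helper below are hypothetical and only model the idea; the real snapshot store and journal are provided by the persistence plugins.

```scala
object SnapshotSketch {
  sealed trait Event
  final case class ItemAdded(name: String) extends Event
  case object CartPaid extends Event

  final case class CartLoad(load: List[String] = Nil)

  // Hypothetical snapshot record: a saved state plus the sequence number it covers.
  final case class Snapshot(state: CartLoad, sequenceNr: Long)

  val eventHandler: (CartLoad, Event) => CartLoad = (state, evt) =>
    evt match {
      case ItemAdded(name) => state.copy(load = name :: state.load)
      case CartPaid        => state.copy(load = Nil)
    }

  // Hypothetical journal: events paired with 1-based sequence numbers.
  val journal: Vector[(Long, Event)] =
    Vector[Event](ItemAdded("banana"), ItemAdded("watermelon"), CartPaid, ItemAdded("apple"))
      .zipWithIndex
      .map { case (evt, i) => (i + 1L, evt) }

  // Replay only the events whose sequence number is greater than `afterSeqNr`.
  def replay(from: CartLoad, afterSeqNr: Long): CartLoad =
    journal.collect { case (nr, evt) if nr > afterSeqNr => evt }.foldLeft(from)(eventHandler)

  // Snapshot taken when CartPaid (sequence number 3) was persisted.
  val snapshot: Snapshot = Snapshot(CartLoad(Nil), sequenceNr = 3L)

  val fullReplay: CartLoad = replay(CartLoad(), afterSeqNr = 0L)
  val fromSnapshot: CartLoad = replay(snapshot.state, snapshot.sequenceNr)

  def main(args: Array[String]): Unit =
    println(s"full replay: $fullReplay, from snapshot: $fromSnapshot")
}
```

In this toy journal the snapshot saves three of four handler calls; with a long history the saving grows accordingly.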

Below is the source code for this demonstration:

build.sbt

  name := "learn-akka-typed"

  version := "0.1"

  scalaVersion := "2.13.1"

  Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
  Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

  val AkkaVersion = "2.6.5"
  val AkkaPersistenceCassandraVersion = "1.0.0"

  libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
    "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
    "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
    "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
    "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
    "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
    "ch.qos.logback" % "logback-classic" % "1.2.3"
  )

application.conf

  akka.actor.allow-java-serialization = on

  akka {
    loglevel = DEBUG
    actor {
      serialization-bindings {
        "com.learn.akka.CborSerializable" = jackson-cbor
      }
    }
    # use Cassandra to store both snapshots and the events of the persistent actors
    persistence {
      journal.plugin = "akka.persistence.cassandra.journal"
      snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
    }
  }

  akka.persistence.cassandra {
    # don't use autocreate in production
    journal.keyspace = "poc"
    journal.keyspace-autocreate = on
    journal.tables-autocreate = on
    snapshot.keyspace = "poc_snapshot"
    snapshot.keyspace-autocreate = on
    snapshot.tables-autocreate = on
  }

  datastax-java-driver {
    basic.contact-points = ["192.168.11.189:9042"]
    basic.load-balancing-policy.local-datacenter = "datacenter1"
  }

ShoppingCart.scala

  package com.learn.akka

  import akka.actor.typed._
  import akka.persistence.typed._
  import akka.actor.typed.scaladsl.Behaviors
  import akka.persistence.typed.scaladsl._
  import scala.concurrent.duration._

  // CborSerializable is the marker trait bound to jackson-cbor in application.conf.
  // It is not shown in this listing and is assumed to be declared in this package,
  // e.g. as: trait CborSerializable

  object ItemInfo {
    case class Item(name: String, price: Double)
  }

  object MyCart {
    import ItemInfo._

    sealed trait Command
    sealed trait Event extends CborSerializable
    sealed trait Response

    // commands
    case class AddItem(item: Item) extends Command
    case object PayCart extends Command
    case class CountItems(replyTo: ActorRef[Response]) extends Command

    // events
    case class ItemAdded(item: Item) extends Event
    case object CartPaid extends Event

    // state
    case class CartLoad(load: List[Item] = Nil)

    // responses
    case class PickedItems(items: List[Item]) extends Response
    case object CartEmpty extends Response

    val commandHandler: (CartLoad, Command) => Effect[Event, CartLoad] = { (state, cmd) =>
      cmd match {
        case AddItem(item) =>
          Effect.persist(ItemAdded(item))
        case PayCart =>
          Effect.persist(CartPaid)
        case CountItems(replyTo) =>
          Effect.none.thenRun { cart =>
            cart.load match {
              case Nil =>
                replyTo ! CartEmpty
              case listOfItems =>
                replyTo ! PickedItems(listOfItems)
            }
          }
      }
    }

    val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
      evt match {
        case ItemAdded(item) =>
          state.copy(load = item :: state.load)
        case CartPaid =>
          state.copy(load = Nil)
      }
    }

    def apply(): Behavior[Command] =
      Behaviors.supervise(
        Behaviors.setup[Command] { ctx =>
          EventSourcedBehavior[Command, Event, CartLoad](
            persistenceId = PersistenceId("10", "1013"),
            emptyState = CartLoad(),
            commandHandler = commandHandler,
            eventHandler = eventHandler
          ).onPersistFailure(
            SupervisorStrategy
              .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
              .withMaxRestarts(3)
              .withResetBackoffAfter(10.seconds)
          ).receiveSignal {
            case (state, RecoveryCompleted) =>
              ctx.log.info("**************Recovery Completed with state: {}***************", state)
            case (state, SnapshotCompleted(meta)) =>
              ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************", state, meta.persistenceId, meta.sequenceNr)
            case (_, RecoveryFailed(err)) =>
              ctx.log.error("recovery failed with: {}", err.getMessage)
            case (_, SnapshotFailed(_, err)) =>
              ctx.log.error("snapshotting failed with: {}", err.getMessage)
          }.snapshotWhen {
            case (state, CartPaid, seqnum) =>
              ctx.log.info("*****************snapshot taken at: {} with state: {}", seqnum, state)
              true
            case _ => false
          }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
        }
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      )
  }

  object Shopper {
    import ItemInfo._

    sealed trait Command extends CborSerializable
    case class GetItem(item: Item) extends Command
    case object Settle extends Command
    case object GetCount extends Command
    case class WrappedResponse(res: MyCart.Response) extends Command

    def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
      val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
      val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
      Behaviors.receiveMessage { msg =>
        msg match {
          case GetItem(item) =>
            shoppingCart ! MyCart.AddItem(item)
          case Settle =>
            shoppingCart ! MyCart.PayCart
          case GetCount =>
            shoppingCart ! MyCart.CountItems(cartRef)
          case WrappedResponse(res) => res match {
            case MyCart.PickedItems(items) =>
              ctx.log.info("**************Current Items in Cart: {}*************", items)
            case MyCart.CartEmpty =>
              ctx.log.info("**************shopping cart is empty!***************")
          }
        }
        Behaviors.same
      }
    }
  }

  object ShoppingCart extends App {
    import ItemInfo._

    val shopper = ActorSystem(Shopper(), "shopper")
    shopper ! Shopper.GetItem(Item("banana", 11.20))
    shopper ! Shopper.GetItem(Item("watermelon", 4.70))
    shopper ! Shopper.GetCount
    shopper ! Shopper.Settle
    shopper ! Shopper.GetCount
    scala.io.StdIn.readLine()
    shopper.terminate()
  }

 

Original article: http://www.cnblogs.com/tiger-xc/p/13054219.html
