akka-typed(3) - PersistentActor has EventSourcedBehavior
Source: cnblogs  Author: 雪川大虫  Date: 2020/11/9 16:09:17

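   akka-typed no longer has PersistentActor. It is replaced by an actor built on EventSourcedBehavior, that is, an actor dedicated to the event-sourcing pattern. The principles and purpose of event sourcing were covered in some detail in earlier posts and are not repeated here; this post starts directly from concrete use of an EventSourcedBehavior actor. The basic data types of an event-sourced application are the Command, the Event and the State. Event sourcing is essentially a finite-state machine (FSM): execute a Command, produce an Event, change the State, and repeat. Below is a simple event-sourcing type definition: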

    import akka.actor.typed.{ActorSystem, Behavior}
    import akka.persistence.typed.PersistenceId
    import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

    // marker trait, bound to jackson-cbor in application.conf
    trait CborSerializable {}

    object Cart {
      case class Item(name: String, price: Double)
      sealed trait Command extends CborSerializable
      // Event does not extend CborSerializable, so persisting events relies on
      // akka.actor.allow-java-serialization = on (see application.conf)
      sealed trait Event
      //commands
      case class AddItem(item: Item) extends Command
      case object PayCart extends Command
      //event
      case class ItemAdded(item: Item) extends Event
      case object CartPaid extends Event
      //state
      case class CartLoad(load: Set[Item] = Set.empty)

      // decides which event a command produces; it never changes the state itself
      val commandHandler: (CartLoad, Command) => Effect[Event, CartLoad] = { (state, cmd) =>
        cmd match {
          case AddItem(item) =>
            Effect.persist(ItemAdded(item))
          case PayCart =>
            Effect.persist(CartPaid)
        }
      }

      // computes the new state from an event; also runs when events are replayed on recovery
      val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
        evt match {
          case ItemAdded(item) =>
            val sts = state.copy(load = state.load + item)
            println(s"current cart loading: ${sts}")
            sts
          case CartPaid =>
            val sts = state.copy(load = Set.empty)
            println(s"current cart loading: ${sts.load}")
            sts
        }
      }

      def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, CartLoad](
        persistenceId = PersistenceId("10", "1012"),
        emptyState = CartLoad(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      )
    }

    object EventSource extends App {
      import Cart._
      val cart = ActorSystem(Cart(), "shopping-cart")
      cart ! Cart.AddItem(Item("banana", 11.20))
      cart ! Cart.AddItem(Item("watermelon", 4.70))
      scala.io.StdIn.readLine()
      cart.terminate()
    }
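The Command, Event, State cycle also implies how state comes back after a restart: the stored events are run through the event handler again, starting from the empty state. Here is a minimal sketch of that idea without any Akka dependency. It assumes the same Item, Event and CartLoad shapes as the Cart example; CartReplaySketch and replay are names invented for this illustration, and a plain Seq stands in for the journal.

```scala
// Akka-free sketch; all names here are illustrative, not part of any library.
object CartReplaySketch {
  final case class Item(name: String, price: Double)

  sealed trait Event
  final case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  final case class CartLoad(load: Set[Item] = Set.empty)

  // Same shape as the eventHandler in the Cart example: (State, Event) => State
  val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
    evt match {
      case ItemAdded(item) => state.copy(load = state.load + item)
      case CartPaid        => state.copy(load = Set.empty)
    }
  }

  // Recovery modelled as a left fold of the stored events over the empty state.
  def replay(journal: Seq[Event]): CartLoad =
    journal.foldLeft(CartLoad())(eventHandler)
}
```

Because the event handler is a pure function of (State, Event), replaying the same events always produces the same state, which is why the event handler should stay free of side effects other than harmless logging.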

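First, a few things need to be clear. EventSourcedBehavior defines an actor. Judging from the result type Behavior[Command], this actor can receive and handle messages of type Command. Since it is an actor, it should have the two basic capabilities receiveMessage and receiveSignal, yet we never define them ourselves. How does that work? Let's look at the source code of EventSourcedBehavior: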

    object EventSourcedBehavior {
      ...
      def apply[Command, Event, State](
          persistenceId: PersistenceId,
          emptyState: State,
          commandHandler: (State, Command) => Effect[Event, State],
          eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = {
        val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList)
        EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass)
      }
      ...
    }

So this EventSourcedBehavior is a kind of Behavior. All of its special functionality appears to be implemented in EventSourcedBehaviorImpl:

    private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
        persistenceId: PersistenceId,
        emptyState: State,
        commandHandler: EventSourcedBehavior.CommandHandler[Command, Event, State],
        eventHandler: EventSourcedBehavior.EventHandler[State, Event],
        loggerClass: Class[_],
        ...
        ) extends EventSourcedBehavior[Command, Event, State] {
      ...
        Behaviors
          .supervise {
            Behaviors.setup[Command] { _ =>
              val eventSourcedSetup = new BehaviorSetup(
                ctx.asInstanceOf[ActorContext[InternalProtocol]],
                persistenceId,
                emptyState,
                commandHandler,
                eventHandler,
                WriterIdentity.newIdentity(),
                actualSignalHandler,
                tagger,
                eventAdapter,
                snapshotAdapter,
                snapshotWhen,
                recovery,
                retention,
                holdingRecoveryPermit = false,
                settings = settings,
                stashState = stashState)
              // needs to accept Any since we also can get messages from the journal
              // not part of the user facing Command protocol
              def interceptor: BehaviorInterceptor[Any, InternalProtocol] = new BehaviorInterceptor[Any, InternalProtocol] {
                import BehaviorInterceptor._
                override def aroundReceive(
                    ctx: typed.TypedActorContext[Any],
                    msg: Any,
                    target: ReceiveTarget[InternalProtocol]): Behavior[InternalProtocol] = {
                  val innerMsg = msg match {
                    case res: JournalProtocol.Response           => InternalProtocol.JournalResponse(res)
                    case res: SnapshotProtocol.Response          => InternalProtocol.SnapshotterResponse(res)
                    case RecoveryPermitter.RecoveryPermitGranted => InternalProtocol.RecoveryPermitGranted
                    case internal: InternalProtocol              => internal // such as RecoveryTickEvent
                    case cmd: Command @unchecked                 => InternalProtocol.IncomingCommand(cmd)
                  }
                  target(ctx, innerMsg)
                }
                override def aroundSignal(
                    ctx: typed.TypedActorContext[Any],
                    signal: Signal,
                    target: SignalTarget[InternalProtocol]): Behavior[InternalProtocol] = {
                  if (signal == PostStop) {
                    eventSourcedSetup.cancelRecoveryTimer()
                    // clear stash to be GC friendly
                    stashState.clearStashBuffers()
                  }
                  target(ctx, signal)
                }
                override def toString: String = "EventSourcedBehaviorInterceptor"
              }
              Behaviors.intercept(() => interceptor)(RequestingRecoveryPermit(eventSourcedSetup)).narrow
            }
          }
          .onFailure[JournalFailureException](supervisionStrategy)
      }

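EventSourcedBehaviorImpl is still a Behavior[Command], and it is implemented through a BehaviorInterceptor. BehaviorInterceptor.aroundReceive and BehaviorInterceptor.aroundSignal can do the work of receiveMessage and receiveSignal, which the result types of these two functions go some way to confirm: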

    /* @tparam Outer The outer message type - the type of messages the intercepting behavior will accept
     * @tparam Inner The inner message type - the type of message the wrapped behavior accepts
     *
     * @see [[BehaviorSignalInterceptor]]
     */
    abstract class BehaviorInterceptor[Outer, Inner](val interceptMessageClass: Class[Outer]) {
      import BehaviorInterceptor._
      ...
      /**
       * Intercept a message sent to the running actor. Pass the message on to the next behavior
       * in the stack by passing it to `target.apply`, return `Behaviors.same` without invoking `target`
       * to filter out the message.
       *
       * @return The behavior for next message or signal
       */
      def aroundReceive(ctx: TypedActorContext[Outer], msg: Outer, target: ReceiveTarget[Inner]): Behavior[Inner]
      /**
       * Override to intercept a signal sent to the running actor. Pass the signal on to the next behavior
       * in the stack by passing it to `target.apply`.
       *
       * @return The behavior for next message or signal
       *
       * @see [[BehaviorSignalInterceptor]]
       */
      def aroundSignal(ctx: TypedActorContext[Outer], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner]
      ...
    }
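To make the Outer/Inner idea concrete, here is a small model of what aroundReceive does in EventSourcedBehaviorImpl: accept anything from outside, translate it into an internal protocol, and hand it to the wrapped behavior. This is only a sketch in plain Scala, not the Akka API; SimpleBehavior, Intercepted, Recording, JournalReply and the reduced Internal protocol are all hypothetical names made up for this illustration.

```scala
// Plain-Scala model of message interception; none of these types are Akka classes.
object InterceptorSketch {
  // Hypothetical internal protocol, loosely modelled on InternalProtocol in the excerpt.
  sealed trait Internal
  final case class JournalResponse(payload: String) extends Internal
  final case class IncomingCommand[C](cmd: C) extends Internal

  // Hypothetical stand-in for a reply arriving from the journal.
  final case class JournalReply(payload: String)

  // A behavior reduced to its essence: handle one message, return the next behavior.
  trait SimpleBehavior[T] {
    def receive(msg: T): SimpleBehavior[T]
  }

  // Accepts Any on the outside, passes only Internal messages to the wrapped behavior.
  final case class Intercepted(inner: SimpleBehavior[Internal]) extends SimpleBehavior[Any] {
    def receive(msg: Any): Intercepted = {
      val innerMsg: Internal = msg match {
        case JournalReply(p)    => JournalResponse(p)   // journal traffic
        case internal: Internal => internal             // already internal
        case cmd                => IncomingCommand(cmd) // everything else is a user command
      }
      Intercepted(inner.receive(innerMsg))
    }
  }

  // Inner behavior that simply records what reached it.
  final case class Recording(seen: Vector[Internal] = Vector.empty) extends SimpleBehavior[Internal] {
    def receive(msg: Internal): Recording = copy(seen = seen :+ msg)
  }
}
```

The user-facing type stays narrow (Behavior[Command] in the real code, via .narrow), while the actor itself can still receive journal and snapshot replies that are not part of the Command protocol.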

Also, for EventSourcedBehavior the point most worth noting is that an incoming Command is handled by the commandHandler supplied from outside:

    final class HandlingCommands(state: RunningState[S])
        extends AbstractBehavior[InternalProtocol](setup.context)
        with WithSeqNrAccessible {
      def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
        case IncomingCommand(c: C @unchecked) => onCommand(state, c)
        case JournalResponse(r)               => onDeleteEventsJournalResponse(r, state.state)
        case SnapshotterResponse(r)           => onDeleteSnapshotResponse(r, state.state)
        case get: GetState[S @unchecked]      => onGetState(get)
        case _                                => Behaviors.unhandled
      }
      override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
        case PoisonPill =>
          if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped
          else new HandlingCommands(state.copy(receivedPoisonPill = true))
        case signal =>
          if (setup.onSignal(state.state, signal, catchAndLog = false)) this
          else Behaviors.unhandled
      }
      def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
        val effect = setup.commandHandler(state.state, cmd)
        applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
      }
      ...
    }

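The code above says enough. From the function types of commandHandler and eventHandler we can derive the processing flow of EventSourcedBehavior: (State, Command) => (State, Event) => new State. The command handler turns the current State and a Command into an Effect carrying Events, the event handler turns the State and each Event into a new State, and the new State is the final output: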

    object EventSourcedBehavior {
      type CommandHandler[Command, Event, State] = (State, Command) => Effect[Event, State]
      type EventHandler[State, Event] = (State, Event) => State
      ...
    }

commandHandler returns a result of type Effect[Event, State]; in other words, handling a Command is the process of producing Events. Below are the options that Effect offers:

 

    object Effect {
      /**
       * Persist a single event
       *
       * Side effects can be chained with `thenRun`
       */
      def persist[Event, State](event: Event): EffectBuilder[Event, State] = Persist(event)
      /**
       * Persist multiple events
       *
       * Side effects can be chained with `thenRun`
       */
      def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): EffectBuilder[Event, State] =
        persist(evt1 :: evt2 :: events.toList)
      /**
       * Persist multiple events
       *
       * Side effects can be chained with `thenRun`
       */
      def persist[Event, State](events: im.Seq[Event]): EffectBuilder[Event, State] =
        PersistAll(events)
      /**
       * Do not persist anything
       *
       * Side effects can be chained with `thenRun`
       */
      def none[Event, State]: EffectBuilder[Event, State] = PersistNothing.asInstanceOf[EffectBuilder[Event, State]]
      /**
       * This command is not handled, but it is not an error that it isn't.
       *
       * Side effects can be chained with `thenRun`
       */
      def unhandled[Event, State]: EffectBuilder[Event, State] = Unhandled.asInstanceOf[EffectBuilder[Event, State]]
      /**
       * Stop this persistent actor
       * Side effects can be chained with `thenRun`
       */
      def stop[Event, State](): EffectBuilder[Event, State] =
        none.thenStop()
      /**
       * Stash the current command. Can be unstashed later with [[Effect.unstashAll]].
       *
       * Note that the stashed commands are kept in an in-memory buffer, so in case of a crash they will not be
       * processed. They will also be discarded if the actor is restarted (or stopped) due to that an exception was
       * thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist
       * failures if a backoff supervisor strategy is defined with [[EventSourcedBehavior.onPersistFailure]].
       *
       * Side effects can be chained with `thenRun`
       */
      def stash[Event, State](): ReplyEffect[Event, State] =
        Stash.asInstanceOf[EffectBuilder[Event, State]].thenNoReply()
      /**
       * Unstash the commands that were stashed with [[Effect.stash]].
       *
       * It's allowed to stash messages while unstashing. Those newly added
       * commands will not be processed by this `unstashAll` effect and have to be unstashed
       * by another `unstashAll`.
       *
       * @see [[EffectBuilder.thenUnstashAll]]
       */
      def unstashAll[Event, State](): Effect[Event, State] =
        CompositeEffect(none.asInstanceOf[EffectBuilder[Event, State]], SideEffect.unstashAll[State]())
      /**
       * Send a reply message to the command. The type of the
       * reply message must conform to the type specified by the passed replyTo `ActorRef`.
       *
       * This has the same semantics as `cmd.replyTo.tell`.
       *
       * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
       * when the `EventSourcedBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When
       * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
       * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
       * finding mistakes.
       */
      def reply[ReplyMessage, Event, State](replyTo: ActorRef[ReplyMessage])(
          replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
        none[Event, State].thenReply[ReplyMessage](replyTo)(_ => replyWithMessage)
      /**
       * When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
       * isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
       * sent for a specific command or the reply will be sent later.
       */
      def noReply[Event, State]: ReplyEffect[Event, State] =
        none.thenNoReply()
    }

 

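Next, the State is updated from the Events that were produced. In applyEffects this happens through state.applyEvent(setup, event), which runs the event handler before the event is persisted: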

 

    @tailrec def applyEffects(
        msg: Any,
        state: RunningState[S],
        effect: Effect[E, S],
        sideEffects: immutable.Seq[SideEffect[S]] = Nil): Behavior[InternalProtocol] = {
      if (setup.log.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_, _]])
        setup.log.debugN(
          s"Handled command [{}], resulting effect: [{}], side effects: [{}]",
          msg.getClass.getName,
          effect,
          sideEffects.size)
      effect match {
        case CompositeEffect(eff, currentSideEffects) =>
          // unwrap and accumulate effects
          applyEffects(msg, state, eff, currentSideEffects ++ sideEffects)
        case Persist(event) =>
          // apply the event before persist so that validation exception is handled before persisting
          // the invalid event, in case such validation is implemented in the event handler.
          // also, ensure that there is an event handler for each single event
          val newState = state.applyEvent(setup, event)
          val eventToPersist = adaptEvent(event)
          val eventAdapterManifest = setup.eventAdapter.manifest(event)
          val newState2 = internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest)
          val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
          persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects)
        case PersistAll(events) =>
          if (events.nonEmpty) {
            // apply the event before persist so that validation exception is handled before persisting
            // the invalid event, in case such validation is implemented in the event handler.
            // also, ensure that there is an event handler for each single event
            var seqNr = state.seqNr
            val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, NoSnapshot: SnapshotAfterPersist)) {
              case ((currentState, snapshot), event) =>
                seqNr += 1
                val shouldSnapshot =
                  if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, seqNr) else snapshot
                (currentState.applyEvent(setup, event), shouldSnapshot)
            }
            val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt)))
            val newState2 = internalPersistAll(setup.context, msg, newState, eventsToPersist)
            persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)
          } else {
            // run side-effects even when no events are emitted
            tryUnstashOne(applySideEffects(sideEffects, state))
          }
        case _: PersistNothing.type =>
          tryUnstashOne(applySideEffects(sideEffects, state))
        case _: Unhandled.type =>
          import akka.actor.typed.scaladsl.adapter._
          setup.context.system.toClassic.eventStream
            .publish(UnhandledMessage(msg, setup.context.system.toClassic.deadLetters, setup.context.self.toClassic))
          tryUnstashOne(applySideEffects(sideEffects, state))
        case _: Stash.type =>
          stashUser(IncomingCommand(msg))
          tryUnstashOne(applySideEffects(sideEffects, state))
      }
    }
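The ordering in applyEffects (apply the event to the state first, persist afterwards) can be reduced to a few lines. The following is a simplified model in plain Scala, not the Akka implementation: Effect is cut down to three cases, Running and applyEffect are names invented here, and an in-memory Vector plays the journal. Snapshots, stashing, side effects and the asynchronous journal round trip are left out.

```scala
// Simplified model of applyEffects; these types only imitate the Akka ones.
object ApplyEffectsSketch {
  sealed trait Effect[+E]
  final case class Persist[E](event: E) extends Effect[E]
  final case class PersistAll[E](events: List[E]) extends Effect[E]
  case object PersistNothing extends Effect[Nothing]

  // Current state, sequence number and an in-memory stand-in for the journal.
  final case class Running[S, E](state: S, seqNr: Long, journal: Vector[E])

  def applyEffect[S, E](running: Running[S, E], effect: Effect[E])(
      eventHandler: (S, E) => S): Running[S, E] = {
    val events: List[E] = effect match {
      case Persist(e)     => List(e)
      case PersistAll(es) => es
      case PersistNothing => Nil
    }
    // Run the event handler first: if it throws (validation), nothing below
    // executes and the journal is left untouched.
    val newState = events.foldLeft(running.state)(eventHandler)
    // Only then "persist" the events and advance the sequence number.
    Running(newState, running.seqNr + events.size, running.journal ++ events)
  }
}
```

With an event handler that rejects invalid events by throwing, a failed call leaves the previous Running value as the only valid one, which mirrors the comment in the source about handling a validation exception before the invalid event is persisted.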

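That covers the basic principles; digging any deeper only gets messier. I set up a runtime environment for the example above, mainly to test the correct configuration of persistence-cassandra-plugin, as follows: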

build.sbt

    name := "learn-akka-typed"
    version := "0.1"
    scalaVersion := "2.13.1"
    scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
    javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")
    val AkkaVersion = "2.6.5"
    val AkkaPersistenceCassandraVersion = "1.0.0"
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
      "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
      "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )

application.conf

    akka.actor.allow-java-serialization = on
    akka {
      loglevel = DEBUG
      actor {
        serialization-bindings {
          "com.learn.akka.CborSerializable" = jackson-cbor
        }
      }
      # use Cassandra to store both snapshots and the events of the persistent actors
      persistence {
        journal.plugin = "akka.persistence.cassandra.journal"
        snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
      }
    }
    akka.persistence.cassandra {
      # don't use autocreate in production
      journal.keyspace = "poc"
      journal.keyspace-autocreate = on
      journal.tables-autocreate = on
      snapshot.keyspace = "poc_snapshot"
      snapshot.keyspace-autocreate = on
      snapshot.tables-autocreate = on
    }
    datastax-java-driver {
      basic.contact-points = ["192.168.11.189:9042"]
      basic.load-balancing-policy.local-datacenter = "datacenter1"
    }

 

Original link: http://www.cnblogs.com/tiger-xc/p/13035165.html
