经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
akka-typed(8) - CQRS读写分离模式
来源:cnblogs  作者:雪川大虫  时间:2020/11/9 16:09:10  对本文有异议

 前面介绍了事件源(EventSource)和集群(cluster),现在到了讨论CQRS的时候了。CQRS即读写分离模式,由独立的写方程序和读方程序组成,具体原理在以前的博客里介绍过了。akka-typed应该自然支持CQRS模式,最起码本身提供了对写方编程的支持,这点从EventSourcedBehavior 可以知道。akka-typed提供了新的EventSourcedBehavior-Actor,极大方便了对persistentActor的应用开发,但同时也给编程者造成了一些限制。如手工改变状态会更困难了、EventSourcedBehavior不支持多层式的persist,也就是说通过persist某些特定的event然后在event-handler程序里进行状态处理是不可能的了。我这里有个例子,是个购物车应用:当完成支付后需要取个快照(snapshot),下面是这个snapshot的代码:

  1. snapshotWhen {
  2. (state,evt,seqNr) => CommandHandler.takeSnapshot(state,evt,seqNr)
  3. }
  4. ...
  5. def takeSnapshot(state: Voucher, evt: Events.Action, lstSeqNr: Long)(implicit pid: PID) = {
  6. if (evt.isInstanceOf[Events.PaymentMade]
  7. || evt.isInstanceOf[Events.VoidVoucher.type]
  8. || evt.isInstanceOf[Events.SuspVoucher.type])
  9. if (state.items.isEmpty) {
  10. log.step(s"#${state.header.num} taking snapshot at [$lstSeqNr] ...")
  11. true
  12. } else
  13. false
  14. else
  15. false

  16. }

判断event类型是没有问题的,因为正是当前的事件,但另一个条件是购物车必须是清空了的。这个有点为难,因为这个状态要依赖这几个event运算的结果才能确定,也就是下一步,但确定结果又需要对购物车内容进行计算,好像是个死循环。在akka-classic里我们可以在判断了event运算结果后,如果需要改变状态就再persist一个特殊的event,然后在这个event的handler进行状态处理。没办法,EventSourcedBehavior不支持多层persist,只有这样做:

 

  1. case PaymentMade(acct, dpt, num, ref,amount) =>
  2. ...
  3. writerInternal.lastVoucher = Voucher(vchs, vItems)
  4. endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)
  5. Voucher(vchs.nextVoucher, List())
  6. ...

 

我只能先吧当前状态保存下来、进行结单运算、然后清空购物车,这样snapshot就可以顺利进行了。

好了,akka的读方编程是通过PersistentQuery实现的。reader的作用就是把event从数据库读出来后再恢复成具体的数据格式。我们从reader的调用了解一下这个应用里reader的实现细节:

 

  1. val readerShard = writerInternal.optSharding.get
  2. val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")
  3. readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)

可以看到这个reader是一个集群分片,sharding-entity。想法是每单完成购买后发个消息给一个entity、这个entity再完成reader功能后自动终止,立即释放出占用的资源。reader-actor的定义如下:

  1. object POSReader extends LogSupport {
  2. val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("POSReader")
  3. def apply(nodeAddress: String, trace: Boolean): Behavior[Command] = {
  4. log.stepOn = trace
  5. implicit var pid: PID = PID("","")
  6. Behaviors.supervise(
  7. Behaviors.setup[Command] { ctx =>
  8. Behaviors.withTimers { timer =>
  9. implicit val ec = ctx.executionContext
  10. Behaviors.receiveMessage {
  11. case PerformRead(shopid, posid, vchnum, opr, bseq, eseq, txntype, xurl, xacct, xpass) =>
  12. pid = PID(shopid, posid)
  13. log.step(s"POSReader: PerformRead($shopid,$posid,$vchnum,$opr,$bseq,$eseq,$txntype,$xurl,$xacct,$xpass)")(PID(shopid, posid))
  14. val futReadSaveNExport = for {
  15. txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)
  16. _ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend,
  17. { if(txntype == Events.TXNTYPE.voidall)
  18. txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))
  19. else txnitems },
  20. trace)(ctx.system.toClassic, pid)
  21. } yield ()
  22. ctx.pipeToSelf(futReadSaveNExport) {
  23. case Success(_) => {
  24. timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
  25. StopReader
  26. }
  27. case Failure(err) =>
  28. log.error(s"POSReader: Error: ${err.getMessage}")
  29. timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
  30. StopReader
  31. }
  32. Behaviors.same
  33. case StopReader =>
  34. Behaviors.same
  35. case ReaderFinish(shopid, posid, vchnum) =>
  36. Behaviors.stopped(
  37. () => log.step(s"POSReader: {$shopid,$posid} finish reading voucher#$vchnum and stopped")(PID(shopid, posid))
  38. )
  39. }
  40. }
  41. }
  42. ).onFailure(SupervisorStrategy.restart)
  43. }

reader就是一个普通的actor。值得注意的是读方程序可能是一个庞大复杂的程序,肯定需要分割成多个模块,所以我们可以按照流程顺序进行模块功能切分:这样下面的模块可能会需要上面模块产生的结果才能继续。记住,在actor中绝对避免阻塞线程,所有的模块都返回Future, 然后用for-yield串起来。上面我们用了ctx.pipeToSelf 在Future运算完成后发送ReaderFinish消息给自己,通知自己停止。

在这个例子里我们把reader任务分成:

1、从数据库读取事件

2、事件重演一次产生状态数据(购物车内容)

3、将形成的购物车内容作为交易单据项目存入数据库

4、向用户提供的restapi输出交易数据

event读取是通过cassandra-persistence-plugin实现的:

  1. val query =
  2. PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
  3. // issue query to journal
  4. val source: Source[EventEnvelope, NotUsed] =
  5. query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq)
  6. // materialize stream, consuming events
  7. val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

这部分比较简单:定义一个PersistenceQuery,用它产生一个Source,然后run这个Source获取Future[List[Any]]。

重演事件产生交易数据:

  1. def buildVoucher(actions: List[Any]): List[TxnItem] = {
  2. log.step(s"POSReader: read actions: $actions")
  3. val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])
  4. val listOfActions = onlytxns.reverse zip (LazyList from 1) //zipWithIndex
  5. listOfActions.foreach { case (txn,idx) =>
  6. txn.asInstanceOf[Action] match {
  7. case Voided(_) =>
  8. case ti@_ =>
  9. curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)
  10. if(voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {
  11. curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr=cshr)
  12. log.step(s"POSReader: voided txnitem: $curTxnItem")
  13. }
  14. val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)
  15. vchState = vch.header
  16. vchItems = vch.txnItems
  17. log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")
  18. }
  19. }
  20. log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")
  21. vchItems.txnitems
  22. }

重演List[Event],产生了List[TxnItem]。

向数据库里写List[TxnItem]:

 

  1. def writeTxnsToDB(vchnum: Int, txntype: Int, bseq: Long, eseq: Long, txns: List[TxnItem])(
  2. implicit system: akka.actor.ActorSystem, session: CassandraSession, pid: PID): Future[Seq[TxnItem]] = ???

注意返回结果类型Future[Seq[TxnItem]]。我们用for-yield把这几个动作串起来:

  1. val txnitems: Future[List[Events.TxnItem]] = for {
  2. lst1 <- readActions //read list from Source
  3. lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
  4. readActions
  5. else FastFuture.successful(lst1)
  6. items <- FastFuture.successful( buildVoucher(lstTxns) )
  7. _ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
  8. _ <- session.close(ec)
  9. } yield items

注意返回结果类型Future[Seq[TxnItem]]。我们用for-yield把这几个动作串起来:

  1. val txnitems: Future[List[Events.TxnItem]] = for {
  2. lst1 <- readActions //read list from Source
  3. lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
  4. readActions
  5. else FastFuture.successful(lst1)
  6. items <- FastFuture.successful( buildVoucher(lstTxns) )
  7. _ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
  8. _ <- session.close(ec)
  9. } yield items

注意:这个for返回的Future[List[TxnItem]],是提供给restapi输出功能的。在那里List[TxnItem]会被转换成json作为post的包嵌数据。

现在所有子任务的返回结果类型都是Future了。我们可以再用for来把它们串起来:

  1. val futReadSaveNExport = for {
  2. txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)
  3. _ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend,
  4. { if(txntype == Events.TXNTYPE.voidall)
  5. txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))
  6. else txnitems },
  7. trace)(ctx.system.toClassic, pid)
  8. } yield ()

说到EventSourcedBehavior,因为用了cassandra-plugin,忽然想起配置文件里新旧有很大区别。现在这个application.conf是这样的: 

  1. akka {
  2. loglevel = INFO
  3. actor {
  4. provider = cluster
  5. serialization-bindings {
  6. "com.datatech.pos.cloud.CborSerializable" = jackson-cbor
  7. }
  8. }
  9. remote {
  10. artery {
  11. canonical.hostname = "192.168.11.189"
  12. canonical.port = 0
  13. }
  14. }
  15. cluster {
  16. seed-nodes = [
  17. "akka://cloud-pos-server@192.168.11.189:2551"]
  18. sharding {
  19. passivate-idle-entity-after = 5 m
  20. }
  21. }
  22. # use Cassandra to store both snapshots and the events of the persistent actors
  23. persistence {
  24. journal.plugin = "akka.persistence.cassandra.journal"
  25. snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  26. }
  27. }
  28. akka.persistence.cassandra {
  29. # don't use autocreate in production
  30. journal.keyspace = "poc2g"
  31. journal.keyspace-autocreate = on
  32. journal.tables-autocreate = on
  33. snapshot.keyspace = "poc2g_snapshot"
  34. snapshot.keyspace-autocreate = on
  35. snapshot.tables-autocreate = on
  36. }
  37. datastax-java-driver {
  38. basic.contact-points = ["192.168.11.189:9042"]
  39. basic.load-balancing-policy.local-datacenter = "datacenter1"
  40. }

akka.persitence.cassandra段落里可以定义keyspace名称,这样新旧版本应用可以共用一个cassandra,同时在线。

 

原文链接:http://www.cnblogs.com/tiger-xc/p/13193833.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号