经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » C# » 查看文章
Elsa V3学习之Flowchart详解(上)
来源:cnblogs  作者:饭勺oO  时间:2024/8/20 8:59:33  对本文有异议

前面我们通过界面学习了Elsa的一些基本使用,若是有实操的小伙伴们,应该可以发现,我们工作流定义中的root,即我们的工作流画布,其实也是一个activity,就是Flowchart。那么本文将来解读一下flowchart的执行逻辑。

Flowchart源码

为了方便大家,这里先直接把flowchart的源码贴出。

  1. using System.ComponentModel;
  2. using System.Runtime.CompilerServices;
  3. using Elsa.Extensions;
  4. using Elsa.Workflows.Activities.Flowchart.Contracts;
  5. using Elsa.Workflows.Activities.Flowchart.Extensions;
  6. using Elsa.Workflows.Activities.Flowchart.Models;
  7. using Elsa.Workflows.Attributes;
  8. using Elsa.Workflows.Contracts;
  9. using Elsa.Workflows.Options;
  10. using Elsa.Workflows.Signals;
  11. using Microsoft.Extensions.Logging;
  12. namespace Elsa.Workflows.Activities.Flowchart.Activities;
  13. /// <summary>
  14. /// A flowchart consists of a collection of activities and connections between them.
  15. /// </summary>
  16. [Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
  17. [Browsable(false)]
  18. public class Flowchart : Container
  19. {
  20. internal const string ScopeProperty = "Scope"; // Property key under which the FlowScope is kept on the flowchart's execution context.
  21. /// <inheritdoc />
  22. public Flowchart([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
  23. {
  24. OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync); // Register one handler per signal type this flowchart reacts to.
  25. OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
  26. OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
  27. }
  28. /// <summary>
  29. /// The activity to execute when the flowchart starts.
  30. /// </summary>
  31. [Port]
  32. [Browsable(false)]
  33. public IActivity? Start { get; set; }
  34. /// <summary>
  35. /// A list of connections between activities.
  36. /// </summary>
  37. public ICollection<Connection> Connections { get; set; } = new List<Connection>();
  38. /// <inheritdoc />
  39. protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context) // Schedules the start activity, or completes the flowchart when there is none.
  40. {
  41. var startActivity = GetStartActivity(context);
  42. if (startActivity == null)
  43. {
  44. // Nothing else to execute.
  45. await context.CompleteActivityAsync();
  46. return;
  47. }
  48. // Schedule the start activity.
  49. await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync); // OnChildCompletedAsync is the completion callback that schedules the next activities.
  50. }
  51. private IActivity? GetStartActivity(ActivityExecutionContext context) // Resolves the start activity by priority: trigger, explicit Start property, Start-typed activity, "can start workflow", root, first activity.
  52. {
  53. // If there's a trigger that triggered this workflow, use that.
  54. var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
  55. var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;
  56. if (triggerActivity != null)
  57. return triggerActivity;
  58. // If an explicit Start activity was provided, use that.
  59. if (Start != null)
  60. return Start;
  61. // If there is a Start activity on the flowchart, use that.
  62. var startActivity = Activities.FirstOrDefault(x => x is Start);
  63. if (startActivity != null)
  64. return startActivity;
  65. // If there's an activity marked as "Can Start Workflow", use that.
  66. var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());
  67. if (canStartWorkflowActivity != null)
  68. return canStartWorkflowActivity;
  69. // Otherwise, use the first activity that has no inbound connections.
  70. var root = GetRootActivity();
  71. if (root != null)
  72. return root;
  73. // If no start activity found, return the first activity.
  74. return Activities.FirstOrDefault(); // Null only when Activities is empty.
  75. }
  76. /// <summary>
  77. /// Checks if there is any pending work for the flowchart.
  78. /// </summary>
  79. private bool HasPendingWork(ActivityExecutionContext context)
  80. {
  81. var workflowExecutionContext = context.WorkflowExecutionContext;
  82. var activityIds = Activities.Select(x => x.Id).ToList();
  83. var descendantContexts = context.GetDescendents().Where(x => x.ParentActivityExecutionContext == context).ToList(); // Keep only contexts whose parent is this flowchart's context.
  84. var activityExecutionContexts = descendantContexts.Where(x => activityIds.Contains(x.Activity.Id)).ToList(); // Of those, keep the contexts whose activity belongs to this flowchart's Activities.
  85. var hasPendingWork = workflowExecutionContext.Scheduler.List().Any(workItem =>
  86. {
  87. var ownerInstanceId = workItem.Owner?.Id;
  88. if (ownerInstanceId == null)
  89. return false;
  90. if (ownerInstanceId == context.Id) // The work item is owned by the flowchart itself.
  91. return true;
  92. var ownerContext = context.WorkflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId); // NOTE(review): First throws if no execution context has this id — confirm an owner context always exists.
  93. var ancestors = ownerContext.GetAncestors().ToList();
  94. return ancestors.Any(x => x == context); // The work item counts when the flowchart is among the owner's ancestors.
  95. });
  96. var hasRunningActivityInstances = activityExecutionContexts.Any(x => x.Status == ActivityStatus.Running);
  97. return hasRunningActivityInstances || hasPendingWork;
  98. }
  99. private IActivity? GetRootActivity() // Returns the first activity that is not the target of any connection, or null if every activity is a target.
  100. {
  101. // Get the first activity that has no inbound connections.
  102. var query =
  103. from activity in Activities
  104. let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
  105. where !inboundConnections
  106. select activity;
  107. var rootActivity = query.FirstOrDefault();
  108. return rootActivity;
  109. }
  110. private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context) // Completion callback for scheduled children; decides which activities are scheduled next.
  111. {
  112. var logger = context.GetRequiredService<ILogger<Flowchart>>();
  113. var flowchartContext = context.TargetContext;
  114. var completedActivityContext = context.ChildContext;
  115. var completedActivity = completedActivityContext.Activity;
  116. var result = context.Result;
  117. // If the complete activity's status is anything but "Completed", do not schedule its outbound activities.
  118. var scheduleChildren = completedActivityContext.Status == ActivityStatus.Completed;
  119. var outcomeNames = result is Outcomes outcomes
  120. ? outcomes.Names
  121. : [null!, "Done"]; // Without an Outcomes result, match connections whose source port is null or "Done".
  122. // Collect the outbound connections of the completed activity whose source port matches one of the outcome names.
  123. var outboundConnections = Connections.Where(connection => connection.Source.Activity == completedActivity && outcomeNames.Contains(connection.Source.Port)).ToList();
  124. var children = outboundConnections.Select(x => x.Target.Activity).ToList();
  125. var scope = flowchartContext.GetProperty(ScopeProperty, () => new FlowScope()); // Read the FlowScope from the flowchart context; the factory supplies a new FlowScope.
  126. scope.RegisterActivityExecution(completedActivity);
  127. // If the complete activity is a terminal node, complete the flowchart immediately.
  128. if (completedActivity is ITerminalNode)
  129. {
  130. await flowchartContext.CompleteActivityAsync();
  131. }
  132. else if (scheduleChildren)
  133. {
  134. if (children.Any())
  135. {
  136. // Schedule each child, but only if all of its left inbound activities have already executed.
  137. foreach (var activity in children)
  138. {
  139. var existingActivity = scope.ContainsActivity(activity); // Captured before AddActivity so it reflects the state prior to this iteration.
  140. scope.AddActivity(activity);
  141. var inboundActivities = Connections.LeftInboundActivities(activity).ToList();
  142. // If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
  143. if (!inboundActivities.Contains(completedActivity))
  144. {
  145. await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
  146. continue;
  147. }
  148. // If the activity is anything but a join activity, only schedule it if all of its left-inbound activities have executed, effectively implementing a "wait all" join.
  149. if (activity is not IJoinNode)
  150. {
  151. var executionCount = scope.GetExecutionCount(activity);
  152. var haveInboundActivitiesExecuted = inboundActivities.All(x => scope.GetExecutionCount(x) > executionCount); // Every left-inbound activity must have a higher execution count than this activity.
  153. if (haveInboundActivitiesExecuted)
  154. await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
  155. }
  156. else
  157. {
  158. // Select an existing activity execution context for this activity, if any.
  159. var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =>
  160. x.ParentActivityExecutionContext == flowchartContext && x.Activity == activity);
  161. var scheduleWorkOptions = new ScheduleWorkOptions
  162. {
  163. CompletionCallback = OnChildCompletedAsync,
  164. ExistingActivityExecutionContext = joinContext,
  165. PreventDuplicateScheduling = true
  166. };
  167. if (joinContext != null)
  168. logger.LogDebug("Next activity {ChildActivityId} is a join activity. Attaching to existing join context {JoinContext}", activity.Id, joinContext.Id);
  169. else if (!existingActivity)
  170. logger.LogDebug("Next activity {ChildActivityId} is a join activity. Creating new join context", activity.Id);
  171. else
  172. {
  173. logger.LogDebug("Next activity {ChildActivityId} is a join activity. Join context was not found, but activity is already being created", activity.Id);
  174. continue; // Skip scheduling: the scope already contained this join activity but no context exists for it yet.
  175. }
  176. await flowchartContext.ScheduleActivityAsync(activity, scheduleWorkOptions);
  177. }
  178. }
  179. }
  180. if (!children.Any()) // No outbound activities: complete the flowchart if nothing else is pending.
  181. {
  182. await CompleteIfNoPendingWorkAsync(flowchartContext);
  183. }
  184. }
  185. flowchartContext.SetProperty(ScopeProperty, scope); // Store the scope back on the flowchart context.
  186. }
  187. private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context) // Completes the flowchart when HasPendingWork is false and no active child is Faulted.
  188. {
  189. var hasPendingWork = HasPendingWork(context);
  190. if (!hasPendingWork)
  191. {
  192. var hasFaultedActivities = context.GetActiveChildren().Any(x => x.Status == ActivityStatus.Faulted);
  193. if (!hasFaultedActivities)
  194. {
  195. await context.CompleteActivityAsync();
  196. }
  197. }
  198. }
  199. private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context) // Signal handler: schedules the targets of the sender's connections whose source port is in signal.Outcomes.
  200. {
  201. var flowchartContext = context.ReceiverActivityExecutionContext;
  202. var schedulingActivityContext = context.SenderActivityExecutionContext;
  203. var schedulingActivity = schedulingActivityContext.Activity;
  204. var outcomes = signal.Outcomes;
  205. var outboundConnections = Connections.Where(connection => connection.Source.Activity == schedulingActivity && outcomes.Contains(connection.Source.Port!)).ToList();
  206. var outboundActivities = outboundConnections.Select(x => x.Target.Activity).ToList();
  207. if (outboundActivities.Any())
  208. {
  209. // Schedule each child.
  210. foreach (var activity in outboundActivities) await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
  211. }
  212. }
  213. private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context) // Signal handler: schedules signal.Activity, or the activity of the supplied existing execution context.
  214. {
  215. var flowchartContext = context.ReceiverActivityExecutionContext;
  216. var activity = signal.Activity;
  217. var activityExecutionContext = signal.ActivityExecutionContext;
  218. if (activityExecutionContext != null) // An existing execution context was supplied: schedule its activity and pass the context along.
  219. {
  220. await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new ScheduleWorkOptions
  221. {
  222. ExistingActivityExecutionContext = activityExecutionContext,
  223. CompletionCallback = OnChildCompletedAsync,
  224. Input = signal.Input
  225. });
  226. }
  227. else
  228. {
  229. await flowchartContext.ScheduleActivityAsync(activity, new ScheduleWorkOptions
  230. {
  231. CompletionCallback = OnChildCompletedAsync,
  232. Input = signal.Input
  233. });
  234. }
  235. }
  236. private async ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context) // Signal handler: on a cancel signal, completes the flowchart if no work is pending.
  237. {
  238. await CompleteIfNoPendingWorkAsync(context.ReceiverActivityExecutionContext);
  239. }
  240. }

首先我们从Activity特性中的描述参数中可以看到介绍flowchart作用的一句话:A flowchart is a collection of activities and connections between them.显而易见,flowchart是一个存储了多个Activity和他们连接关系的集合。有了这些数据,flowchart就可以根据connections中的连接关系对activity按照顺序执行了。

Container

接下来我们再往下看,可以看到flowchart不是直接继承Activity的基类,而是继承Container。
Container包含了Activities和Variables两个集合属性,分别用于存储我们的节点集合和变量集合。
在Container的执行入口方法中,先对变量进行了初始化和注册。

  1. protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) // Prepares the container's variables, then schedules its children.
  2. {
  3. // Ensure variables have names.
  4. EnsureNames(Variables);
  5. // Declare the variables in the memory of the expression execution context.
  6. context.ExpressionExecutionContext.Memory.Declare(Variables);
  7. // Schedule children.
  8. await ScheduleChildrenAsync(context);
  9. }

在最后调用了一个ScheduleChildrenAsync方法。这里可以看到这个方法是一个虚方法,可以给子类重写。

  1. protected virtual ValueTask ScheduleChildrenAsync(ActivityExecutionContext context) // Virtual so that subclasses can replace how children are scheduled.
  2. {
  3. ScheduleChildren(context); // Default behavior: delegate to the synchronous ScheduleChildren.
  4. return ValueTask.CompletedTask; // Nothing asynchronous to await in the default implementation.
  5. }

在flowchart中,执行的入口正是这个重写的ScheduleChildrenAsync方法。

Flowchart执行逻辑

回归正题,接下来我们继续看Flowchart的入口,即ScheduleChildrenAsync方法。

  1. protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context) // Schedules the start activity, or completes the flowchart when there is none.
  2. {
  3. var startActivity = GetStartActivity(context);
  4. if (startActivity == null)
  5. {
  6. // Nothing else to execute.
  7. await context.CompleteActivityAsync();
  8. return;
  9. }
  10. // Schedule the start activity.
  11. await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync); // OnChildCompletedAsync is passed as the completion callback.
  12. }

先简单过一下这几行的逻辑,首先获取StartActivity,即获取第一个执行的工作流节点,如果获取不到,则结束工作流。
如果获取到了,那么将发起调度,同时传入一个回调函数,这个回调函数是工作流按照顺序执行的关键。

GetStartActivity

那么接下来看它是如何拿到起始节点的呢。

  1. private IActivity? GetStartActivity(ActivityExecutionContext context) // Resolves the start activity by priority: trigger, explicit Start property, Start-typed activity, "can start workflow", root, first activity.
  2. {
  3. // If there's a trigger that triggered this workflow, use that.
  4. var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
  5. var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;
  6. if (triggerActivity != null)
  7. return triggerActivity;
  8. // If an explicit Start activity was provided, use that.
  9. if (Start != null)
  10. return Start;
  11. // If there is a Start activity on the flowchart, use that.
  12. var startActivity = Activities.FirstOrDefault(x => x is Start);
  13. if (startActivity != null)
  14. return startActivity;
  15. // If there's an activity marked as "Can Start Workflow", use that.
  16. var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());
  17. if (canStartWorkflowActivity != null)
  18. return canStartWorkflowActivity;
  19. // Otherwise, use the first activity that has no inbound connections.
  20. var root = GetRootActivity();
  21. if (root != null)
  22. return root;
  23. // If no start activity found, return the first activity.
  24. return Activities.FirstOrDefault(); // Null only when Activities is empty.
  25. }

这里从开头可以看到,优先级最高的StartActivity竟然不是Start,而是先获取TriggerActivity,那么什么是TriggerActivity呢,就比如我们的HTTP Endpoint, Event, Cron这些,当我们拖到画布当中时,默认会勾选Trigger workflow这个选项,如下图中间最下方所示。至于他的触发原理后续再深入探讨,这里就稍微过一下就好了。
image.png
若是没有TriggerActivity,那么flowchart会判断Start属性是否存在,如果存在表示明确指定了Start节点,那这个节点将作为工作流的起始节点。
若是Start也不存在,则会从所有的Activities中查找第一个Start节点,若存在,则作为工作流起始节点。
若在Activities中也没有Start节点,则再判断一下是否有节点勾选了Start Of Workflow选项,若是勾选了,则获取第一个勾选的Activity作为起始节点。
image.png
若是再没有符合条件的节点,则会尝试获取root节点。

  1. private IActivity? GetRootActivity() // Returns the first activity that is not the target of any connection, or null if every activity is a target.
  2. {
  3. // Get the first activity that has no inbound connections.
  4. var query =
  5. from activity in Activities
  6. let inboundConnections = Connections.Any(x => x.Target.Activity == activity) // True when at least one connection targets this activity.
  7. where !inboundConnections
  8. select activity;
  9. var rootActivity = query.FirstOrDefault();
  10. return rootActivity;
  11. }

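通过代码我们可以看到,root节点就是Activities中第一个没有任何入站连线的节点,也就是在Connections中从未作为Target出现的第一个节点。
若是一堆节点里面没有任何连线关系,那么每个节点都没有入站连线,root节点就是Activities中的第一个节点;只有当每个节点都有入站连线(例如连线成环)时,GetRootActivity才会返回null,这时最后会在所有的Activity中取第一个当作入口。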

可以看到,获取我们的StartActivity的逻辑还是挺严谨的。

context.ScheduleActivityAsync

好了,获取到了StartActivity之后,接下来就是真正的发起调度了,context.ScheduleActivityAsync方法就是把我们的StartActivity塞进去调度队列,然后会自动执行节点。这执行的逻辑在后面的文章再解析。这个方法关键的是后面那个Callback方法,即OnChildCompletedAsync。

由于OnChildCompletedAsync的逻辑比较复杂,我们放到下一篇文章再继续讲解。

原文链接:https://www.cnblogs.com/fanshaoO/p/18368210

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号