课程表

Apache Storm课程

工具箱
速查手册

Storm Trident

当前位置:免费教程 » 大数据/云 » Apache Storm

Trident(三叉戟,希腊神话中为海神波塞冬的武器)Storm的延伸。像Storm,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm上提供高级抽象,以及状态流处理和低延迟分布式查询。

Trident使用spoutbolt,但是这些低级组件在执行之前由Trident自动生成。 Trident具有函数,过滤器,联接,分组和聚合。

Trident将流处理为一系列批次,称为事务。通常,这些小批量的大小将是大约数千或数百万个元组,这取决于输入流。这样,Trident不同于Storm,它执行元组一元组处理。

批处理概念非常类似于数据库事务。每个事务都分配了一个事务ID。该事务被认为是成功的,一旦其所有的处理完成。然而,处理事务的元组中的一个的失败将导致整个事务被重传。对于每个批次,Trident将在事务开始时调用beginCommit,并在结束时提交。

Trident拓扑

Trident API公开了一个简单的选项,使用“TridentTopology”类创建Trident拓扑。基本上,Trident拓扑从流出接收输入流,并对流上执行有序的操作序列(滤波,聚合,分组等)。Storm元组被替换为Trident元组,bolt被操作替换。一个简单的Trident拓扑可以创建如下 -

  1. TridentTopology topology = new TridentTopology();

Trident Tuples

Trident Tuples是一个命名的值列表。TridentTuple接口是Trident拓扑的数据模型。TridentTuple接口是可由Trident拓扑处理的数据的基本单位。

Trident Spout

Trident spout与类似于Storm spout,附加选项使用Trident的功能。实际上,我们仍然可以使用IRichSpout,我们在Storm拓扑中使用它,但它本质上是非事务性的,我们将无法使用Trident提供的优点。

具有使用Trident的特征的所有功能的基本spout是“ITridentSpout”。它支持事务和不透明的事务语义。其他的spouts是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。

除了这些通用spouts,Trident有许多样品实施trident spout其中之一是FeederBatchSpout输出,我们可以使用它发送trident tuples的命名列表,而不必担心批处理,并行性等。

FeederBatchSpout创建和数据馈送可以如下所示完成 -

  1. TridentTopology topology = new TridentTopology();
  2. FeederBatchSpout testSpout = new FeederBatchSpout(
  3. ImmutableList.of("fromMobileNumber", "toMobileNumber", duration”));
  4. topology.newStream("fixed-batch-spout", testSpout)
  5. testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident操作

Trident依靠“Trident操作”来处理trident tuples的输入流。Trident API具有多个内置操作来处理简单到复杂的流处理。这些操作的范围从简单验证到复杂的trident tuples分组和聚合。让我们经历最重要和经常使用的操作。

过滤

过滤器是用于执行输入验证任务的对象。Trident过滤器获取trident tuples字段的子集作为输入,并根据是否满足某些条件返回真或假。如果返回true,则该元组保存在输出流中;否则,从流中移除元组。过滤器将基本上继承自BaseFilter类并实现isKeep方法。这里是一个滤波器操作的示例实现 -

  1. public class MyFilter extends BaseFilter {
  2. public boolean isKeep(TridentTuple tuple) {
  3. return tuple.getInteger(1) % 2 == 0;
  4. }
  5. }
  6.  
  7. input
  8.  
  9. [1, 2]
  10. [1, 3]
  11. [1, 4]
  12.  
  13. output
  14.  
  15. [1, 2]
  16. [1, 4]

可以使用“each”方法在拓扑中调用过滤器功能。“Fields”类可以用于指定输入(trident tuple的子集)。示例代码如下 -

  1. TridentTopology topology = new TridentTopology();
  2. topology.newStream("spout", spout)
  3. .each(new Fields("a", "b"), new MyFilter())

函数

函数是用于对单个trident tuple执行简单操作的对象。它需要一个trident tuple字段的子集,并发出零个或多个新的trident tuple字段。

函数基本上从BaseFunction类继承并实现execute方法。下面给出了一个示例实现:

  1. public class MyFunction extends BaseFunction {
  2. public void execute(TridentTuple tuple, TridentCollector collector) {
  3. int a = tuple.getInteger(0);
  4. int b = tuple.getInteger(1);
  5. collector.emit(new Values(a + b));
  6. }
  7. }
  8.  
  9. input
  10.  
  11. [1, 2]
  12. [1, 3]
  13. [1, 4]
  14.  
  15. output
  16.  
  17. [1, 2, 3]
  18. [1, 3, 4]
  19. [1, 4, 5]

与过滤操作类似,可以使用每个方法在拓扑中调用函数操作。示例代码如下 -

  1. TridentTopology topology = new TridentTopology();
  2. topology.newStream("spout", spout)
  3. .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

聚合是用于对输入批处理或分区或流执行聚合操作的对象。Trident有三种类型的聚合。他们如下 -

  • aggregate -单独聚合每批trident tuple。在聚合过程期间,首先使用全局分组将元组重新分区,以将同一批次的所有分区组合到单个分区中。

  • partitionAggregate -聚合每个分区,而不是整个trident tuple。分区集合的输出完全替换输入元组。分区集合的输出包含单个字段元组。

  • persistentaggregate -聚合所有批次中的所有trident tuple,并将结果存储在内存或数据库中。

  1. TridentTopology topology = new TridentTopology();
  2.  
  3. // aggregate operation
  4. topology.newStream("spout", spout)
  5. .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
  6. .aggregate(new Count(), new Fields(“count”))
  7. // partitionAggregate operation
  8. topology.newStream("spout", spout)
  9. .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
  10. .partitionAggregate(new Count(), new Fields(“count"))
  11. // persistentAggregate - saving the count to memory
  12. topology.newStream("spout", spout)
  13. .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
  14. .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口创建聚合操作。上面例子中使用的“计数”聚合器是内置聚合器之一,它使用“CombinerAggregator”实现,实现如下 -

  1. public class Count implements CombinerAggregator<Long> {
  2. @Override
  3. public Long init(TridentTuple tuple) {
  4. return 1L;
  5. }
  6. @Override
  7. public Long combine(Long val1, Long val2) {
  8. return val1 + val2;
  9. }
  10. @Override
  11. public Long zero() {
  12. return 0L;
  13. }
  14. }

分组

分组操作是一个内置操作,可以由groupBy方法调用。groupBy方法通过在指定字段上执行partitionBy来重新分区流,然后在每个分区中,它将组字段相等的元组组合在一起。通常,我们使用“groupBy”以及“persistentAggregate”来获得分组聚合。示例代码如下 -

  1. TridentTopology topology = new TridentTopology();
  2.  
  3. // persistentAggregate - saving the count to memory
  4. topology.newStream("spout", spout)
  5. .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
  6. .groupBy(new Fields(“d”)
  7. .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并和连接

合并和连接可以分别通过使用“合并”和“连接”方法来完成。合并组合一个或多个流。加入类似于合并,除了加入使用来自两边的trident tuple字段来检查和连接两个流的事实。此外,加入将只在批量级别工作。示例代码如下 -

  1. TridentTopology topology = new TridentTopology();
  2. topology.merge(stream1, stream2, stream3);
  3. topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
  4. new Fields("key", "a", "b", "c"));

状态维护

Trident提供了状态维护的机制。状态信息可以存储在拓扑本身中,否则也可以将其存储在单独的数据库中。原因是维护一个状态,如果任何元组在处理过程中失败,则重试失败的元组。这会在更新状态时产生问题,因为您不确定此元组的状态是否已在之前更新过。如果在更新状态之前元组已经失败,则重试该元组将使状态稳定。然而,如果元组在更新状态后失败,则重试相同的元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保消息仅处理一次 -

  • 小批量处理元组。

  • 为每个批次分配唯一的ID。如果重试批次,则给予相同的唯一ID。

  • 状态更新在批次之间排序。例如,第二批次的状态更新将不可能,直到第一批次的状态更新完成为止。

分布式RPC

分布式RPC用于查询和检索Trident拓扑结果。 Storm有一个内置的分布式RPC服务器。分布式RPC服务器从客户端接收RPC请求并将其传递到拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,分布式RPC服务器将其重定向到客户端。Trident的分布式RPC查询像正常的RPC查询一样执行,除了这些查询并行运行的事实。

什么时候使用Trident?

在许多使用情况下,如果要求是只处理一次查询,我们可以通过在Trident中编写拓扑来实现。另一方面,在Storm的情况下将难以实现精确的一次处理。因此,Trident将对那些需要一次处理的用例有用。Trident不适用于所有用例,特别是高性能用例,因为它增加了Storm的复杂性并管理状态。

Trident的工作实例

我们将把上一节中制定的呼叫日志分析器应用程序转换为Trident框架。由于其高级API,Trident应用程序将比普通风暴更容易。Storm基本上需要执行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一个。最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索一些关键字。

格式化呼叫信息

FormatCall类的目的是格式化包括“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下 -

编码:FormatCall.java

  1. import backtype.storm.tuple.Values;
  2.  
  3. import storm.trident.operation.BaseFunction;
  4. import storm.trident.operation.TridentCollector;
  5. import storm.trident.tuple.TridentTuple;
  6.  
  7. public class FormatCall extends BaseFunction {
  8. @Override
  9. public void execute(TridentTuple tuple, TridentCollector collector) {
  10. String fromMobileNumber = tuple.getString(0);
  11. String toMobileNumber = tuple.getString(1);
  12. collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
  13. }
  14. }

CSVSplit

CSVSplit类的目的是基于“comma(,)”拆分输入字符串,并发出字符串中的每个字。此函数用于解析分布式查询的输入参数。完整的代码如下 -

编码:CSVSplit.java

  1. import backtype.storm.tuple.Values;
  2.  
  3. import storm.trident.operation.BaseFunction;
  4. import storm.trident.operation.TridentCollector;
  5. import storm.trident.tuple.TridentTuple;
  6.  
  7. public class CSVSplit extends BaseFunction {
  8. @Override
  9. public void execute(TridentTuple tuple, TridentCollector collector) {
  10. for(String word: tuple.getString(0).split(",")) {
  11. if(word.length() > 0) {
  12. collector.emit(new Values(word));
  13. }
  14. }
  15. }
  16. }

日志分析器

这是主要的应用程序。最初,应用程序将使用FeederBatchSpout初始化TridentTopology并提供调用者信息。Trident拓扑流可以使用TridentTopology类的newStream方法创建。类似地,Trident拓扑DRPC流可以使用TridentTopology类的newDRCPStream方法创建。可以使用LocalDRPC类创建一个简单的DRCP服务器LocalDRPC有execute方法来搜索一些关键字。完整的代码如下。

编码:LogAnalyserTrident.java

  1. import java.util.*;
  2.  
  3. import backtype.storm.Config;
  4. import backtype.storm.LocalCluster;
  5. import backtype.storm.LocalDRPC;
  6. import backtype.storm.utils.DRPCClient;
  7. import backtype.storm.tuple.Fields;
  8. import backtype.storm.tuple.Values;
  9.  
  10. import storm.trident.TridentState;
  11. import storm.trident.TridentTopology;
  12. import storm.trident.tuple.TridentTuple;
  13.  
  14. import storm.trident.operation.builtin.FilterNull;
  15. import storm.trident.operation.builtin.Count;
  16. import storm.trident.operation.builtin.Sum;
  17. import storm.trident.operation.builtin.MapGet;
  18. import storm.trident.operation.builtin.Debug;
  19. import storm.trident.operation.BaseFilter;
  20.  
  21. import storm.trident.testing.FixedBatchSpout;
  22. import storm.trident.testing.FeederBatchSpout;
  23. import storm.trident.testing.Split;
  24. import storm.trident.testing.MemoryMapState;
  25.  
  26. import com.google.common.collect.ImmutableList;
  27.  
  28. public class LogAnalyserTrident {
  29. public static void main(String[] args) throws Exception {
  30. System.out.println("Log Analyser Trident");
  31. TridentTopology topology = new TridentTopology();
  32. FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
  33. "toMobileNumber", "duration"));
  34.  
  35. TridentState callCounts = topology
  36. .newStream("fixed-batch-spout", testSpout)
  37. .each(new Fields("fromMobileNumber", "toMobileNumber"),
  38. new FormatCall(), new Fields("call"))
  39. .groupBy(new Fields("call"))
  40. .persistentAggregate(new MemoryMapState.Factory(), new Count(),
  41. new Fields("count"));
  42.  
  43. LocalDRPC drpc = new LocalDRPC();
  44.  
  45. topology.newDRPCStream("call_count", drpc)
  46. .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));
  47.  
  48. topology.newDRPCStream("multiple_call_count", drpc)
  49. .each(new Fields("args"), new CSVSplit(), new Fields("call"))
  50. .groupBy(new Fields("call"))
  51. .stateQuery(callCounts, new Fields("call"), new MapGet(),
  52. new Fields("count"))
  53. .each(new Fields("call", "count"), new Debug())
  54. .each(new Fields("count"), new FilterNull())
  55. .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
  56.  
  57. Config conf = new Config();
  58. LocalCluster cluster = new LocalCluster();
  59. cluster.submitTopology("trident", conf, topology.build());
  60. Random randomGenerator = new Random();
  61. int idx = 0;
  62. while(idx < 10) {
  63. testSpout.feed(ImmutableList.of(new Values("1234123401",
  64. "1234123402", randomGenerator.nextInt(60))));
  65.  
  66. testSpout.feed(ImmutableList.of(new Values("1234123401",
  67. "1234123403", randomGenerator.nextInt(60))));
  68.  
  69. testSpout.feed(ImmutableList.of(new Values("1234123401",
  70. "1234123404", randomGenerator.nextInt(60))));
  71.  
  72. testSpout.feed(ImmutableList.of(new Values("1234123402",
  73. "1234123403", randomGenerator.nextInt(60))));
  74.  
  75. idx = idx + 1;
  76. }
  77.  
  78. System.out.println("DRPC : Query starts");
  79. System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
  80. System.out.println(drpc.execute("multiple_call_count", "1234123401 -
  81. 1234123402,1234123401 - 1234123403"));
  82. System.out.println("DRPC : Query ends");
  83.  
  84. cluster.shutdown();
  85. drpc.shutdown();
  86.  
  87. // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
  88. }
  89. }

构建和运行应用程序

完整的应用程序有三个Java代码。他们如下 -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

可以使用以下命令构建应用程序 -

  1. javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令运行应用程序 -

  1. java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

输出

一旦应用程序启动,应用程序将输出有关集群启动过程,操作处理,DRPC服务器和客户端信息的完整详细信息,以及最后的集群关闭过程。此输出将显示在控制台上,如下所示。

  1. DRPC : Query starts
  2. [["1234123401 - 1234123402",10]]
  3. DEBUG: [1234123401 - 1234123402, 10]
  4. DEBUG: [1234123401 - 1234123403, 10]
  5. [[20]]
  6. DRPC : Query ends
转载本站内容时,请务必注明来自W3xue,违者必究。
 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号