课程表

Apache Storm课程

工具箱
速查手册

Storm工作实例

当前位置:免费教程 » 大数据/云 » Apache Storm

我们已经经历了Apache Storm的核心技术细节,现在是时候编写一些简单的场景。

场景 - 移动呼叫日志分析器

移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。

Spout创建

Spout是用于数据生成的组件。基本上,一个spout将实现一个IRichSpout接口。 “IRichSpout”接口有以下重要方法 -

  • open -Spout提供执行环境。执行器将运行此方法来初始化喷头。

  • nextTuple -通过收集器发出生成的数据。

  • close -当spout将要关闭时调用此方法。

  • declareOutputFields -声明元组的输出模式。

  • ack -确认处理了特定元组。

  • fail -指定不处理和不重新处理特定元组。

open

open方法的签名如下 -

  1. open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 为此spout提供storm配置。

  • context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。

  • collector - 使我们能够发出将由bolts处理的元组。

nextTuple

nextTuple方法的签名如下 -

  1. nextTuple()

nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。

close

close方法的签名如下-

  1. close()

declareOutputFields

declareOutputFields方法的签名如下-

  1. declareOutputFields(OutputFieldsDeclarer declarer)

declarer -它用于声明输出流id,输出字段等

此方法用于指定元组的输出模式。

ack

ack方法的签名如下 -

  1. ack(Object msgId)

该方法确认已经处理了特定元组。

fail

nextTuple方法的签名如下-

  1. ack(Object msgId)

此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组。

FakeCallLogReaderSpout

在我们的场景中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含。

  • 主叫号码
  • 接收号码
  • 持续时间

由于我们没有呼叫日志的实时信息,我们将生成假呼叫日志。假信息将使用Random类创建。完整的程序代码如下。

编码 - FakeCallLogReaderSpout.java

  1. import java.util.*;
  2. //import storm tuple packages
  3. import backtype.storm.tuple.Fields;
  4. import backtype.storm.tuple.Values;
  5.  
  6. //import Spout interface packages
  7. import backtype.storm.topology.IRichSpout;
  8. import backtype.storm.topology.OutputFieldsDeclarer;
  9. import backtype.storm.spout.SpoutOutputCollector;
  10. import backtype.storm.task.TopologyContext;
  11.  
  12. //Create a class FakeLogReaderSpout which implement IRichSpout interface
  13. to access functionalities
  14. public class FakeCallLogReaderSpout implements IRichSpout {
  15. //Create instance for SpoutOutputCollector which passes tuples to bolt.
  16. private SpoutOutputCollector collector;
  17. private boolean completed = false;
  18. //Create instance for TopologyContext which contains topology data.
  19. private TopologyContext context;
  20. //Create instance for Random class.
  21. private Random randomGenerator = new Random();
  22. private Integer idx = 0;
  23.  
  24. @Override
  25. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  26. this.context = context;
  27. this.collector = collector;
  28. }
  29.  
  30. @Override
  31. public void nextTuple() {
  32. if(this.idx <= 1000) {
  33. List<String> mobileNumbers = new ArrayList<String>();
  34. mobileNumbers.add("1234123401");
  35. mobileNumbers.add("1234123402");
  36. mobileNumbers.add("1234123403");
  37. mobileNumbers.add("1234123404");
  38.  
  39. Integer localIdx = 0;
  40. while(localIdx++ < 100 && this.idx++ < 1000) {
  41. String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
  42. String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
  43. while(fromMobileNumber == toMobileNumber) {
  44. toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
  45. }
  46. Integer duration = randomGenerator.nextInt(60);
  47. this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
  48. }
  49. }
  50. }
  51.  
  52. @Override
  53. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  54. declarer.declare(new Fields("from", "to", "duration"));
  55. }
  56.  
  57. //Override all the interface methods
  58. @Override
  59. public void close() {}
  60.  
  61. public boolean isDistributed() {
  62. return false;
  63. }
  64.  
  65. @Override
  66. public void activate() {}
  67.  
  68. @Override
  69. public void deactivate() {}
  70.  
  71. @Override
  72. public void ack(Object msgId) {}
  73.  
  74. @Override
  75. public void fail(Object msgId) {}
  76.  
  77. @Override
  78. public Map<String, Object> getComponentConfiguration() {
  79. return null;
  80. }
  81. }

Bolt创建

Bolt是一个使用元组作为输入,处理元组,并产生新的元组作为输出的组件。Bolts将实现IRichBolt接口。在此程序中,使用两个Bolts
CallLogCreatorBoltCallLogCounterBolt来执行操作。

IRichBolt接口有以下方法 -

  • prepare -bolt提供要执行的环境。执行器将运行此方法来初始化spout

  • execute -处理单个元组的输入

  • cleanup -spout要关闭时调用。

  • declareOutputFields -声明元组的输出模式。

Prepare

prepare方法的签名如下 -

  1. prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf -为此bolt提供Storm配置。

  • context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。

  • collector -使我们能够发出处理的元组。

execute

execute方法的签名如下-

  1. execute(Tuple tuple)

这里的元组是要处理的输入元组。

execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。

cleanup

cleanup方法的签名如下 -

  1. cleanup()

declareOutputFields

declareOutputFields方法的签名如下-

  1. declareOutputFields(OutputFieldsDeclarer declarer)

这里的参数declarer用于声明输出流id,输出字段等。

此方法用于指定元组的输出模式

呼叫日志创建者bolt

呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。

编码 - CallLogCreatorBolt.java

  1. //import util packages
  2. import java.util.HashMap;
  3. import java.util.Map;
  4.  
  5. import backtype.storm.tuple.Fields;
  6. import backtype.storm.tuple.Values;
  7. import backtype.storm.task.OutputCollector;
  8. import backtype.storm.task.TopologyContext;
  9.  
  10. //import Storm IRichBolt package
  11. import backtype.storm.topology.IRichBolt;
  12. import backtype.storm.topology.OutputFieldsDeclarer;
  13. import backtype.storm.tuple.Tuple;
  14.  
  15. //Create a class CallLogCreatorBolt which implement IRichBolt interface
  16. public class CallLogCreatorBolt implements IRichBolt {
  17. //Create instance for OutputCollector which collects and emits tuples to produce output
  18. private OutputCollector collector;
  19.  
  20. @Override
  21. public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  22. this.collector = collector;
  23. }
  24.  
  25. @Override
  26. public void execute(Tuple tuple) {
  27. String from = tuple.getString(0);
  28. String to = tuple.getString(1);
  29. Integer duration = tuple.getInteger(2);
  30. collector.emit(new Values(from + " - " + to, duration));
  31. }
  32.  
  33. @Override
  34. public void cleanup() {}
  35.  
  36. @Override
  37. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  38. declarer.declare(new Fields("call", "duration"));
  39. }
  40. @Override
  41. public Map<String, Object> getComponentConfiguration() {
  42. return null;
  43. }
  44. }

呼叫日志计数器Bolt

呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。

编码 - CallLogCounterBolt.java

  1. import java.util.HashMap;
  2. import java.util.Map;
  3.  
  4. import backtype.storm.tuple.Fields;
  5. import backtype.storm.tuple.Values;
  6. import backtype.storm.task.OutputCollector;
  7. import backtype.storm.task.TopologyContext;
  8. import backtype.storm.topology.IRichBolt;
  9. import backtype.storm.topology.OutputFieldsDeclarer;
  10. import backtype.storm.tuple.Tuple;
  11.  
  12. public class CallLogCounterBolt implements IRichBolt {
  13. Map<String, Integer> counterMap;
  14. private OutputCollector collector;
  15.  
  16. @Override
  17. public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  18. this.counterMap = new HashMap<String, Integer>();
  19. this.collector = collector;
  20. }
  21.  
  22. @Override
  23. public void execute(Tuple tuple) {
  24. String call = tuple.getString(0);
  25. Integer duration = tuple.getInteger(1);
  26. if(!counterMap.containsKey(call)){
  27. counterMap.put(call, 1);
  28. }else{
  29. Integer c = counterMap.get(call) + 1;
  30. counterMap.put(call, c);
  31. }
  32. collector.ack(tuple);
  33. }
  34.  
  35. @Override
  36. public void cleanup() {
  37. for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
  38. System.out.println(entry.getKey()+" : " + entry.getValue());
  39. }
  40. }
  41.  
  42. @Override
  43. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  44. declarer.declare(new Fields("call"));
  45. }
  46. @Override
  47. public Map<String, Object> getComponentConfiguration() {
  48. return null;
  49. }
  50. }

创建拓扑

Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology来创建拓扑。使用以下代码片段创建拓扑 -

  1. TopologyBuilder builder = new TopologyBuilder();
  2.  
  3. builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
  4.  
  5. builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
  6. .shuffleGrouping("call-log-reader-spout");
  7.  
  8. builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
  9. .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping方法有助于为spoutbolts设置流分组。

本地集群

为了开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用prepare方法发送到所有任务(spout和bolt)。一旦拓扑提交到集群,我们将等待10秒钟,集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 -

编码 - LogAnalyserStorm.java

  1. import backtype.storm.tuple.Fields;
  2. import backtype.storm.tuple.Values;
  3.  
  4. //import storm configuration packages
  5. import backtype.storm.Config;
  6. import backtype.storm.LocalCluster;
  7. import backtype.storm.topology.TopologyBuilder;
  8.  
  9. //Create main class LogAnalyserStorm submit topology.
  10. public class LogAnalyserStorm {
  11. public static void main(String[] args) throws Exception{
  12. //Create Config instance for cluster configuration
  13. Config config = new Config();
  14. config.setDebug(true);
  15. //
  16. TopologyBuilder builder = new TopologyBuilder();
  17. builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
  18.  
  19. builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
  20. .shuffleGrouping("call-log-reader-spout");
  21.  
  22. builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
  23. .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
  24. LocalCluster cluster = new LocalCluster();
  25. cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
  26. Thread.sleep(10000);
  27. //Stop the topology
  28. cluster.shutdown();
  29. }
  30. }

构建和运行应用程序

完整的应用程序有四个Java代码。它们是 -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

应用程序可以使用以下命令构建 -

  1. javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

应用程序可以使用以下命令运行 -

  1. java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

输出

一旦应用程序启动,它将输出有关集群启动过程,spout和螺栓处理的完整详细信息,最后是集群关闭过程。在“CallLogCounterBolt”中,我们打印了呼叫及其计数详细信息。此信息将显示在控制台上如下 -

  1. 1234123402 - 1234123401 : 78
  2. 1234123402 - 1234123404 : 88
  3. 1234123402 - 1234123403 : 105
  4. 1234123401 - 1234123404 : 74
  5. 1234123401 - 1234123403 : 81
  6. 1234123401 - 1234123402 : 81
  7. 1234123403 - 1234123404 : 86
  8. 1234123404 - 1234123401 : 63
  9. 1234123404 - 1234123402 : 82
  10. 1234123403 - 1234123402 : 83
  11. 1234123404 - 1234123403 : 86
  12. 1234123403 - 1234123401 : 93

非JVM语言

Storm拓扑通过Thrift接口实现,这使得轻松地提交任何语言的拓扑。Storm支持Ruby,Python和许多其他语言。让我们来看看python绑定。

Python绑定

Python是一种通用的解释,交互,面向对象和高级编程语言。Storm支持Python实现其拓扑。Python支持发射,锚定,acking和日志操作。

如你所知,bolt可以用任何语言定义。用另一种语言编写的bolt作为子进程执行,Storm通过stdin / stdout与JSON消息进行通信。首先拿一个支持python绑定的样例bolt WordCount。

  1. public static class WordCount implements IRichBolt {
  2. public WordSplit() {
  3. super("python", "splitword.py");
  4. }
  5. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  6. declarer.declare(new Fields("word"));
  7. }
  8. }

这里的类WordCount实现IRichBolt接口和运行与python实现指定超级方法参数“splitword.py”。现在创建一个名为“splitword.py”的python实现。

  1. import storm
  2. class WordCountBolt(storm.BasicBolt):
  3. def process(self, tup):
  4. words = tup.values[0].split(" ")
  5. for word in words:
  6. storm.emit([word])
  7. WordCountBolt().run()

这是Python的示例实现,它计算给定句子中的单词。同样,您也可以与其他支持语言绑定。

转载本站内容时,请务必注明来自W3xue,违者必究。
 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号