Apache Storm in Twitter

In this chapter we discuss a real-time application of Apache Storm and see how Storm can be used with Twitter.

Twitter

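Twitter is an online social networking service that provides a platform for sending and receiving user tweets. Registered users can read and post tweets, while unregistered users can only read them. Hashtags categorize tweets by keyword: a # is placed directly before the relevant keyword. Now let us take a real-time scenario: finding the most used hashtags for each topic.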
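
To make the hashtag convention concrete, here is a minimal sketch that pulls hashtags out of raw tweet text. It assumes a simplified rule (a # followed by letters, digits, or underscores), which is not Twitter's exact tokenization; the class name HashtagExtractor is hypothetical and is not part of Storm or twitter4j.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper (assumption: not part of Storm or twitter4j).
class HashtagExtractor {
    // Simplified rule: '#' followed by one or more letters, digits, or underscores.
    private static final Pattern HASHTAG =
            Pattern.compile("#(\\w+)", Pattern.UNICODE_CHARACTER_CLASS);

    // Returns the hashtags found in the text, in order, without the leading '#'.
    static List<String> extract(String text) {
        List<String> tags = new ArrayList<>();
        Matcher matcher = HASHTAG.matcher(text);
        while (matcher.find()) {
            tags.add(matcher.group(1)); // group 1 is the keyword after '#'
        }
        return tags;
    }
}
```

For example, HashtagExtractor.extract("Sunday lunch #food #SundayRoast") yields a list holding food and SundayRoast. The topology in this chapter does not need such parsing, because twitter4j already exposes the hashtags of each tweet.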

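Creating the Spout

The purpose of the spout is to receive tweets as soon as people submit them. Twitter provides the "Twitter Streaming API", a web-service-based tool for retrieving submitted tweets in real time. The Twitter Streaming API can be accessed from any programming language.

twitter4j is an open-source, unofficial Java library that provides a Java-based module for easy access to the Twitter Streaming API. It offers a listener-based framework for receiving tweets. To access the Twitter Streaming API, we need to sign in to a Twitter developer account and obtain the following OAuth authentication details.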

  • ConsumerKey
  • ConsumerSecret
  • AccessToken
  • AccessTokenSecret
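
Because all four values are required, it can help to check them before starting a topology. The following is a minimal sketch that reads them from a java.util.Properties object and fails fast if any are missing. The class TwitterCredentials is hypothetical, and the property key names are an assumption chosen for illustration; adjust them to wherever you actually store the credentials.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical holder for the four OAuth values (not a twitter4j class).
class TwitterCredentials {
    // Assumed key names, used only for this illustration.
    private static final String[] KEYS = {
        "oauth.consumerKey", "oauth.consumerSecret",
        "oauth.accessToken", "oauth.accessTokenSecret"
    };

    final String consumerKey;
    final String consumerSecret;
    final String accessToken;
    final String accessTokenSecret;

    private TwitterCredentials(String[] values) {
        this.consumerKey = values[0];
        this.consumerSecret = values[1];
        this.accessToken = values[2];
        this.accessTokenSecret = values[3];
    }

    // Reads the four values; throws and names every key that is missing or blank.
    static TwitterCredentials fromProperties(Properties props) {
        String[] values = new String[KEYS.length];
        List<String> missing = new ArrayList<>();
        for (int i = 0; i < KEYS.length; i++) {
            String value = props.getProperty(KEYS[i]);
            if (value == null || value.trim().isEmpty()) {
                missing.add(KEYS[i]);
            } else {
                values[i] = value.trim();
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing OAuth settings: " + missing);
        }
        return new TwitterCredentials(values);
    }
}
```

Keeping the credentials in a file or in environment variables, rather than in source code, also avoids committing secrets by accident.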

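Storm ships a Twitter spout, TwitterSampleSpout, in its starter kit. We will use it to retrieve tweets. The spout takes the OAuth authentication details and a list of keywords, and emits the real-time tweets that match those keywords; when no keywords are given, it falls back to Twitter's sample stream. The complete program code is given below.

Code: TwitterSampleSpout.java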

    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;

    import twitter4j.FilterQuery;
    import twitter4j.StallWarning;
    import twitter4j.Status;
    import twitter4j.StatusDeletionNotice;
    import twitter4j.StatusListener;
    import twitter4j.TwitterStream;
    import twitter4j.TwitterStreamFactory;
    import twitter4j.conf.ConfigurationBuilder;

    import backtype.storm.Config;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;

    @SuppressWarnings("serial")
    public class TwitterSampleSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        // Buffer between the twitter4j listener thread and nextTuple()
        LinkedBlockingQueue<Status> queue = null;
        TwitterStream _twitterStream;
        String consumerKey;
        String consumerSecret;
        String accessToken;
        String accessTokenSecret;
        String[] keyWords;

        public TwitterSampleSpout(String consumerKey, String consumerSecret,
                String accessToken, String accessTokenSecret, String[] keyWords) {
            this.consumerKey = consumerKey;
            this.consumerSecret = consumerSecret;
            this.accessToken = accessToken;
            this.accessTokenSecret = accessTokenSecret;
            this.keyWords = keyWords;
        }

        // No-argument constructor: leaves the credentials and keywords unset
        public TwitterSampleSpout() {
        }

        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            queue = new LinkedBlockingQueue<Status>(1000);
            _collector = collector;

            StatusListener listener = new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    // Non-blocking: the tweet is dropped if the queue is full
                    queue.offer(status);
                }

                @Override
                public void onDeletionNotice(StatusDeletionNotice sdn) {}

                @Override
                public void onTrackLimitationNotice(int i) {}

                @Override
                public void onScrubGeo(long l, long l1) {}

                @Override
                public void onException(Exception ex) {}

                @Override
                public void onStallWarning(StallWarning arg0) {}
            };

            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setDebugEnabled(true)
                .setOAuthConsumerKey(consumerKey)
                .setOAuthConsumerSecret(consumerSecret)
                .setOAuthAccessToken(accessToken)
                .setOAuthAccessTokenSecret(accessTokenSecret);

            _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
            _twitterStream.addListener(listener);

            // Guard against null so the no-argument constructor does not cause
            // a NullPointerException here
            if (keyWords == null || keyWords.length == 0) {
                _twitterStream.sample();
            } else {
                FilterQuery query = new FilterQuery().track(keyWords);
                _twitterStream.filter(query);
            }
        }

        @Override
        public void nextTuple() {
            Status ret = queue.poll();
            if (ret == null) {
                // Nothing buffered yet: back off briefly instead of spinning
                Utils.sleep(50);
            } else {
                _collector.emit(new Values(ret));
            }
        }

        @Override
        public void close() {
            _twitterStream.shutdown();
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // Run a single spout task so the stream is opened only once
            Config ret = new Config();
            ret.setMaxTaskParallelism(1);
            return ret;
        }

        @Override
        public void ack(Object id) {}

        @Override
        public void fail(Object id) {}

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("tweet"));
        }
    }
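
The spout hands tweets from the twitter4j listener thread to nextTuple() through a bounded LinkedBlockingQueue, using the non-blocking offer and poll calls. The sketch below isolates that hand-off with plain strings in place of Status objects, so it runs without Storm or twitter4j. The class name TweetBuffer is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative stand-in for the spout's buffer; strings replace twitter4j Status objects.
class TweetBuffer {
    private final LinkedBlockingQueue<String> queue;

    TweetBuffer(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Producer side (the listener thread): never blocks.
    // Returns false when the buffer is full and the tweet was dropped.
    boolean onTweet(String tweet) {
        return queue.offer(tweet);
    }

    // Consumer side (what nextTuple() does): never blocks.
    // Returns null when nothing is waiting.
    String next() {
        return queue.poll();
    }
}
```

The design choice worth noting is that a full buffer silently drops new tweets rather than stalling the listener. That is acceptable for an approximate hashtag count, but a pipeline that must not lose data would need a different strategy.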

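Hashtag Reader Bolt

The tweets emitted by the spout are forwarded to HashtagReaderBolt, which processes each tweet and emits all the hashtags it contains. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j, which reads a tweet and returns its hashtags. The complete program code is given below.

Code: HashtagReaderBolt.java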

    import java.util.Map;

    import twitter4j.HashtagEntity;
    import twitter4j.Status;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class HashtagReaderBolt implements IRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            Status tweet = (Status) tuple.getValueByField("tweet");

            // Emit one tuple per hashtag found in the tweet
            for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
                System.out.println("Hashtag: " + hashtag.getText());
                this.collector.emit(new Values(hashtag.getText()));
            }

            // Acknowledge the input tuple, as HashtagCounterBolt does
            this.collector.ack(tuple);
        }

        @Override
        public void cleanup() {}

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("hashtag"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

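Hashtag Counter Bolt

The emitted hashtags are forwarded to HashtagCounterBolt. This bolt processes every hashtag and keeps each hashtag and its count in memory, using a Java Map object. The complete program code is given below.

Code: HashtagCounterBolt.java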

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;

    public class HashtagCounterBolt implements IRichBolt {
        // In-memory count per hashtag; lost when the task stops
        Map<String, Integer> counterMap;
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.counterMap = new HashMap<String, Integer>();
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String key = tuple.getString(0);

            if (!counterMap.containsKey(key)) {
                counterMap.put(key, 1);
            } else {
                Integer c = counterMap.get(key) + 1;
                counterMap.put(key, c);
            }

            collector.ack(tuple);
        }

        @Override
        public void cleanup() {
            // Print the final counts when the topology shuts down
            for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
                System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declared for completeness; this bolt does not emit any tuples
            declarer.declare(new Fields("hashtag"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
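
The counter bolt only accumulates counts; finding the most used hashtags still requires ranking them. The sketch below shows one way to do both steps in plain Java, using Map.merge for the increment and a sort for the ranking. The class HashtagTally and its method names are hypothetical and are not part of the tutorial's topology.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative counter with a top-N query (assumption: not part of the topology above).
class HashtagTally {
    private final Map<String, Integer> counts = new HashMap<>();

    // Adds one occurrence; merge() covers both the first and later sightings.
    void add(String hashtag) {
        counts.merge(hashtag, 1, Integer::sum);
    }

    // Returns how often the hashtag was seen, or 0 if never.
    int count(String hashtag) {
        return counts.getOrDefault(hashtag, 0);
    }

    // Returns up to n hashtags, highest count first; ties are broken alphabetically.
    List<String> top(int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
        int limit = Math.max(0, Math.min(n, entries.size()));
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries.subList(0, limit)) {
            result.add(entry.getKey());
        }
        return result;
    }
}
```

Sorting every entry is fine for a short local run. For a long-running stream with many distinct hashtags, a bounded structure such as a size-limited priority queue would likely be a better fit.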

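Submitting the Topology

The main application is the one that submits the topology. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit the topology.

Code: TwitterHashtagStorm.java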

    import java.util.Arrays;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class TwitterHashtagStorm {
        public static void main(String[] args) throws Exception {
            // The four OAuth values are mandatory; keywords are optional
            if (args.length < 4) {
                System.err.println("Usage: TwitterHashtagStorm <consumerkey> "
                    + "<consumersecret> <accesstoken> <accesstokensecret> [keyword ...]");
                System.exit(1);
            }

            String consumerKey = args[0];
            String consumerSecret = args[1];
            String accessToken = args[2];
            String accessTokenSecret = args[3];

            // Everything after the fourth argument is a keyword to track
            String[] keyWords = Arrays.copyOfRange(args, 4, args.length);

            Config config = new Config();
            config.setDebug(true);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
                consumerSecret, accessToken, accessTokenSecret, keyWords));

            builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
                .shuffleGrouping("twitter-spout");

            // Group by hashtag so each hashtag is always counted by the same task
            builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
                .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

            // Run in an in-process cluster for 10 seconds, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TwitterHashtagStorm", config,
                builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
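
The topology connects the counter bolt with fieldsGrouping on the "hashtag" field, so tuples carrying the same hashtag reach the same counter task. That matters because each task keeps its own in-memory map. The sketch below is a simplified model of that routing idea, a hash of the key taken modulo the number of tasks; it is an assumption for illustration and not Storm's actual implementation. The class name FieldsGroupingModel is hypothetical.

```java
// Simplified model of key-based routing (assumption: not Storm's real code).
class FieldsGroupingModel {
    // Maps a hashtag to a task index in [0, numTasks).
    // Equal hashtags always map to the same index.
    static int taskFor(String hashtag, int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(hashtag.hashCode(), numTasks);
    }
}
```

With shuffleGrouping instead, occurrences of one hashtag could be spread over several counter tasks, and each task would report only a partial count.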

Building and Running the Application

The complete application consists of four Java source files:

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

You can compile the application with the following command:

    javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java

Run the application with the following command:

    java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. \
        TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> \
        <keyword1> <keyword2> … <keywordN>

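Output

When the local cluster shuts down, the application prints the hashtags seen during the run together with their counts. The output should look similar to the following: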

    Result: jazztastic : 1
    Result: foodie : 1
    Result: Redskins : 1
    Result: Recipe : 1
    Result: cook : 1
    Result: android : 1
    Result: food : 2
    Result: NoToxicHorseMeat : 1
    Result: Purrs4Peace : 1
    Result: livemusic : 1
    Result: VIPremium : 1
    Result: Frome : 1
    Result: SundayRoast : 1
    Result: Millennials : 1
    Result: HealthWithKier : 1
    Result: LPs30DaysofGratitude : 1
    Result: cooking : 1
    Result: gameinsight : 1
    Result: Countryfile : 1
    Result: androidgames : 1