First, edit pom.xml:
- <project xmlns="http://maven.apache.org/POM/4.0.0"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-     <modelVersion>4.0.0</modelVersion>
-     <groupId>com.mcq</groupId>
-     <artifactId>mr-1101</artifactId>
-     <version>0.0.1-SNAPSHOT</version>
-     <dependencies>
-         <dependency>
-             <groupId>jdk.tools</groupId>
-             <artifactId>jdk.tools</artifactId>
-             <version>1.8</version>
-             <scope>system</scope>
-             <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
-         </dependency>
-         <dependency>
-             <groupId>junit</groupId>
-             <artifactId>junit</artifactId>
-             <version>RELEASE</version>
-         </dependency>
-         <dependency>
-             <groupId>org.apache.logging.log4j</groupId>
-             <artifactId>log4j-core</artifactId>
-             <version>2.8.2</version>
-         </dependency>
-         <dependency>
-             <groupId>org.apache.hadoop</groupId>
-             <artifactId>hadoop-common</artifactId>
-             <version>2.7.2</version>
-         </dependency>
-         <dependency>
-             <groupId>org.apache.hadoop</groupId>
-             <artifactId>hadoop-client</artifactId>
-             <version>2.7.2</version>
-         </dependency>
-         <dependency>
-             <groupId>org.apache.hadoop</groupId>
-             <artifactId>hadoop-hdfs</artifactId>
-             <version>2.7.2</version>
-         </dependency>
-     </dependencies>
- </project>
Add a log4j.properties file under the resources folder:
- log4j.rootLogger=INFO, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
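Two things worth noting about this file. It is a Log4j 1.x-style configuration, which takes effect through the log4j 1.2 jar that hadoop-common pulls in transitively (the log4j-core 2.8.2 artifact in the pom is the separate Log4j 2 library and does not read this file). Also, as written only the stdout appender is active, because the rootLogger line lists just stdout; append ", logfile" to that line if you also want output in target/spring.log. A minimal sketch to verify the configuration is picked up (this demo class is hypothetical, not part of the original project):
- package com.mcq;
-
- import org.apache.log4j.Logger;
-
- // Hypothetical smoke test: with the configuration above, this prints a
- // timestamped INFO line to the console via the stdout appender.
- public class LoggingDemo {
-     private static final Logger LOG = Logger.getLogger(LoggingDemo.class);
-
-     public static void main(String[] args) {
-         LOG.info("log4j.properties was found on the classpath");
-     }
- }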
WordcountDriver.java:
- package com.mcq;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- public class WordcountDriver {
-     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-         Configuration conf = new Configuration();
-         // 1. Get the Job object
-         Job job = Job.getInstance(conf);
-         // 2. Set where the jar comes from
-         job.setJarByClass(WordcountDriver.class);
-         // 3. Associate the Mapper and Reducer classes
-         job.setMapperClass(WordcountMapper.class);
-         job.setReducerClass(WordcountReducer.class);
-         // 4. Set the key/value types of the Mapper output
-         job.setMapOutputKeyClass(Text.class);
-         job.setMapOutputValueClass(IntWritable.class);
-         // 5. Set the key/value types of the final output
-         job.setOutputKeyClass(Text.class);
-         job.setOutputValueClass(IntWritable.class);
-         // 6. Set the input and output paths
-         FileInputFormat.setInputPaths(job, new Path(args[0]));
-         FileOutputFormat.setOutputPath(job, new Path(args[1]));
-         // 7. Submit the job
-         // job.submit();
-         job.waitForCompletion(true);
-         // boolean res = job.waitForCompletion(true); // true: print job progress to the console
-         // System.exit(res ? 0 : 1);
-     }
- }
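One pitfall worth guarding against: MapReduce fails fast with a FileAlreadyExistsException if the output directory already exists, so rerunning with the same arguments fails until you delete it. A sketch of an optional guard (not in the original driver) you could place before the FileOutputFormat.setOutputPath(...) call; it needs an extra import of org.apache.hadoop.fs.FileSystem:
- // Remove a leftover output directory so reruns don't fail.
- FileSystem fs = FileSystem.get(conf);
- Path outputPath = new Path(args[1]);
- if (fs.exists(outputPath)) {
-     fs.delete(outputPath, true); // true = delete recursively
- }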
WordcountMapper.java:
- package com.mcq;
-
- import java.io.IOException;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
-
- // Map phase
- // KEYIN: type of the input key (the byte offset of the line, e.g. line 1 spans 0~19 and line 2 spans 20~25); must be LongWritable with the default TextInputFormat
- // VALUEIN: type of the input value (for text content, use Text)
- // KEYOUT: type of the output key
- // VALUEOUT: type of the output value
- public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
-     // Reuse the same Writable objects across map() calls to avoid creating garbage
-     IntWritable v = new IntWritable(1);
-     Text k = new Text();
-
-     @Override
-     protected void map(LongWritable key, Text value, Context context)
-             throws IOException, InterruptedException {
-         // 1. Read one line
-         String line = value.toString();
-         // 2. Split it into words
-         String[] words = line.split(" ");
-         // 3. Emit each word with a count of 1
-         for (String word : words) {
-             k.set(word);
-             context.write(k, v);
-         }
-     }
- }
WordcountReducer.java:
- package com.mcq;
-
- import java.io.IOException;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
-
- // KEYIN, VALUEIN: the key and value types output by the map phase
- public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
-     IntWritable v = new IntWritable();
-
-     @Override
-     protected void reduce(Text key, Iterable<IntWritable> values, Context context)
-             throws IOException, InterruptedException {
-         // Sum the counts for this word
-         int sum = 0;
-         for (IntWritable value : values) {
-             sum += value.get();
-         }
-         v.set(sum);
-         context.write(key, v);
-     }
- }
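To see the data flow concretely: for an input line "hello world hello", the mapper emits (hello,1), (world,1), (hello,1); the shuffle groups these into hello→[1,1] and world→[1]; and the reducer sums each list. A unit-test sketch of exactly that, assuming you add the org.apache.mrunit:mrunit:1.1.0 dependency (classifier hadoop2) to the pom — the test class is hypothetical, not part of the original post:
- package com.mcq;
-
- import java.util.Arrays;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mrunit.mapreduce.MapDriver;
- import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
- import org.junit.Test;
-
- public class WordcountTest {
-     @Test
-     public void mapperEmitsOnePerWord() throws Exception {
-         MapDriver.newMapDriver(new WordcountMapper())
-                  .withInput(new LongWritable(0), new Text("hello world hello"))
-                  .withOutput(new Text("hello"), new IntWritable(1))
-                  .withOutput(new Text("world"), new IntWritable(1))
-                  .withOutput(new Text("hello"), new IntWritable(1))
-                  .runTest();
-     }
-
-     @Test
-     public void reducerSumsCounts() throws Exception {
-         ReduceDriver.newReduceDriver(new WordcountReducer())
-                     .withInput(new Text("hello"), Arrays.asList(new IntWritable(1), new IntWritable(1)))
-                     .withOutput(new Text("hello"), new IntWritable(2))
-                     .runTest();
-     }
- }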
In the Run Configuration, add the program arguments e:/mrtest/in.txt e:/mrtest/out.txt (they become args[0] and args[1] in the driver).
I hit a bug while running it, and solved it with the help of https://blog.csdn.net/qq_40310148/article/details/86617512
Running on a cluster:
To package the program into a jar with Maven, add the following build plugins to pom.xml:
- <build>
-     <plugins>
-         <plugin>
-             <artifactId>maven-compiler-plugin</artifactId>
-             <version>2.3.2</version>
-             <configuration>
-                 <source>1.8</source>
-                 <target>1.8</target>
-             </configuration>
-         </plugin>
-         <plugin>
-             <artifactId>maven-assembly-plugin</artifactId>
-             <configuration>
-                 <descriptorRefs>
-                     <descriptorRef>jar-with-dependencies</descriptorRef>
-                 </descriptorRefs>
-                 <archive>
-                     <manifest>
-                         <mainClass>com.mcq.WordcountDriver</mainClass>
-                     </manifest>
-                 </archive>
-             </configuration>
-             <executions>
-                 <execution>
-                     <id>make-assembly</id>
-                     <phase>package</phase>
-                     <goals>
-                         <goal>single</goal>
-                     </goals>
-                 </execution>
-             </executions>
-         </plugin>
-     </plugins>
- </build>
Note that mainClass above must be the fully qualified name of the driver class; you can get it by right-clicking the class name and choosing Copy Qualified Name.
Build the jar (concretely: right-click the project and choose Run As > Maven install; the target folder then contains two jars, mr-1101-0.0.1-SNAPSHOT.jar and mr-1101-0.0.1-SNAPSHOT-jar-with-dependencies.jar — copy the one without bundled dependencies, since the cluster already provides them) and upload it to the cluster.
Run it with the following command:
hadoop jar mr-1101-0.0.1-SNAPSHOT.jar com.mcq.WordcountDriver /xiaocao.txt /output
Note that the input and output paths here are HDFS paths on the cluster (upload the input file first, e.g. hadoop fs -put xiaocao.txt /), and the output directory must not already exist. You can inspect the result afterwards with hadoop fs -cat /output/part-r-00000.