Requirements Analysis

Use MapReduce to implement a word-frequency statistics (Word Count) program.

Create a text file on the local machine and save it as input.txt, with the following contents:

a a b b b c c c c d d
e f f

The task is to count the number of occurrences of each word in input.txt.
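As a quick sketch of what the job will do: the map phase emits a (word, 1) pair for each token, the shuffle groups the pairs by word, and the reduce phase sums each group. For the first line of the sample input alone:

map:     (a,1) (a,1) (b,1) (b,1) (b,1) (c,1) (c,1) (c,1) (c,1) (d,1) (d,1)
shuffle: a -> [1,1]  b -> [1,1,1]  c -> [1,1,1,1]  d -> [1,1]
reduce:  (a,2) (b,3) (c,4) (d,2)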

Environment Setup

Go to the Maven Repository, search for hadoop, and find the hadoop-client artifact:

Maven Repository: org.apache.hadoop » hadoop-client (mvnrepository.com)

Select the version that matches your Hadoop cluster and copy its Maven dependency definition.

Open IDEA and create a new Maven project.

Edit pom.xml and paste in the copied dependency definition.
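The pasted dependency will look roughly like the snippet below; the version 3.3.1 here is only an example and should be replaced with your cluster's Hadoop version:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <!-- example version only; match your cluster -->
    <version>3.3.1</version>
</dependency>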

Once the dependencies finish syncing, the red error highlights disappear.

Writing the Code

MyMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    // Reused across map() calls to avoid allocating new objects per record.
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Split the input line on whitespace and emit a (word, 1) pair per token.
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

MyReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused across reduce() calls to avoid allocating new objects per key.
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the 1s emitted by the mappers for this word.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Main.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options (-D, -files, ...) and keep our own arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(Main.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Block until the job finishes and exit with its status.
        boolean successful = job.waitForCompletion(true);
        System.exit(successful ? 0 : 1);
    }
}
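Optionally, because summing counts is associative and commutative, the reducer class can also serve as a combiner, pre-aggregating counts on the map side to cut shuffle traffic. This is a standard Hadoop option, suggested here as an addition rather than part of the original code; it would go alongside the other job.set* calls:

job.setCombinerClass(MyReducer.class);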

Packaging and Running

Packaging the jar file

In IDEA, open the Maven tool window on the right and run the package task; this builds the jar and places it in the target directory.
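If you prefer the command line, the same build can be run from the project root, assuming Maven is installed:

mvn clean package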

Uploading the jar file

Upload the packaged jar to the server running the master node.
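One way to do this is scp; as a sketch, the hostname master and the root account below are assumptions to adjust for your cluster:

scp target/hadoop-demo-1.0-SNAPSHOT.jar root@master:~/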

Uploading input.txt

Use the put command to upload input.txt to HDFS, then confirm with ls that the upload succeeded:

[root@master ~]# hadoop fs -put input.txt /user/input.txt
[root@master ~]# hadoop fs -ls /user
Found 1 items
-rw-r--r-- 3 root supergroup 29 2022-04-19 17:26 /user/input.txt
[root@master ~]#

Running the MapReduce program

Run the program with the following command. The input is /user/input.txt on HDFS, and the results are written to the /user/result directory. Note that the output directory must not already exist, or the job will abort:

yarn jar hadoop-demo-1.0-SNAPSHOT.jar Main /user/input.txt /user/result

Viewing the results

If the command finishes without errors, the job ran successfully.

View the results with the following command:

hadoop fs -cat /user/result/part-r-00000
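Given the sample input, the output (tab-separated word and count, sorted by key) should be:

a	2
b	3
c	4
d	2
e	1
f	2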