Requirements analysis

Given a file, count how many times each word appears in it.

Preparation

Create a file test.txt with the following contents:

I am a student
I learn hadoop
I learn MapReduce

Upload it to HDFS:

hadoop fs -put test.txt /user/wordcount.txt
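To double-check that the file landed where the later commands expect it, you can list and view it with the standard hadoop fs subcommands:

hadoop fs -ls /user
hadoop fs -cat /user/wordcount.txt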

Spark shell

Enter the following commands to get the word counts:

sc.textFile("/user/wordcount.txt")  // read the file from HDFS as an RDD of lines
.flatMap(x=>x.split(" "))           // split each line into words
.map(x=>(x, 1))                     // pair every word with a count of 1
.reduceByKey(_+_)                   // sum the counts for each word
.collect()                          // bring the results back to the driver

Pitfall

Here I got the following warning, and the computation made no progress:

2022-04-26 00:36:13,830 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

This was presumably due to insufficient resources; after editing the spark-defaults.conf configuration file to increase the memory, the job ran normally.
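As a rough sketch (the keys below are standard Spark settings, but the values are only illustrative and depend on your cluster), the relevant entries in conf/spark-defaults.conf might look like:

# memory for the driver and for each executor (illustrative values)
spark.driver.memory    1g
spark.executor.memory  1g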

PySpark

Via the pyspark shell

Run pyspark to start the Python shell, then enter:

sc.textFile("/user/wordcount.txt")\
.flatMap(lambda x: x.split(" "))\
.map(lambda x:(x, 1))\
.reduceByKey(lambda x,y: x+y)\
.collect()

As you can see, this is almost identical to the native Scala-based shell.

The result is as follows.
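For the sample test.txt above, collect() returns a list of (word, count) tuples along these lines (the order of the pairs may differ from run to run):

[('I', 3), ('am', 1), ('a', 1), ('student', 1), ('learn', 2), ('hadoop', 1), ('MapReduce', 1)]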

Pitfall

Python 3 needs to be installed on all nodes in advance:

yum install python3 -y
ssh node1 yum install python3 -y
ssh node2 yum install python3 -y
ssh node3 yum install python3 -y
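If Spark does not pick up python3 on its own, you may also need to point it at the interpreter explicitly; PYSPARK_PYTHON is the standard setting for this, and conf/spark-env.sh is one place to put it (the exact interpreter path on your nodes is an assumption here):

# make the pyspark shell and executors use python3
export PYSPARK_PYTHON=python3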

Via a .py file

Write a my.py file as follows:

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # create a SparkSession running in local mode
    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .master("local[*]")\
        .getOrCreate()

    # read the file from HDFS and turn each Row into its plain-text line
    lines = spark.read.text("/user/wordcount.txt")\
        .rdd.map(lambda r: r[0])

    # split lines into words, pair each word with 1, then sum the pairs per word
    counts = lines\
        .flatMap(lambda x: x.split(" "))\
        .map(lambda x: (x, 1))\
        .reduceByKey(lambda x, y: x + y)

    output = counts.collect()

    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()

Use spark-submit my.py to submit the job.
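Since my.py hard-codes .master("local[*]"), the job runs on the local machine even when submitted. A sketch of both variants (running on YARN assumes you remove the .master(...) call from my.py and have a YARN cluster available):

spark-submit my.py
spark-submit --master yarn my.py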

The output is as follows.