题目要求
输入
- 莎士比亚文集,存放在HDFS上的一个文件目录下面
- 停词表:与scala目录同级的一个resources目录下
输出
- 所有文集中空行的总数
- 出现次数前100的单词
- 注意:不能上传停词表到HDFS上
数据预处理
源文件是一个一个100KB左右的小文件,经过批量复制一共9000多个,如果在处理时依次读取,会花费大量不必要的时间,因此先对其进行处理,合并成一个大文件,放在hdfs上。
文件合并思路
使用JAVA操作hdfs的API,读取本地指定目录下所有的文件,依次追加到集群上某一个文件中。相当于代替hdfs shell的appendToFile
操作。
合并代码
1 | //HDFSWriter.java |
执行过程
打成JAR之后,用hadoop jar运行1
hadoop jar /home/hadoop/bigdata-hadoop-mr1-1.jar cn.youe.hdfs.HDFSWriter /home/hadoop/shakespeare merged.txt
运行完毕后,集群上出现了1.1G的合并后的文件1
2[root@hdp-node-02 hadoop]# hdfs dfs -ls -h /
-rw-r--r-- 1 root supergroup 1.1 G 2017-06-28 20:52 /merged.txt
至此,数据预处理完毕,对merged.txt进行词频统计
分析
数据按行读入,先统计空行个数,同时将每一行分割成单词的集合。再过滤掉停词,进行词频统计。
源码
1 | import org.apache.spark.{SparkConf, SparkContext} |
在集群运行
1 | /usr/local/apps/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \ |
主要优化思路
- 统计空行数量的变量sum设置为累加器
- stopWords设置为广播变量
- 减少一个map
- stopWords存储结构使用Set而非Array,查询效率更高
不用合并文件的方法
设置hdfs分割文件的大小,使程序可以一次读取大量内容而非按照文件依次去读,这样省去了文件合并的步骤,同时时间也很短。
代码
1 | import org.apache.hadoop.io.{LongWritable, Text} |