MapReduce
MapReduce的基本介绍
MapReduce: 分布式计算框架
分而治之: 生活中: 搬砖 图书馆数书 计算从1~100和 整个分而治之思想主要有二大阶段: 分(map阶段): 将一个任务拆分为多个小的任务 合(reduce阶段): 将每个小的任务结果进行聚合汇总在一起MapReduce既然是一个分布式计算框架, 必然需要有输入 和 输出, 数据在map执行之前进行读取数据, 在reduce之后将数据写出去 数据经历阶段: 1) 数据读取阶段: 不断持续的一直读取数据, 默认一行一行的读取数据, 每读取一行 就需要执行一次map的操作 数据传递 采用 kv方式: 读取过来数据 一般称为 k1和v1 2) map阶段: 接收k1和v1, 对数据进行处理, 形成新的键值对 k2和 v2 3) reduce阶段: 接收k2和v2 进行聚合统计操作, 然后转换为k3和v3 4) 数据输出阶段: 将k3和v3输出到目的地
MapReduce的编程模型
- 整个MapReduce编写步骤, 共计为8步: 天龙八部
map阶段:
- 读取数据: 将读取数据转换为k1和v1
- 自定义map逻辑: 接收k1和v1 , 将k1和v1转换为k2和v2
shuffle阶段:
- 分区: 将相同k2的数据, 发往同一个reduce, 保证相同k2的数据都在一个reduce中
- 排序: 对k2进行排序操作, 默认升序 字典序和数字顺序
- 规约: MR的优化工作, 提前聚合操作 可以省略
- 分组: 将相同k2的对应v2数据合并在一起, 形成一个集合
reduce阶段:
- 自定义reduce逻辑: 接收k2和v2 将其转换为 k3和v3
- 输出数据: 将k3和v3 输出目的地
MapReduce的入门案例
- 需求: 请统计指定单词文件中, 每个单词出现了多少次
- 代码实现
第一步: 自定义 map代码
第二步: 自定义 reduce代码
第三步: 编写MR驱动类 (组装天龙八部)- 创建maven项目. 导入相关的依赖:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<!--
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
-->
</plugins>
</build>- 第一步: 自定义map逻辑的代码
1) 创建一个类, 继承mapper类
2) 重写mapper类中map的方法
3) 在map方法中 实现map的逻辑package com.mr.wordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
// map类被初始化的时候, 会执行的操作 只会执行一次
/*@Override
protected void setup(Context context) throws IOException, InterruptedException {
}*/
private IntWritable v2 = new IntWritable(1);
private Text k2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 获取一行数据
String line = value.toString();
//2. 判断操作
if(line != null && !"".equals(line)){ // 如果能进来, 说明line中有数据的 千万不要丢掉 ! 符号
//3. 对数据执行切割操作
String[] words = line.split(" ");
//4. 遍历数组
for (String word : words) {
k2.set(word);
context.write(k2,v2);
}
}
}
// map类被关闭的时候, 会执行的操作, 只会执行一次 释放资源
/*@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
}*/
}- 第二部: 自定义reduce逻辑代码
1) 创建一个类, 继承reducer类
2) 重写reducer类中reduce的方法
3) 在reduce方法中 实现reduce的逻辑package com.mr.wordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text,IntWritable,Text,LongWritable> {
private LongWritable v3 = new LongWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1. 遍历这个迭代器, 获取每一个v2数据
long count = 0;
for (IntWritable value : values) {
int i = value.get();
count += i;
}
//2. 写出去
v3.set(count);
context.write(key,v3);
}
}- 第三部: 实现驱动类
package com.mr.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCountDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1. 创建 任务对象: job对象
Job job = Job.getInstance(getConf(), "WordCountDriver");
//2. 封装任务: 封装天龙八部
//2.1: 封装读取数据的类, 读取数据的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\input\\wordcount.txt"));
//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//2.3: 设置shuffle操作: 分区 排序 规约 分组 (默认 )
//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\output"));
//3. 提交执行了
boolean flag = job.waitForCompletion(true); // 是否等待执行的状态
return flag ? 0 : 1;
}
public static void main(String[] args) throws Exception {
//1. 指定run方法
Configuration conf = new Configuration();
int i = ToolRunner.run(conf, new WordCountDriver(), args); // 返回值为执行状态 0 正常 1异常
//2. 执行程序退出
System.exit(i);
}
}- 简写方式:
package com.mr.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCountDriver2 {
public static void main(String[] args) throws Exception {
//1. 创建 任务对象: job对象
Job job = Job.getInstance(new Configuration(), "WordCountDriver");
//2. 封装任务: 封装天龙八部
//2.1: 封装读取数据的类, 读取数据的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\input\\wordcount.txt"));
//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//2.3: 设置shuffle操作: 分区 排序 规约 分组 (默认 )
//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\output1"));
//3. 提交执行了
boolean flag = job.waitForCompletion(true); // 是否等待执行的状态
//4. 退出程序:
System.exit(flag ? 0 : 1);
}
}- MapReduce的运行方式
本地运行
直接右键run即可, 输入和输出路径可以是本地 也可以是HDFS
如果输入和输出都是本地路径, 那么不需要启动hadoop集群 如果输出和输出是HDFS路径, 需要保证HDFS已经启动了
集群化运行:
将MapReduce在yarn平台上执行
- 在MR的驱动类, 添加集群运行配置代码:
job.setJarByClass(当前类的.class); 注意: 此配置必须配, 否则在yarn上执行的时候, 会报找不到主类- 在yarn上运行, 建议输入和输出路径采用HDFS路径, 不要使用本地路径
TextInputFormat.addInputPath(job, new Path(args[0])); // 输入路径 args[0] 后期执行时候动态传递 TextOutputFormat.setOutputPath(job,new Path(args[1]));// 输出路径 args[1] 后期执行时候动态传递- 在pom文件中 添加打包插件
打包插件作用: 将当前程序所依赖的jar包, 一并打入到当前jar包中 (fatjar)
注意: 如果不加打包插件, 只会将自己编写代码进行打包, 依赖的jar包, 不会打入的
什么时候需要加入这个打包插件?
pom文件中使用的依赖包有非hadoop的jar包的时候
添加打包插件后, 如果有一些包, 不需要打入jar包中: 建议在依赖中添加
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>true</minimizeJar> </configuration> </execution> </executions> </plugin>- 执行打包:
- 由于在wordCount案例中, 所有的依赖包都是hadoop相关的jar包, 所以此处只需要使用小的jar包即可
- 将这个jar包上传到Linux系统中
- 执行 jar包
数据准备工作 hdfs dfs -mkdir -p /wordcount/input hdfs dfs -put wordcount.txt /wordcount/input 执行操作: yarn jar wordCountMR.jar com.mr.wordCount.WordCountDriver hdfs://node1:8020/wordcount/input/wordcount.txt hdfs://node1:8020/wordcount/output 格式: yarn jar jar包路径 驱动类包名+类名(权限类名) [args...] hadoop jar jar包路径 驱动类包名+类名(权限类名) [args...]
MapReduce的原理
MapReduce的并行机制
- 并行机制: map 或者 reduce 在执行的时候, 会运行多少个map 和多少个reduce问题
MapTask并行度
【在执行MapReduce的时候, mapTask会运行多少个取决于什么呢?】
mapTask数量取决于读取的数据量, 根据数据量不同, MapTask也会启动不同个
默认行为下,
每个mapTask处理数据量为128M , 也就是, 读取一个文件, 这个你文件块有多少个, 就会启动多少个mapTask
读取的每一个文件至少需要一个mapTask来运行
思考: 假设在HDFS中有一个文件为300M大小, 分为三个块(物理划分), 128,128,44 , 此时MapReduce会启动三个mapTask来执行, 请问MapTask在读取这个文件 是否和这几个块一一对应呢? 不会的
在整个MapTask在读取数据采用文件切片, 文件切片大小和块的大小是一致的, 但是文件切片是一个逻辑的划分
将这个文件整体读取过来, 根据文件切片大小对数据进行逻辑划分块, 划分多少块然后就启动多少MapTask
原因: 担心将一行数据划分到不同的mapTask中
切片的公式:
Math.max(minSize, Math.min(maxSize, blockSize));
说明:
minSize: 最小值默认 为 0
maxSize: 最大值默认为 Long的最大值(好大好大值)
blockSize: 块大小 128M HDFS决定
根据公式, 可以算出, 默认每一个文件切片大小128M
比如说: 一个文件 1GB , 此时按照默认的行为, 会启动多少mapTask呢? 8 个mapTask
需要: 由于服务器性能比较好, 每一个MapTask可以处理256M 也没有问题, 此时需要你解决这个问题呢?
通过调整: minSize 将其调整为 256M 大小
如果调大每个文件切片大小, 通过调整minsize方式即可
需求: 由于服务器性能不是特别好, 每一个MapTask可以处理64M , 此时需要你解决这个问题呢?
通过调整: maxSize 将其调整为 64M大小
如果调小每个文件切片大小, 通过调整maxsize方式即可
那么在代码中如何体现呢? 在驱动类中输入format设置即可
TextInputFormat.setMaxInputSplitSize();
TextInputFormat.setMinInputSplitSize();- reduce的并行度
在执行MapReduce的程序的时候, reduce有多少个, 取决于什么? 结果最终需要几个文件有关系, 说白了, 有几个reduce, 也就表示着有几个分区, 如果自定义分区, 那么分区的数量和reduce的相等
如何设置reduce的数量呢? 默认reduce数量为 1
job.setNumReduceTask(N);mapTask的工作机制
reduce的工作机制
进阶使用:自定义shuffle
MapReduce中自定义分区
MR中默认分区方案: 将相同的k2发往同一个reduce操作
MR如何确保这个事情的呢?
解决方案: 采用 hash取模计算法
k2.hashcode % numPartition 余数得多少, 就将k2数据发到那个编号的分区上即可
在MR中专门有一个默认分区类: HashPartitioner
(key.hashCode() & 2147483647) % numReduceTasks
好处:
给相同的数据打上同样的分区的标记
弊端:
划分的数据比较随机- 需求: 将15及以上的结果以及15以下的结果进行分开成两个文件进行保存
- 实现思路流程:
- 代码实现:
- mapper类:
package com.mr.partition;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LotteryMapper extends Mapper<LongWritable,Text,IntWritable,Text>{
private IntWritable k2 = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取一行数据
String line = value.toString();
//2. 判断操作
if(line!=null && !"".equals(line)){
//3.从一行数据中提取开奖号
String[] fields = line.split("\t");
int lottery = Integer.parseInt(fields[5]);
//4. 写出去
k2.set(lottery);
context.write(k2,value);
}
}
}- 定义自定义分区类
package com.mr.partition;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// 自定义分区类
public class MyPartitioner extends Partitioner<IntWritable,Text> {
@Override
public int getPartition(IntWritable k2, Text v2, int numReducerTask) {
// 思考: 判断k2的数据是否是大于等于15 如果大于返回 0 否则 返回 1
int lottery = k2.get();
if(lottery>=15){
return 0;
}else {
return 1;
}
}
}- 定义reduce逻辑
package com.mr.partition;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LotteryReducer extends Reducer<IntWritable,Text,Text,NullWritable> {
@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value,NullWritable.get());
}
}
}- 定义驱动类:
- 一定要设置自己定义的分区类, 如果不设置, 默认MR会使用HashPartitioner进行分区操作,因为MR并不知道你设置了自定义分区
- 一定要设置reduce的数量, 因为reduce数量默认为1个, 如果不设置, 即使分区编号有多个, 最终也只能落在一个reduce上
package com.mr.partition;
import com.mr.wordCount.WordCountMapper;
import com.mr.wordCount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class LotteryDriver {
public static void main(String[] args) throws Exception{
//1. 创建 job对象: 任务对象
Job job = Job.getInstance(new Configuration(), "LotteryDriver");
// 集群化运行的必备参数
job.setJarByClass(LotteryDriver.class);
//2. 封装job任务: 天龙八部:
//2.1: 封装读取数据的类, 读取数据的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\自定义分区\\input\\partition.csv"));
//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(LotteryMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
//2.3: 设置shuffle操作: 分区
job.setPartitionerClass(MyPartitioner.class);
//2.4: 设置shuffle操作: 排序 规约 分组 (默认 )
//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(LotteryReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\自定义分区\\output1"));
//2.9:设置reduce的数量 默认为 1
job.setNumReduceTasks(2);
//3. 执行代码
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ? 0 : 1);
}
}- 右键执行测试即可:
MapReduce中自定义排序与序列化
序列化: 将数据转换为字节的过程
反序列化: 将字节内容转回到数据本身
如果数据能够支持被序列化, 也就说数据可以进行保存磁盘或者进行网络的传输工作
如何让一类或者让某个数据类型支持序列化呢?
在java中, 专门有一个序列化的接口: Serializable, 如果你想让一个类或者数据类型实现序列化的操作, 只需要让其这个类实现serializable接口即可
在hadoop中, 并没有使用默认的数据类型, 虽然这些类型已经实现了序列化, 原因如下:
因为java中提供的这种序列化方式有点太重了, java的序列化在对数据进行转换的时候, 除了携带数据本身以外, 还携带这个类的继承体系以及其他的校验数据信息,这样导致传递的内容变得更多了(变的更重了)
这样如果采用java的序列化方案进行网络传输, 就会导致传输大量没有用的数据, 从而占用大量的网络带宽, 导致传递数据的效率下降, 成本提高
hadoop为了解决这样的问题, 专门提供了一套针对于hadoop的序列化方案: writable类
writable在传输的过程中, 仅会对数据本身进行转换 并不会携带额外一些内容, 从而保证传输效率更高- 说明writable:
在writable类中, 有两个方法:
write() : 序列化方法
readFields(): 反序列化方法
注意:
在使用的时候, 序列化的顺序和反序列化的顺序必须保持一致, 否则会出现反序列化失败- 排序: 默认MR的排序对k2进行排序工作, 升序排序, 排序规则: 字典序和数字序
- 需求:
a 1
a 9
b 3
a 7
b 8
b 10
a 5
第一列按照字典顺序进行排列
第一列相同的时候, 第二列按照升序进行排列代码实现:
- 定义sortBean操作:
package com.mr.sort;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SortBean implements WritableComparable<SortBean> {
private String first;
private Integer second;
public SortBean() {
}
public SortBean(String first, Integer second) {
this.first = first;
this.second = second;
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
public Integer getSecond() {
return second;
}
public void setSecond(Integer second) {
this.second = second;
}
@Override
public String toString() {
return first + "\t" + second ;
}
//序列化方法
// 注意: 序列化顺序 和 反序列化的顺序必须保持一致
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeInt(second);
}
// 反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}
// 比较器: 只需要指定按照谁来进行比较操作
// 技巧: 如果要想实现升序排序: this.compareTo(参数里内容) 倒序排序: 参数内容.compareTo(this)
@Override
public int compareTo(SortBean o) {
// 比较逻辑: 先比较第一列的数据(降序), 如果一致, 比较第二列数据
int i = o.first.compareTo(this.first); //返回值: 整数(前面比后面大) 负数(前面比后面的小) 和 0(一般大)
if(i == 0 ){ // 第一列相等
int i1 = this.second.compareTo(o.second);
return i1;
}
return i;
}
}- 定义mapper类:
package com.mr.sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable> {
private SortBean k2 = new SortBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 获取一行数据
String line = value.toString();
//2. 判断操作
if(line != null && !"".equals(line)){
//3. 封装 SortBean 对象
String[] fields = line.split("\t");
k2.setFirst(fields[0]);
k2.setSecond(Integer.parseInt(fields[1]));
//4. 写出去
context.write(k2,NullWritable.get());
}
}
}- 定义reduce类:
package com.mr.sort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {
@Override
protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key,value);
}
// context.write(key,NullWritable.get());
}
}- 创建驱动类
package com.mr.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SortDriver {
public static void main(String[] args) throws Exception{
//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "SortDriver");
// 设置集群运行配置参数
job.setJarByClass(SortDriver.class);
//2. 封装任务对象: 天龙八部
//2.1: 封装读取数据的类, 读取数据的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\排序\\input\\sort.txt"));
//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(SortBean.class);
job.setMapOutputValueClass(NullWritable.class);
//2.3: 设置shuffle操作: 分区 排序 规约 分组 (默认 )
//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(SortBean.class);
job.setOutputValueClass(NullWritable.class);
//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\排序\\output1"));
//3. 执行代码
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ? 0 : 1);
}
}MapReduce的规约操作
规约: 是MR中优化步骤, 主要目的是为了实现提前聚合操作, 减少从map 到reduce之间数据传输量
从对wordCount理解规约的过程, 说白了 对每一个map阶段数据, 进行提前聚合操作, 整个聚合操作基本与reduce聚合逻辑是一致的, 只不过规约是针对每一个map 而reduce是可以针对所有map的结果进行聚合
如何实现规约呢? 规约的逻辑和reduce的逻辑是一致
说明:
规约的实现步骤与reduce是一模一样的, reduce如何实现, 规约就如何实现即可, 只不过最后在驱动类 需要将reduce程序设置到规约的选项中
并不是所有的业务都可以进行规约操作,在进行规约之前, 一定要做可行性分析: 要求规约操作不能够影响最终结果,如果有 不要使用规约
比如说求平均数业务:
假设: 1,2,3,4,5 求平均数 3
比如有两个map
第一个map ; 1,2,3 ==> 2
第二个map: 4,5 ==> 4.5
reduce: 统计进行平均数 ==>3.25- 需求: 有 三个书架 ,每个书架上都有5本书, 要求 统计出 每种分类的下有几本书??
1号书架 2号书架 3号书架
<<java入门宝典>> <<Python入门宝典>> <<spark入门宝典>>
<<UI入门宝典>> <<乾坤大挪移>> <<hive入门宝典>>
<<天龙八部>> <<凌波微步>> <<葵花点穴手>>
<<史记>> <<PHP入门宝典>> <<铁砂掌>>
<<葵花宝典>> <<hadoop入门宝典>> <<论清王朝的腐败>>
分为三大类: 计算机类 武林秘籍 历史- 思考如何做: 相同k2的数据会被发往同一个reduce 相同k2的value数据合并为一个集合
map阶段: k1:行偏移量 v1: 一本书 --> k2(类别) v2 (1)
假设有二个map程序
第一个map程序:
输出结果:
计算机 1
计算机 1
计算机 1
计算机 1
武林秘籍 1
计算机 1
武林秘籍 1
武林秘籍 1
第一个map规约方案:
计算机 5
武林秘籍 3
武林秘籍 1
历史 1
计算机 1
武林秘籍 1
武林秘籍 1
计算机 1
历史 1
第二个map规约方案:
武林秘籍 3
历史 2
计算机 2
没有规约之前, reduce需要接收到 15次键值对
规约之后, reduce可以接收到 5次键值对
接收到: 分组后数据
计算机 [1,1,1,1,1,1,1]
武林秘籍: [1,1,1,1,1,1]
历史: [1,1]
reduce阶段: k2:类别 v2: 数值1集合 ---> k3: 类别 v3 结果数量
结果:
计算机 7
武林秘籍 6
历史 2- 代码实现:
- 实现map逻辑
package com.mr.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class BookCombinerMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
private Text k2 = new Text();
private IntWritable v2 = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取一行数据
String book = value.toString();
//2. 执行判断操作
if(book != null && !"".equals(book)){
String category = "";
//3:判断 书是属于什么类别:
if(book.contains("入门宝典")){// 如果能进来, 说明书为 计算机类别
category= "计算机";
}else if (book.contains("史记") || book.contains("论清王朝的腐败") ){ // 如果能进来, 说明书为 历史类别
category= "历史";
}else { // 否则 就是武林秘籍
category= "武林秘籍";
}
//4. 写出去
k2.set(category);
context.write(k2,v2);
}
}
}- reduce代码:
package com.mr.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class BookConbinerReducer extends Reducer<Text,IntWritable,Text,LongWritable> {
private LongWritable v3 = new LongWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1. 遍历v2的数据, 进行累加即可
long count = 0 ;
for (IntWritable value : values) {
count += value.get();
}
//2. 写出去
v3.set(count);
context.write(key,v3);
}
}- combiner类
package com.mr.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class BookConbiner extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable v3 = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1. 遍历v2的数据, 进行累加即可
int count = 0 ;
for (IntWritable value : values) {
count += value.get();
}
//2. 写出去
v3.set(count);
context.write(key,v3);
}
}- 驱动类
package com.mr.combiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class BookCombinerDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1. 创建 job任务对象
Job job = Job.getInstance(super.getConf(), "BookCombinerDriver");
// 设置集群运行的必备参数
job.setJarByClass(BookCombinerDriver.class);
//2. 封装 job的任务: 天龙八部
//2.1: 封装 读取数据输入类 及其输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\combinner\\input\\combinner.txt"));
//2.2: 设置mapper类 及其mapper输出k2 和 v2的类型
job.setMapperClass(BookCombinerMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//2.3: shuffle阶段: 分区 排序 规约 分组操作
job.setCombinerClass(BookConbiner.class);
//2.7: 设置reduce类, 及其输出 k3 和 v3的类型
job.setReducerClass(BookConbinerReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//2.8: 设置 输出类 , 及其输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\combinner\\output1"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
return flag ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int i = ToolRunner.run(new Configuration(), new BookCombinerDriver(), args);
System.exit(i);
}
}- 效果演示:
- MapReduce中分组操作
分组: 将相同k2的value数据合并为一个集合操作
需求: 现在需要求出每一个订单中成交金额最大的一笔交易
需求分析流程:
如何自定义分组
1) 创建一个类, 继承 WritableComparator
2) 编写空参构造方法, 在构造方法中, 调用父类构造
super( k2的类型 ,true);
3) 重写父类的compare方法, 在方法上有两个参数, 这两个参数其实就是需要进行比较的参数
4) 在compare方法, 编写比较规则: 根据业务来判断, 具体需要将随放置在一组, 就按照谁来进行比较即可
5) 告知给MR, 我自定义分组操作代码实现:
- 自定义数据类型
package com.mr.group;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String order_id;
private String product_id;
private Double price;
// 序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(order_id);
out.writeUTF(product_id);
out.writeDouble(price);
}
// 反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
this.order_id = in.readUTF();
this.product_id = in.readUTF();
this.price = in.readDouble();
}
// 排序比较的方法
@Override
public int compareTo(OrderBean o) {
// 比较逻辑: 首先先比较订单id是否一致(升序还是降序呢? 无所谓), 如果一致, 比较金额(降序)
int i = this.order_id.compareTo(o.order_id);
if(i == 0){ // 订单id是相等的
int i1 = o.price.compareTo(this.price);
return i1;
}
return i;
}
public String getOrder_id() {
return order_id;
}
public void setOrder_id(String order_id) {
this.order_id = order_id;
}
public String getProduct_id() {
return product_id;
}
public void setProduct_id(String product_id) {
this.product_id = product_id;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return order_id + "\t" + product_id + "\t" + price ;
}
}- 自定义map
package com.mr.group;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {
private OrderBean k2 = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 获取一行数据
String line = value.toString(); //
//2. 判断
if( line != null && !"".equals(line) ){
//3. 对数据进行切割操作:
String[] fields = line.split("\t");
//4.封装到 OrderBean
k2.setOrder_id(fields[0]);
k2.setProduct_id(fields[1]);
k2.setPrice(Double.parseDouble(fields[2]));
//5. 写出去
context.write(k2,NullWritable.get());
}
}
}- 自定义分区操作
package com.mr.group;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class OrderPartition extends Partitioner<OrderBean,NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {
// 需求: 将相同订单ID的数据 划分同一个reduce中去
String order_id = orderBean.getOrder_id();
return (order_id.hashCode() & 2147483647) % numReduceTasks;
}
}- 自定义reduce操作
package com.mr.group;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OrderReducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
int top = 2;
int len = 0;
for (NullWritable value : values) {
if(len >= top){
break;
}
context.write(key,value);
len ++;
}
}
}- 自定义分组操作
package com.mr.group;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// 自定义分组的是实现
public class MyOrderGroup extends WritableComparator{
public MyOrderGroup() {
// 参数1: 告知给分组类, k2的类型是什么 参数2: 是否创建k2类型 (true)
super( OrderBean.class ,true);
}
// 比较方法, 方法中两个参数, 本质上就是k2的相邻的两个元素
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean elem1 = (OrderBean)a;
OrderBean elem2 = (OrderBean)b;
return elem1.getOrder_id().compareTo(elem2.getOrder_id());
}
}- 驱动类
package com.mr.group;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class OrderDriver {
public static void main(String[] args) throws Exception {
//1. 创建 job对象
Job job = Job.getInstance(new Configuration(), "OrderDriver");
//设置 集群运行必备参数
job.setJarByClass(OrderDriver.class);
//2. 设置job的任务: 天龙八部
//2.1: 设置输入类, 及其输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\自定义groupingComparator\\input\\orders.txt"));
//2.2: 设置mapper类, 及其mapper输出的k2和v2的类型
job.setMapperClass(OrderMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
//2.3: 设置shuffle阶段: 分区
job.setPartitionerClass(OrderPartition.class);
//2.4: 设置shuffle阶段: 排序 数据类型中以及定义完成
//2.5: 设置shuffle阶段: 规约, 暂时没有
//2.6: 设置shuffle阶段: 分组操作
job.setGroupingComparatorClass(MyOrderGroup.class);
//2.7: 设置reduce类, 及其输出k3和v3的类型
job.setReducerClass(OrderReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
//2.8: 设置输出路径 及其输出 k3和v3的类型
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\自定义groupingComparator\\output1"));
//2.9: 设置reduceTask数量
job.setNumReduceTasks(2);
//3. 提交任务
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ? 0 : 1);
}
}- 综合案例_倒排索引
需求: 求每个单词在各个文件出现了多少次
案例流程分析:
代码实现:
如何获取文件名
//4. 获取文件名
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();- 定义map程序
package com.mr.index;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class IndexMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
private Text k2 = new Text();
private IntWritable v2 = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取一行数据
String line = value.toString();
//2. 判断
if( line!= null && !"".equals(line) ){
//3. 对数据执行切割操作
String[] words = line.split(" ");
//4. 获取文件名
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
//5. 组合k2操作:
for (String word : words) {
k2.set(fileName+"_"+word);
//6. 写出去
context.write(k2,v2);
}
}
}
}- 自定义reduce程序
package com.mr.index;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class IndexReducer extends Reducer<Text,IntWritable,Text,LongWritable> {
private LongWritable v3 = new LongWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1. 遍历
long count = 0 ;
for (IntWritable value : values) {
count+= value.get();
}
//2. 写出去
v3.set(count);
context.write(key,v3);
}
}- 自定义Driver类
package com.mr.index;
import com.mr.group.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class IndexDriver {
public static void main(String[] args) throws Exception {
//1. 创建 job对象
Job job = Job.getInstance(new Configuration(), "IndexDriver");
//设置 集群运行必备参数
job.setJarByClass(IndexDriver.class);
//2. 设置job的任务: 天龙八部
//2.1: 设置输入类, 及其输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\倒排索引\\input"));
//2.2: 设置mapper类, 及其mapper输出的k2和v2的类型
job.setMapperClass(IndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//2.3: 设置shuffle阶段: 分区
//2.4: 设置shuffle阶段: 排序 数据类型中以及定义完成
//2.5: 设置shuffle阶段: 规约, 暂时没有
//2.6: 设置shuffle阶段: 分组操作
//2.7: 设置reduce类, 及其输出k3和v3的类型
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//2.8: 设置输出路径 及其输出 k3和v3的类型
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\倒排索引\\output"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ? 0 : 1);
}
}1 .MapReduce的综合练习
上网流量的统计_需求一
需求: 统计每个手机号的上行流量总和,下行流量总和,上行总流量之和,下行总流量之和
数据样例:
数据说明:
流程分析:
代码实现:
- 自定义数据类型:
package com.mr.anli.flow;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable{
private Long upFlow ;
private Long downFlow;
private Long upTotalFlow;
private Long downTotalFlow;
public Long getUpFlow() {
return upFlow;
}
public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}
public Long getDownFlow() {
return downFlow;
}
public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}
public Long getUpTotalFlow() {
return upTotalFlow;
}
public void setUpTotalFlow(Long upTotalFlow) {
this.upTotalFlow = upTotalFlow;
}
public Long getDownTotalFlow() {
return downTotalFlow;
}
public void setDownTotalFlow(Long downTotalFlow) {
this.downTotalFlow = downTotalFlow;
}
@Override
public String toString() {
return upFlow +"\t"+downFlow +"\t"+upTotalFlow +"\t"+downTotalFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(upTotalFlow);
out.writeLong(downTotalFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.upTotalFlow = in.readLong();
this.downTotalFlow = in.readLong();
}
}- mapper阶段代码
package com.mr.anli.flow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Flow1Mapper extends Mapper<LongWritable,Text,Text,FlowBean> {
private Text k2 = new Text();
private FlowBean v2 = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取一行数据
String line = value.toString();
//2. 判断
if( line != null && !"".equals(line) ){
//3.对数据进行切割处理
String[] fields = line.split("\t");
//4. 获取 k2和v2的数据
String phone = fields[1]; // 封装 k2
// 封装 v2
v2.setUpFlow(Long.parseLong(fields[6]));
v2.setDownFlow(Long.parseLong(fields[7]));
v2.setUpTotalFlow(Long.parseLong(fields[8]));
v2.setDownTotalFlow(Long.parseLong(fields[9]));
//5. 写出去:
k2.set(phone);
context.write(k2,v2);
}
}
}- reduce代码:
package com.mr.anli.flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Flow1Reducer extends Reducer<Text,FlowBean,Text,FlowBean> {
private FlowBean v3 = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
//1. 遍历 v2的数据, 封装v3数据
Long upFlow = 0L;
Long downFlow = 0L;
Long upTotalFlow = 0L;
Long downTotalFlow = 0L;
for (FlowBean value : values) {
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
upTotalFlow += value.getUpTotalFlow();
downTotalFlow += value.getDownTotalFlow();
}
v3.setUpFlow(upFlow);
v3.setDownFlow(downFlow);
v3.setUpTotalFlow(upTotalFlow);
v3.setDownTotalFlow(downTotalFlow);
//2. 写出去
context.write(key,v3);
}
}- 驱动类
package com.mr.anli.flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Flow1Driver {
public static void main(String[] args) throws Exception {
//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "Flow1Driver");
// 集群运行的必备参数
job.setJarByClass(Flow1Driver.class);
//2. 封装 job的任务 : 天龙八部
//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\input"));
//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(Flow1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案
//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型
job.setReducerClass(Flow1Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//2.8: 设置输出类, 及其输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\output"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ?0 :1);
}
}上网流量的统计_需求二
- 需求: 对需求一的结果中上行流量和倒序排序(递减排序)
代码实现:
- 自定义数据类型:
package com.mr.anli.flow2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean2 implements WritableComparable<FlowBean2> {
private Long upFlow ;
private Long downFlow;
private Long upTotalFlow;
private Long downTotalFlow;
public FlowBean2() {
}
public FlowBean2(Long upFlow, Long downFlow, Long upTotalFlow, Long downTotalFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.upTotalFlow = upTotalFlow;
this.downTotalFlow = downTotalFlow;
}
@Override
public int compareTo(FlowBean2 o) {
return o.upFlow.compareTo(this.upFlow) ;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(upTotalFlow);
out.writeLong(downTotalFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.upTotalFlow = in.readLong();
this.downTotalFlow = in.readLong();
}
public Long getUpFlow() {
return upFlow;
}
public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}
public Long getDownFlow() {
return downFlow;
}
public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}
public Long getUpTotalFlow() {
return upTotalFlow;
}
public void setUpTotalFlow(Long upTotalFlow) {
this.upTotalFlow = upTotalFlow;
}
public Long getDownTotalFlow() {
return downTotalFlow;
}
public void setDownTotalFlow(Long downTotalFlow) {
this.downTotalFlow = downTotalFlow;
}
@Override
public String toString() {
return upFlow +"\t"+downFlow +"\t"+upTotalFlow +"\t"+downTotalFlow;
}
}- 自定义map逻辑
package com.mr.anli.flow2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Flow2Mapper extends Mapper<LongWritable,Text,FlowBean2,Text> {
private Text v2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取一行数据
String line = value.toString();
//2. 判断
if( line != null && !"".equals(line)){
//3. 对数据执行分割操作
String[] fields = line.split("\t");
//4. 封装 k2和v2
FlowBean2 k2 = new FlowBean2(Long.parseLong(fields[1]),Long.parseLong(fields[2]),Long.parseLong(fields[3]),Long.parseLong(fields[4]));
v2.set(fields[0]);
//5. 写出去
context.write(k2,v2);
}
}
}- 自定义reduce逻辑
package com.mr.anli.flow2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Flow2Reducer extends Reducer<FlowBean2,Text,Text,FlowBean2> {
@Override
protected void reduce(FlowBean2 key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value,key);
}
}
}- 驱动类
package com.mr.anli.flow2;
import com.mr.anli.flow.Flow1Mapper;
import com.mr.anli.flow.Flow1Reducer;
import com.mr.anli.flow.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Flow2Driver {
public static void main(String[] args) throws Exception {
//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "Flow2Driver");
// 集群运行的必备参数
job.setJarByClass(Flow2Driver.class);
//2. 封装 job的任务 : 天龙八部
//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\output"));
//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(Flow2Mapper.class);
job.setMapOutputKeyClass(FlowBean2.class);
job.setMapOutputValueClass(Text.class);
//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案
//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型
job.setReducerClass(Flow2Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean2.class);
//2.8: 设置输出类, 及其输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\output1"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ?0 :1);
}
}社交粉丝案例
需求: 求出哪些人两两之间有共同好友,及他俩的共同好友都有谁? 好友关系都是单向的
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J实现思路:
样例数据:
A: B C E F
B: A E
C: A D
D: E B
E: F C B
F: C D
核心一句话: 相同key数据发往同一个reduce,相同k2的value数据合并为一个集合
需求一: 用户在那些人的好友列表中
map阶段: k2 v2
B A
C A
E A
F A
A B
E B
A C
D C
E D
B D
F E
C E
B E
C F
D F
分组后结果:
B : [A D E]
C : [A E F]
E : [A B D]
F : [A E ]
A : [B C]
D : [F C]
reduce阶段: K3 V3
A B-C
B A-D-E
C A-E-F
D F-C
E A-B-D
F A-E
求共同好友呢?
map阶段: k2 v2
B-C A
A-D B
A-E B
D-E B
A-E C
A-F C
E-F C
F-C D
A-B E
A-D E
B-D E
A-E F
说明 map阶段在进行两两拼接的时候, 保证 小的在前, 大的在后
分组:
A-B : [E]
A-D : [B E]
A-E : [ B C F]
A-F : [C]
B-C : [A]
B-D : [E]
D-E : [B]
E-F : [C]
F-C : [D]
reduce阶段: k3 和 v3
结果:
A-B: E-
A-D: B-E-
A-E: C-B-F-
A-F: C
B-C: A
B-D: E
C-F: D
D-E: B
E-F: C代码实现: 第一个需求, 求用户在那些人的好友列表中
- mapper类实现
package com.mr.anli.friend;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FriendMapper1 extends Mapper<LongWritable,Text,Text,Text> {
private Text k2 = new Text();
private Text v2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 获取一行数据
String line = value.toString();
//2. 判断
if( line!=null && !"".equals(line) ){
//3. 对数据进行切割处理
String[] fields = line.split(":");
String v2Str = fields[0];
String[] friends = fields[1].split(",");
//4. 遍历用户好友列表, 将每一个好友当做k2
for (String friend : friends) {
k2.set(friend);
v2.set(v2Str);
context.write(k2,v2);
}
}
}
}- reduce类的实现
package com.mr.anli.friend;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FriendReducer1 extends Reducer<Text,Text,Text,Text> {
private Text v3 = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//1. 遍历v2的数据
StringBuffer v3Str = new StringBuffer();
for (Text value : values) {
v3Str.append( value.toString()+"-");
}
//2. 写出去
v3.set(v3Str.toString());
context.write(key,v3);
}
}- 驱动类
package com.mr.anli.friend;
import com.mr.anli.flow.Flow1Mapper;
import com.mr.anli.flow.Flow1Reducer;
import com.mr.anli.flow.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class FriendDriver1 {
public static void main(String[] args) throws Exception {
//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "FriendDriver1");
// 集群运行的必备参数
job.setJarByClass(FriendDriver1.class);
//2. 封装 job的任务 : 天龙八部
//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\input"));
//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(FriendMapper1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案
//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型
job.setReducerClass(FriendReducer1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//2.8: 设置输出类, 及其输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ?0 :1);
}
}代码实现: 求共同好友
- mapper类的编写
package com.mr.anli.friend;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.beans.FeatureDescriptor;
import java.io.IOException;
import java.util.Arrays;
public class FriendMapper2 extends Mapper<LongWritable,Text,Text,Text> {
private Text v2 = new Text();
private Text k2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取一行数据
String line = value.toString();
//2. 判断
if( line != null && !"".equals(line)){
//3. 对数据进行切割操作
String[] fields = line.split("\t");
v2.set(fields[0]);
String[] friends = fields[1].split("-");
//4. 对 数据进行从小到大的排序
Arrays.sort(friends);
//5. 对数组数据进行两两比较:
// a b c d:
for( int i = 0 ; i<friends.length-1; i++){
for( int j = i+1; j < friends.length ; j++ ){
String k2Str = friends[i] +"-" + friends[j];
//6. 写出去:
k2.set(k2Str);
context.write(k2,v2);
}
}
}
}
}- recuceTask代码实现
package com.mr.anli.friend;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FriendReducer2 extends Reducer<Text,Text,Text,Text> {
private Text v3 = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//1. 遍历value的数据
StringBuffer v3str =new StringBuffer();
for (Text value : values) {
v3str.append(value+"-");
}
//2. 写出去
v3.set(v3str.toString());
context.write(key,v3);
}
}- driver类实现
package com.mr.anli.friend;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class FriendDriver2 {
public static void main(String[] args) throws Exception {
//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "FriendDriver2");
// 集群运行的必备参数
job.setJarByClass(FriendDriver2.class);
//2. 封装 job的任务 : 天龙八部
//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output"));
//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(FriendMapper2.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案
//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型
job.setReducerClass(FriendReducer2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//2.8: 设置输出类, 及其输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output1"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ?0 :1);
}
}