MapReduce了解
MapReduce 场景:比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,单机版很难搞定。
MapReduce 产生背景
如果让你统计日志里面出现的某个 URL 的总次数,让你自己去写个单机版的程序,写个逻辑:无非就是读这个文件一行,然后把那个地方截取出来,截取出来之后,然后可以把它放到一个 HashMap 里面,用 Map 去重,看到一条新的 URL ,就把它 put 进去,然后+1,如果下次看到再有就直接+1,没有就 put 进去,单机版的话逻辑是很好实现,但是数据量一大,你单机版本还能搞定吗?
首先 2T 的文件,你放在单机上可能存不下来,如果再他多一点呢?比如几千个文件,几十个 T,单机存都存不下,那么存在哪里——HDFS 上。
因为放在 HDFS 上可以放很多很多,比如说 HDFS 上有 100 个节点,每个节点上能挂载 8T 的硬盘,那就有 800T,每个文件存 3 个副本的话,耗费了大概 6 个 T 的空间。
但是你一旦放到 HDFS 上就有一个问题:你的文件就会被切散了,被切散到很多的机器上,这个时候,你再对它们进行统计,按照原来的逻辑,会不会出现问题?
你的任何一个节点上存的是某个文件的某些块,假设你是在那台机器上去做统计的话,你统计到的永远是局部的数据。如果你专门写一个客户端,你的程序运行在这个客户端上,你去读数据,读一点统计一点,到把整个文件都读完了,统计结果也就出来了,问题是那样的话,你的程序又变成了一个单机版的,那你的内存也就不够。
那你是不是应该把你的程序分发到集群的每一台 DN 上去做统计,也就是把运算往数据去移动,而不是把数据移动到运算,把我的运算逻辑移动到数据那端去,数据在哪里,我就在哪里运算。
但是这也有一个问题,因为运算也变成了一个分布式的了,你的每一份运算结果都只是局部的结果,那么这个时候也存在问题:
- 代码分发:你的代码怎么实现分发到很多机器上去运行,需要有一个资源分发和 Java 启动程序配置的系统
- 数据本地性:代码究竟分发到哪些机器上去运行,需要策略和算法
- 容错:如果某台机器宕机了,局部结果也就没有了,需要时刻监控程序运行情况
- 结果汇总:30 台节点的局部结果需要汇总,需要中间数据的调度系统
MapReduce 的作用
当我们面临海量数据处理的时候,那个逻辑也许很简单,但是面临海量数据处理,要我们这个逻辑代码变成分布式运行,就会变得很复杂,而那些很复杂的事情又不是我们关心的,我关心的只是那个逻辑。
MapReduce 的框架和 Yarn 就把那些我们不擅长的而且又必须解决的,跟我们的逻辑关系不大的那些东西全部给封装起来。我们直接写逻辑就可以了。
MapReduce 程序对我们程序员来说很简单,因为它把那些东西都给封装起来了,你只要写业务逻辑。
但是写 MapReduce 的时候,必须要符合人家编程的规范。任意一个逻辑实现都要分成这么两个步骤:
- Map
- Reduce
MapReduce 代码示例:单词统计
WCMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN:输入的key的类型(一行的起始偏移量)
* VALUEIN:输入的value的类型(这一行的内容)
* KEYOUT:输出的key的数据类型(单词)
* VALUEOUT:输出的value的数据类型(1)
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1. 拿到一行文本
String line = value.toString();
// 2. 切分单词
String[] words = line.split(" ");
// 3. 输出为 <K,V> 形式 key是单词,value是1
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
Hadoop 数据类型说明:
| Java类型 | Hadoop类型 | 说明 |
|---|---|---|
| Long | LongWritable | 序列化传输优化 |
| String | Text | 自建序列化机制 |
| int | IntWritable | 减少负载信息 |
WCReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReducer extends Reducer<Text, IntWritable, Text, Text> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
// 遍历value list,进行累加
for (IntWritable value : values) {
count += value.get();
}
// 输出一组(一个单词)的统计结果
context.write(key, new Text(count + ""));
}
}
WCRunner.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 用来描述一个特定的作业
* 该作业使用哪个类作为逻辑处理的map
* 哪个作为reduce
* 还可以指定该作业要处理的数据所在的路径
* 还可以指定该作业输出的结果放到哪个路径
*/
public class WCRunner {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
// 指定Mapper
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定Reducer
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定原始数据路径
FileInputFormat.setInputPaths(job, new Path("input/data1"));
// 指定处理结果输出路径
FileOutputFormat.setOutputPath(job, new Path("output1"));
// 提交作业
int isok = job.waitForCompletion(true) ? 0 : -1;
System.exit(isok);
}
}
总结
MapReduce 把我们很简单的运算逻辑很方便地扩展到海量数据的场景下分布式运算。程序员只需要关注业务逻辑(处理文本、处理字符串),而不需要关心分布式细节。但必须按照 Map-Reduce 的编程规范来编写代码。