在上一篇([08]Hadoop 改成完全分散模式)透過複製VM的方式建立出了fully-distributed mode,基本上在這個系列裡面對於Hadoop的介紹也快到了一個尾聲。
不過,還有一個部分被忽略了,也就是實際在Hadoop做運算的程式,也是WordCount的實際運算邏輯。
這篇會介紹MapReduce的概念,並且看一下WordCount的java程式是如何撰寫。
什麼是MapReduce
MapReduce其實是一種開發模式(Program Model),基本上可以把整個邏輯分成為Map階段和Reduce階段。
- Map階段基本上會做filtering和sorting並且傳出一個key value pair做結果(以wordcount為例,每一個字會作為最後的key,而value則是1代表有一筆)
- Reduce階段基本上會做整合(以wordcount為例,從Map傳過來的key如果一樣,表示同一個字,因此把一樣的key做加總最後的出總筆數)
從下圖可以看到整個的流程:
- input
- 這個是要做計算的原始資料,以上圖為例其實就是一堆文字清單
- split
- 把input資料做分散處理 - 以hadoop來說,當MapReduce工作被輸入的時候,會被切割到各個cluster裡面等待做處理
- map
- 這個就是MapReduce裡面的Map階段 - 每一個節點會把對應切割出來的資料建立key value結果 - key是字本身,然後value是1代表找到一筆
- combine
- 這個其實也是在map的機器裡面做 - 把每一個key一樣的先做一次加總,避免傳送多次出去
- shuffle & sort
- 在進入reduce階段之前,會先被做一個排序,因此相關的key值會放在一起
- reduce
- 這個階段會做實際的加總,因此每一個key以的的value會被加總
- outpu
- 這個是最後得到的結果
這邊需要注意一下,當提到map和reduce是小寫的時候,指的會是functional programing提供的方法。MapReduce則是開發模式。
上圖雖然用了小寫,不過這邊指的還是hadoop裡面的MapReduce。
換個方式理解 - 用選舉為例
如果上面那個例子看了還是有點模糊,換個生活遇到的例子作說明
當台灣遇到選舉的時候,一般來說有選舉權的民眾會去戶籍地去做投票 - 投票完有沒有看當天新聞了解這些投票是怎麼計算的嗎?
如果那個時候看新聞,會注意到,會有跑馬燈一直跑說,某某縣市目前xxx有幾票 - 這個票數是及時在變動:
整個數票的動作其實就是MapReduce。
- input
- 所有有投票的票數就是整個input
- split
- 每個可以投票的民眾去戶籍地投票,同等於把這個input split到不同的區域
- map
投票時間截止了之後,每一個投票站會開始從箱子取出來,然後唱名這張票屬於哪個候選人。
每一張票的候選人就是key,然後唱名1票就是value
- combine
- 當每一個投票站都分好了之後,會先做一個初步的加總,得到的每個站的總票數。
- shuffle & sort
- 在這個階段,會把每個投票站同一個候選人(key)的放在一起
- reduce
- 做最後加總 - 把所有一樣key的值加在一起
- output
- 最後結果就是誰當選了
首先,每個可以投票的會去戶籍地做投票的動作,這個其實同等於
怎麼在Hadoop寫MapReduce
希望透過上面的比喻方式,對於整個MapReduce有個更清楚的了解,那在Hadoop裡面怎麼寫MapReduce呢?
Hadoop是java的程式,因此用java寫一定是最容易,下面快速介紹一下如何用java寫MapReduce,大概會分幾個部分:
- Map
- Reduce
- 設定
Map
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
基本上,上面建立了一個Map
class繼承Mapper
並且定義了一個方法叫做map
。
Hadoop會把每一段文字個用value
傳過來,因此用了tokenizer把裡面的word取出來。
每一個取出來的word,會被寫成一組key value pair(context.write(word,one)
),word是key,value是數值1
。
會一直做,直到整個word都處理完。
Reduce
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws
IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
Reducer和mapper類似,先定義一個class叫做Reduce
繼承Reducer
。
裡面有一個reduce
的程式定義reduce階段要做什麼
在這邊,java已經有處理好把一樣的key放成一組,因此可以透過迴圈的方式把所有值加總。
最後把整個結果寫出去,一樣是key value pair,key還是原來的key,不過value是所有的加總。
設定
Map階段和Reduce階段的功能都定義好了之後,接下來需要做的是告訴程式執行的時候那個是Map和那個是Reduce。
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
// 剛剛定義的 Map
....
// 剛剛定義的 Reduce
....
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
這個程式應該蠻好理解,基本上就是把剛剛定義好的Map和Reduce做設定。
這邊比較特別是Combiner
的部分,因為也是加總所以和reduce是一樣的概念。
結語
透過這篇了解了整個MapReduce的運作機制,並且看了如何用Java寫過一個WordCount的MapReduce程式。
這邊會發現到,程式裡面完全沒有任何分散式處理的概念,但是Hadoop會自動以分散式的模式執行。這個讓撰寫變得非常簡單。
可是另外一個問題會浮現出來,難道只有Java可以寫MapReduce嗎?
在下一篇([10]用.Net Core跑Hadoop MapReduce - Streaming介紹)將會介紹如何用.net core寫出可以再Hadoop透過stream的方式執行的MapReduce,並且這次會改成用docker的方式來執行,提供另外一種更快速和容易測試Hadoop的方式。