Alan Tsai 的學習筆記


學而不思則罔,思而不學則殆,不思不學則“網貸” 為現任微軟最有價值專家 (MVP)、微軟認證講師 (MCT) 、Blogger、Youtuber:記錄軟體開發的點點滴滴 著重於微軟技術、C#、ASP .NET、Azure、DevOps、Docker、AI、Chatbot、Data Science

[Data Science 到底是什麼從一個完全外行角度來看][09]了解Hadoop裡的MapReduce到底是什麼?

image
圖片來源: https://pixabay.com/en/books-spine-colors-pastel-1099067/https://pixabay.com/en/math-blackboard-education-classroom-1547018/

在上一篇([08]Hadoop 改成完全分散模式)透過複製VM的方式建立出了fully-distributed mode,基本上在這個系列裡面對於Hadoop的介紹也快到了一個尾聲。

不過,還有一個部分被忽略了,也就是實際在Hadoop做運算的程式,也是WordCount的實際運算邏輯。

這篇會介紹MapReduce的概念,並且看一下WordCount的java程式是如何撰寫。

什麼是MapReduce

MapReduce其實是一種開發模式(Program Model),基本上可以把整個邏輯分成為Map階段和Reduce階段。

  • Map階段基本上會做filtering和sorting並且傳出一個key value pair做結果(以wordcount為例,每一個字會作為最後的key,而value則是1代表有一筆)
  • Reduce階段基本上會做整合(以wordcount為例,從Map傳過來的key如果一樣,表示同一個字,因此把一樣的key做加總最後的出總筆數)

從下圖可以看到整個的流程:

image
整個WordCount的MapReduce流程。來源:https://www.mssqltips.com/sqlservertip/3222/big-data-basics--part-5--introduction-to-mapreduce/

input
這個是要做計算的原始資料,以上圖為例其實就是一堆文字清單
split
把input資料做分散處理 - 以hadoop來說,當MapReduce工作被輸入的時候,會被切割到各個cluster裡面等待做處理
map
這個就是MapReduce裡面的Map階段 - 每一個節點會把對應切割出來的資料建立key value結果 - key是字本身,然後value是1代表找到一筆
combine
這個其實也是在map的機器裡面做 - 把每一個key一樣的先做一次加總,避免傳送多次出去
shuffle & sort
在進入reduce階段之前,會先被做一個排序,因此相關的key值會放在一起
reduce
這個階段會做實際的加總,因此每一個key以的的value會被加總
outpu
這個是最後得到的結果

這邊需要注意一下,當提到map和reduce是小寫的時候,指的會是functional programing提供的方法。MapReduce則是開發模式。

上圖雖然用了小寫,不過這邊指的還是hadoop裡面的MapReduce。

Map和Reduce階段回傳的結果都是一個key value pair。

換個方式理解 - 用選舉為例

如果上面那個例子看了還是有點模糊,換個生活遇到的例子作說明

當台灣遇到選舉的時候,一般來說有選舉權的民眾會去戶籍地去做投票 - 投票完有沒有看當天新聞了解這些投票是怎麼計算的嗎?

如果那個時候看新聞,會注意到,會有跑馬燈一直跑說,某某縣市目前xxx有幾票 - 這個票數是及時在變動:

image
選舉的時候新聞及時播放票數。來源:http://my-own-post.com/new20150116/

整個數票的動作其實就是MapReduce。

input
所有有投票的票數就是整個input
split
每個可以投票的民眾去戶籍地投票,同等於把這個input split到不同的區域
map

投票時間截止了之後,每一個投票站會開始從箱子取出來,然後唱名這張票屬於哪個候選人。

每一張票的候選人就是key,然後唱名1票就是value

combine
當每一個投票站都分好了之後,會先做一個初步的加總,得到的每個站的總票數。
shuffle & sort
在這個階段,會把每個投票站同一個候選人(key)的放在一起
reduce
做最後加總 - 把所有一樣key的值加在一起
output
最後結果就是誰當選了

首先,每個可以投票的會去戶籍地做投票的動作,這個其實同等於

怎麼在Hadoop寫MapReduce

希望透過上面的比喻方式,對於整個MapReduce有個更清楚的了解,那在Hadoop裡面怎麼寫MapReduce呢?

Hadoop是java的程式,因此用java寫一定是最容易,下面快速介紹一下如何用java寫MapReduce,大概會分幾個部分:

  • Map
  • Reduce
  • 設定

Map

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException,
        InterruptedException {

            String line = value.toString();

            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
}

基本上,上面建立了一個Mapclass繼承Mapper並且定義了一個方法叫做map

Hadoop會把每一段文字個用value傳過來,因此用了tokenizer把裡面的word取出來。

每一個取出來的word,會被寫成一組key value pair(context.write(word,one)),word是key,value是數值1

會一直做,直到整個word都處理完。

Reduce

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws
  IOException, InterruptedException {
   int sum = 0;

   for (IntWritable val : values) {
    sum += val.get();
   }

   context.write(key, new IntWritable(sum));
 }
}

Reducer和mapper類似,先定義一個class叫做Reduce繼承Reducer

裡面有一個reduce的程式定義reduce階段要做什麼

在這邊,java已經有處理好把一樣的key放成一組,因此可以透過迴圈的方式把所有值加總。

最後把整個結果寫出去,一樣是key value pair,key還是原來的key,不過value是所有的加總。

設定

Map階段和Reduce階段的功能都定義好了之後,接下來需要做的是告訴程式執行的時候那個是Map和那個是Reduce。

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

 // 剛剛定義的 Map
 ....

 // 剛剛定義的 Reduce
 ....

 public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount");

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(Map.class);
  conf.setCombinerClass(Reduce.class);
  conf.setReducerClass(Reduce.class);

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
 }
}

這個程式應該蠻好理解,基本上就是把剛剛定義好的Map和Reduce做設定。

這邊比較特別是Combiner的部分,因為也是加總所以和reduce是一樣的概念。

結語

透過這篇了解了整個MapReduce的運作機制,並且看了如何用Java寫過一個WordCount的MapReduce程式。

這邊會發現到,程式裡面完全沒有任何分散式處理的概念,但是Hadoop會自動以分散式的模式執行。這個讓撰寫變得非常簡單。

可是另外一個問題會浮現出來,難道只有Java可以寫MapReduce嗎?

在下一篇([10]用.Net Core跑Hadoop MapReduce - Streaming介紹)將會介紹如何用.net core寫出可以再Hadoop透過stream的方式執行的MapReduce,並且這次會改成用docker的方式來執行,提供另外一種更快速和容易測試Hadoop的方式。


如果文章對您有幫助,就請我喝杯飲料吧
街口支付QR Code
街口支付QR Code
台灣 Pay QR Code
台灣 Pay QR Code
Line Pay 一卡通 QR Code
Line Pay 一卡通 QR Code
街口支付QR Code
支付寶QR Code
街口支付QR Code
微信支付QR Code
comments powered by Disqus