上一篇([09]了解Hadoop裡的MapReduce到底是什麼?)了解了什麼是MapReduce,並且了解了怎麼用Java寫一個MapReduce的Hello World程式:WordCount。
馬上會想到的一個問題是,難道只有Java可以寫MapReduce的程式嗎?
這篇將會介紹Hadoop的Streaming服務,讓任何語言只要透過Standard Input和Standard Output就可以寫出MapReduce程式。 將會使用最熟悉的語言,.Net Core來完成這個事情。
在這篇也會介紹另外一種測試Hadoop的方式,使用Docker來測試。
什麼是Hadoop Streaming
當一個MapReduce的程式被執行的時候,會先被切割成為一個一個的Task,然後由那台的DataNode用Java執行那個Task。
所以整個執行類似下圖,整個MapReduce都在JVM的環境下:
不過Hadoop考量到如果外部需要執行MapReduce要怎麼辦,因此建立了一個叫做Streaming的功能。
基本上,只要那台DataNode可以Run的起來都可以跑。
Hadoop Streaming透過Standard Input/Output/Error 3個管道 來和被Run起來的程式溝通。
MapReduce的程式只需要從Standard Input讀進來,做處理,然後在寫到Output。如果有錯誤訊息可以記錄在Error裡面。
整個概念大概是:
實際操作
還記得整個MapReduce基本上就是在每個階段做過處理之後,會產生一個key value pair。Hadoop用tab來切割Key 和 Value。
有了這個概念之後來看實際程式,以下使用的是.Net Core的console來開發,分幾個階段:
- Mapper開發
- Reducer開發
- 測試結果
Mapper開發
由於是透過Standard Input/Output,因此console非常適合,所以會建立一個Mapper的.Net Core Console程式。
在Mapper的階段,內容會是一行一行讀進來,所以把讀進來的內容做文字切割, 每找到一個word,就寫到output,word是key,1是value(代表找到一筆)
會一直迴圈的讀,直到沒有任何檔案為止。如果把這個和之前java比照會發現邏輯一樣。
class Program
{
static void Main(string[] args)
{
string line;
while ((line = Console.ReadLine()) != null)
{
// 用文字切割
var words = Regex.Matches(line, @"[\w]+");
foreach (var word in words)
{
// 每一個找到的算1筆 - keyvalue用tab切割
Console.WriteLine("{0}\t1", word);
}
}
}
}
Reducer開發
會在建立另外一個專案用來放Reducer的程式。
Reducer一樣是讀Input然後寫到output。由於這次讀到的內容是從Mapper來的,所以會先用tab做切割,key是word,value就是筆數(也都是1)。
在這邊,有建立一個words dictionary,這個是因為在Mapper階段其實沒有管word有沒有重複,反正出現就是+1。
不過在Reducer因為要加總,因此用了words
dictionary作為一個暫存的空間。
最後把所有結果寫到output - 也是 key value pair,key一樣是word,不過value就是word出現的總數。
static void Main(string[] args)
{
// 用來儲存已經出現過的字 - java版本會自動處理,不過這個stream需要手動記錄
Dictionary<string, int> words = new Dictionary<string, int>();
string line;
while ((line = Console.ReadLine()) != null)
{
// 傳過來的key value用tab分割(Mapper也是用tab切割key和value)
var keyValuePair = line.Split('\t');
string word = keyValuePair[0];
int count = Convert.ToInt32(keyValuePair[1]);
// 如果已經有這個word,和字典的加總,不然就建立新的
if (words.ContainsKey(word))
{
words[word] += count;
}
else
{
words.Add(word, count);
}
}
// 把所有結果寫出來
foreach (var word in words)
{
Console.WriteLine("{0}\t{1}", word.Key, word.Value);
}
測試結果
當整個程式準備好了之後,接下來就可以對這個程式做測試了。
在接下來將會用一個docker版本的hadoop做測試 - 希望透過docker方式也可以了解用docker做測試有多方便。
接下來的測試都是在powershell可以直接執行。
如果對docker不熟悉,那麼下面做不了。要跑docker基本上要Windows 10 Professional以上或者linux,並且有裝docker。
裡面用到的docker image是一個linux的container。
下面也可以直接在之前建立的Ubuntu環境裡面執行,不過需要先:
- 安裝.net core 2.0
- 跳過前面的步奏,知道後面呼叫hadoop Streaming那段即可
接下來的指令操作都是在從github clone下來的專案裡面src\chapter-10-dotnet-mapreduce
的資料夾下面執行。
完整的指令是:
git clone https://github.com/alantsai/blog-data-science-series.git
cd .\blog-data-science-series\src\chapter-10-dotnet-mapreduce
- 先把.net core的console 發佈出來
- 在powershell執行指令:
dotnet publish -o ${pwd}\dotnetmapreduce .\DotNetMapReduceWordCount\DotNetMapReduceWordCount.sln
- 把hadoop用docker compose啟動
- 使用指令把hadoop啟動:
docker-compose up -d
。 會看到:- 執行完有1個master 2個worker啟動
- 在YARN的web節點看到有兩個Node
- 在DataNode看到有兩個節點
- 把.Net core程式複製到master的hadoop節點裡面
- 把剛剛發佈出來的.Net core程式複製到master裡面,並且進入到master裡面的bash並且可以看到有copy進去的內容
docker cp dotnetmapreduce hadoop-dotnet-master:/dotnetmapreduce docker exec -it hadoop-dotnet-master bash ls ls /dotnetmapreduce
- 把要計算的檔案放到hadoop的HDFS
- 透過下面指令把檔案放到hadoop的HDFS的input資料夾並且檢查:
hadoop fs -mkdir -p /input hadoop fs -copyFromLocal /dotnetmapreduce/jane_austen.txt /input hadoop fs -ls /input
- 用hadoop Streaming執行net core mapreduce
- 用hadoop的streaming執行:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \ -files "/dotnetmapreduce" \ -mapper "dotnet dotnetmapreduce/DotNetMapReduceWordCount.Mapper.dll" \ -reducer "dotnet dotnetmapreduce/DotNetMapReduceWordCount.Reducer.dll" \ -input /input/* -output /output
- 檢查結果
- 執行完了之後,可以看到計算的每個字出現次數
hadoop fs -ls /output hadoop fs -cat /output/part-00000
會注意到這邊的結果和java版本有點不同,因為判斷字的邏輯不同導致。如果docker不需要了,可以用docker-compose down
把整個hadoop關掉。
結語
在這篇介紹了透過Hadoop Streaming達到在hadoop用.Net core 2.0的console程式做MapReduce如何。
這篇也改成使用docker來做hadoop測試而不是用一直以來建立的VM。用docker和VM比較會發現到docker其實做這種事情非常方便,如果對docker不熟悉,可以考慮花點時間做些學習(之後我的部落格也會有個系列介紹docker使用,有興趣的話請持續關注)。
在這個系列的Hadoop介紹也到了一個尾聲,在下一篇([11]Hadoop總結(上篇)–Ecosystem介紹)將會對目前hadoop有介紹的部分做一個總結,介紹hadoop的ecosystem,和還有什麼部分是應該繼續關注下去。