Alan Tsai 的學習筆記


學而不思則罔,思而不學則殆,不思不學則“網貸” 為現任微軟最有價值專家 (MVP)、微軟認證講師 (MCT) 、Blogger、Youtuber:記錄軟體開發的點點滴滴 著重於微軟技術、C#、ASP .NET、Azure、DevOps、Docker、AI、Chatbot、Data Science

[Data Science 到底是什麼從一個完全外行角度來看][10]用.Net Core跑Hadoop MapReduce - Streaming介紹

image
圖片來源: https://pixabay.com/en/books-spine-colors-pastel-1099067/https://pixabay.com/en/math-blackboard-education-classroom-1547018/

上一篇([09]了解Hadoop裡的MapReduce到底是什麼?)了解了什麼是MapReduce,並且了解了怎麼用Java寫一個MapReduce的Hello World程式:WordCount。

馬上會想到的一個問題是,難道只有Java可以寫MapReduce的程式嗎?

這篇將會介紹Hadoop的Streaming服務,讓任何語言只要透過Standard Input和Standard Output就可以寫出MapReduce程式。 將會使用最熟悉的語言,.Net Core來完成這個事情。

在這篇也會介紹另外一種測試Hadoop的方式,使用Docker來測試。

什麼是Hadoop Streaming

當一個MapReduce的程式被執行的時候,會先被切割成為一個一個的Task,然後由那台的DataNode用Java執行那個Task。

所以整個執行類似下圖,整個MapReduce都在JVM的環境下:

image
JVM的MapReduce

不過Hadoop考量到如果外部需要執行MapReduce要怎麼辦,因此建立了一個叫做Streaming的功能。

基本上,只要那台DataNode可以Run的起來都可以跑。

Hadoop Streaming透過Standard Input/Output/Error 3個管道 來和被Run起來的程式溝通。

MapReduce的程式只需要從Standard Input讀進來,做處理,然後在寫到Output。如果有錯誤訊息可以記錄在Error裡面。

整個概念大概是:

image
Hadoop Streaming

實際操作

還記得整個MapReduce基本上就是在每個階段做過處理之後,會產生一個key value pair。Hadoop用tab來切割Key 和 Value。

有了這個概念之後來看實際程式,以下使用的是.Net Core的console來開發,分幾個階段:

  1. Mapper開發
  2. Reducer開發
  3. 測試結果

Mapper開發

由於是透過Standard Input/Output,因此console非常適合,所以會建立一個Mapper的.Net Core Console程式。

在Mapper的階段,內容會是一行一行讀進來,所以把讀進來的內容做文字切割, 每找到一個word,就寫到output,word是key1是value(代表找到一筆)

會一直迴圈的讀,直到沒有任何檔案為止。如果把這個和之前java比照會發現邏輯一樣。

class Program
{
    static void Main(string[] args)
    {
        string line;

        while ((line = Console.ReadLine()) != null)
        {
            // 用文字切割
            var words = Regex.Matches(line, @"[\w]+");

            foreach (var word in words)
            {
                // 每一個找到的算1筆 - keyvalue用tab切割
                Console.WriteLine("{0}\t1", word);
            }
        }
    }
}

Reducer開發

會在建立另外一個專案用來放Reducer的程式。

Reducer一樣是讀Input然後寫到output。由於這次讀到的內容是從Mapper來的,所以會先用tab做切割,key是word,value就是筆數(也都是1)。

在這邊,有建立一個words dictionary,這個是因為在Mapper階段其實沒有管word有沒有重複,反正出現就是+1。

不過在Reducer因為要加總,因此用了words dictionary作為一個暫存的空間。

最後把所有結果寫到output - 也是 key value pair,key一樣是word,不過value就是word出現的總數。

static void Main(string[] args)
{
    // 用來儲存已經出現過的字 - java版本會自動處理,不過這個stream需要手動記錄
    Dictionary<string, int> words = new Dictionary<string, int>();

    string line;

    while ((line = Console.ReadLine()) != null)
    {
        // 傳過來的key value用tab分割(Mapper也是用tab切割key和value)
        var keyValuePair = line.Split('\t');

        string word = keyValuePair[0];

        int count = Convert.ToInt32(keyValuePair[1]);

        // 如果已經有這個word,和字典的加總,不然就建立新的
        if (words.ContainsKey(word))
        {
            words[word] += count;
        }
        else
        {
            words.Add(word, count);
        }
    }

    // 把所有結果寫出來
    foreach (var word in words)
    {
        Console.WriteLine("{0}\t{1}", word.Key, word.Value);
}
和Java的版本不同,java版本會自動幫忙把key一樣組成一個list比較好操作,但是透過streaming需要自己手動操作。

測試結果

當整個程式準備好了之後,接下來就可以對這個程式做測試了。

在接下來將會用一個docker版本的hadoop做測試 - 希望透過docker方式也可以了解用docker做測試有多方便。

接下來的測試都是在powershell可以直接執行。

如果對docker不熟悉,那麼下面做不了。要跑docker基本上要Windows 10 Professional以上或者linux,並且有裝docker。

裡面用到的docker image是一個linux的container。

下面也可以直接在之前建立的Ubuntu環境裡面執行,不過需要先:

  • 安裝.net core 2.0
  • 跳過前面的步奏,知道後面呼叫hadoop Streaming那段即可

接下來的指令操作都是在從github clone下來的專案裡面src\chapter-10-dotnet-mapreduce的資料夾下面執行。

完整的指令是:

git clone https://github.com/alantsai/blog-data-science-series.git
cd .\blog-data-science-series\src\chapter-10-dotnet-mapreduce
                        
先把.net core的console 發佈出來
在powershell執行指令:dotnet publish -o ${pwd}\dotnetmapreduce .\DotNetMapReduceWordCount\DotNetMapReduceWordCount.sln
image
發佈結果
把hadoop用docker compose啟動
使用指令把hadoop啟動:docker-compose up -d。 會看到:
  1. 執行完有1個master 2個worker啟動
  2. 在YARN的web節點看到有兩個Node
  3. 在DataNode看到有兩個節點
image
可以看到啟動成功並且有兩個節點
把.Net core程式複製到master的hadoop節點裡面
把剛剛發佈出來的.Net core程式複製到master裡面,並且進入到master裡面的bash並且可以看到有copy進去的內容
docker cp dotnetmapreduce hadoop-dotnet-master:/dotnetmapreduce
docker exec -it hadoop-dotnet-master bash
ls
ls /dotnetmapreduce
image
進入到master的bash並且檢查copy是否成功
把要計算的檔案放到hadoop的HDFS
透過下面指令把檔案放到hadoop的HDFS的input資料夾並且檢查:
hadoop fs -mkdir -p /input
hadoop fs -copyFromLocal /dotnetmapreduce/jane_austen.txt /input
hadoop fs -ls /input
                        
image
複製檔案到HDFS
用hadoop Streaming執行net core mapreduce
用hadoop的streaming執行:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -files "/dotnetmapreduce" \
    -mapper "dotnet dotnetmapreduce/DotNetMapReduceWordCount.Mapper.dll" \
    -reducer  "dotnet dotnetmapreduce/DotNetMapReduceWordCount.Reducer.dll" \
    -input /input/* -output /output
                        
image
和之前執行map reduce的log一樣
檢查結果
執行完了之後,可以看到計算的每個字出現次數
hadoop fs -ls /output
hadoop fs -cat /output/part-00000
image
執行結果
會注意到這邊的結果和java版本有點不同,因為判斷字的邏輯不同導致。
如果docker不需要了,可以用docker-compose down把整個hadoop關掉。

結語

在這篇介紹了透過Hadoop Streaming達到在hadoop用.Net core 2.0的console程式做MapReduce如何。

這篇也改成使用docker來做hadoop測試而不是用一直以來建立的VM。用docker和VM比較會發現到docker其實做這種事情非常方便,如果對docker不熟悉,可以考慮花點時間做些學習(之後我的部落格也會有個系列介紹docker使用,有興趣的話請持續關注)。

在這個系列的後面,之前建立的VM還會用到 - 用來和R做結合。所以如果對後面操作有興趣,VM還是先保留。

在這個系列的Hadoop介紹也到了一個尾聲,在下一篇([11]Hadoop總結(上篇)–Ecosystem介紹)將會對目前hadoop有介紹的部分做一個總結,介紹hadoop的ecosystem,和還有什麼部分是應該繼續關注下去。


如果文章對您有幫助,就請我喝杯飲料吧
街口支付QR Code
街口支付QR Code
台灣 Pay QR Code
台灣 Pay QR Code
Line Pay 一卡通 QR Code
Line Pay 一卡通 QR Code
街口支付QR Code
支付寶QR Code
街口支付QR Code
微信支付QR Code
comments powered by Disqus