编辑推荐: |
本文先讲MapReduce
1.x的框架。再讲MapReduce 1.x升级改进后MapReduce 2.x/Yarn的框架。目前主要是用MapReduce
2.x/Yarn的框架。
本文来自codertw.com,由火龙果软件Anna编辑、推荐。 |
|
MapReduce 1.x
MapReduce 1.x重点概念
JobClient
用戶編寫的MapReduce程序通過Client提交到JobTracker端;同時,用戶可通過Client提供的一些接口查看作業運行狀態。在Hadoop內部用“作業”
(Job)表示MapReduce程序。每一個Job都會在用戶端通過Client類將應用程序以及參數配置Configuration打包成Jar文件存儲在HDFS,並把路徑提交到JobTracker,然後由JobTracker創建每一個Task(即MapTask和ReduceTask),將它們分發到各個TaskTracker服務中去執行。
JobClient提交Job的詳細流程主要如下:
JobClient在獲取了JobTracker為Job分配的id之後,會在JobTracker的系統目錄(HDFS)下為該Job創建一個單獨的目錄,目錄的名字即是Job的id,該目錄下會包含文件job.xml、job.jar、job.split等,其中,job.xml文件記錄了Job的詳細配置信息,job.jar保存了用戶定義的關於job的map、reduce操縱,job.split保存了job任務的切分信息。
JobTracker
JobTracker 主要負責資源監控和作業調度。JobTracker 監控所有 TaskTracker
與作業Job的健康狀況,一旦發現失敗情況後,其會將相應的任務轉移到其他節點;同時,JobTracker
會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器,而調度器會在資源出現空閒時,選擇合適的任務使用這些資源。在Hadoop
中,任務調度器是一個可插拔的模塊,用戶可以根據自己的需要設計相應的調度器。
以下引用 www.aboutyun.com/thread-7778…
JobTracker為作業的提交做了兩件事:一.為作業生成一個Job;二.接受該作業。
我們都知道,客戶端的JobClient把作業的所有相關信息都保存到了JobTracker的系統目錄下(當然是HDFS了),這樣做的一個最大的好處就是客戶端幹了它所能幹的事情同時也減少了服務器端JobTracker的負載。下面就來看看JobTracker是如何來完成客戶端作業的提交的吧!哦。對了,在這裡我不得不提的是客戶端的JobClient向JobTracker正式提交作業時直傳給了它一個改作業的JobId,這是因為與Job相關的所有信息已經存在於JobTracker的系統目錄下,JobTracker只要根據JobId就能得到這個Job目錄。
對於上面的Job的提交處理流程,我將簡單的介紹以下幾個過程:
創建Job的JobInProgress
JobInProgress對象詳細的記錄了Job的配置信息,以及它的執行情況,確切的來說應該是Job被分解的map、reduce任務。在JobInProgress對象的創建過程中,它主要乾了兩件事,一是把Job的job.xml、job.jar文件從Job目錄copy到JobTracker的本地文件系統(job.xml->/jobTracker/jobid.xml,job.jar->/jobTracker/jobid.jar);二是創建JobStatus和Job的mapTask、reduceTask存隊列來跟蹤Job的狀態信息。
檢查客戶端是否有權限提交Job
JobTracker驗證客戶端是否有權限提交Job實際上是交給QueueManager來處理的。
檢查當前mapreduce集群能夠滿足Job的內存需求
客戶端提交作業之前,會根據實際的應用情況配置作業任務的內存需求,同時JobTracker為了提高作業的吞吐量會限製作業任務的內存需求,所以在Job的提交時,JobTracker需要檢查Job的內存需求是否滿足JobTracker的設置。
上面流程已經完畢,可以總結為下圖:
TaskTracker
TaskTracker會週期性地通過心跳機制將本節點上資源的使用情況和任務的運行進度彙報給JobTracker,同時接收JobTracker發送過來的命令並執行相應的操作(如啟動新任務、殺死
任務等)。TaskTracker 使用“slot”等量劃分本節點上的資源量。 “slot”代表計算資源(CPU、
內存等)。一個 Task 獲取到一個slot 後才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閒slot分配給Task使用。slot分為Map
slot和Reduce slot 兩種,分別供Map Task和Reduce Task使用。TaskTracker通過slot數目(可配置參數)限定Task的併發度。
這裡可能有人會混淆JobTracker、TaskTracker和Hadoop學習(一)——hdfs架構中所講的的DataNode、NameNode。其實JobTracker對應於NameNode,TaskTracker對應於DataNode。DataNode和NameNode是針對數據存放來而言的,JobTracker和TaskTracker是對於MapReduce執行而言的。
MapTask和ReduceTask
Task 分為 Map Task 和 Reduce Task 兩種,均由TaskTracker啟動。從Hadoop學習(一)——hdfs架構中我們知道,HDFS以固定大小的block
為基本單位存儲數據,而對於MapReduce的輸入而言,其處理單位是split。 split 與 block
的對應關係默認是1:1。split 是一個邏輯概念,它只包含一些元數據信息,比如數據起始位置、數據長度、數據所在節點等。它的劃分方法完全由用戶自己決定。但需要注意的是,split的多少決定了Map
Task的數目,因為每個split會交由一個Map Task處理。split會在後面MapReduce的執行過程中詳細講。
MapReduce 1.x工作流程
www.aboutyun.com/thread-1549…
整個MapReduce作業的工作過程,如下所示:
作業的提交
JobClient的submitJob()方法實現的作業提交過程,如下所示:
通過JobTracker的getNewJobId()請求一個新的作業ID;(2)
檢查作業的輸出說明(比如沒有指定輸出目錄或輸出目錄已經存在,就拋出異常);
計算作業的輸入分片(當分片無法計算時,比如輸入路徑不存在等原因,就拋出異常);
將運行作業所需的資源(比如作業Jar文件,配置文件,計算所得的輸入分片等)複製到一個以作業ID命名的目錄中。(集群中有多個副本可供TaskTracker訪問)(3)
通過調用JobTracker的submitJob()方法告知作業準備執行。(4)
作業的初始化
JobTracker接收到對其submitJob()方法的調用後,就會把這個調用放入一個內部隊列中,交由作業調度器(比如先進先出調度器,容量調度器,公平調度器等)進行調度;(5)
初始化主要是創建一個表示正在運行作業的對象——封裝任務和記錄信息,以便跟蹤任務的狀態和進程;(5)
為了創建任務運行列表,作業調度器首先從HDFS中獲取JobClient已計算好的輸入分片信息(6)。然後為每個分片創建一個MapTask,並且創建ReduceTask。(Task在此時被指定ID,請區分清楚Job的ID和Task的ID)。
任務的分配
TaskTracker定期通過“心跳”與JobTracker進行通信,主要是告知JobTracker自身是否還存活,以及是否已經準備好運行新的任務等;(7)
JobTracker在為TaskTracker選擇任務之前,必須先通過作業調度器選定任務所在的作業;
對於MapTask和ReduceTask,TaskTracker有固定數量的任務槽(準確數量由TaskTracker核的數量和內存大小來決定)。JobTracker會先將TaskTracker的MapTask填滿,然後分配ReduceTask到TaskTracker;
對於MapTrask,JobTracker通過會選取一個距離其輸入分片文件最近的TaskTracker。對於ReduceTask,因為無法考慮數據的本地化,所以也沒有什麼標準來選擇哪個TaskTracker。
任務的執行
TaskTracker分配到一個任務後,通過從HDFS把作業的Jar文件複製到TaskTracker所在的文件系統(Jar本地化用來啟動JVM),同時TaskTracker將應用程序所需要的全部文件從分佈式緩存複製到本地磁盤;(8)
TaskTracker為任務新建一個本地工作目錄,並把Jar文件中的內容解壓到這個文件夾中;
TaskTracker啟動一個新的JVM(9)來運行每個Task(包括MapTask和ReduceTask),這樣Client的MapReduce就不會影響TaskTracker守護進程(比如,導致崩潰或掛起等);
子進程通過umbilical接口與父進程進行通信,Task的子進程每隔幾秒便告知父進程它的進度,直到任務完成。
進程和狀態的更新
一個作業和它的每個任務都有一個狀態信息,包括作業或任務的運行狀態,Map和Reduce的進度,計數器值,狀態消息或描述(可以由用戶代碼來設置)。這些狀態信息在作業期間不斷改變,它們是如何與Client通信的呢?
任務在運行時,對其進度(即任務完成的百分比)保持追蹤。對於MapTask,任務進度是已處理輸入所佔的比例。對於ReduceTask,情況稍微有點複雜,但系統仍然會估計已處理Reduce輸入的比例;
這些消息通過一定的時間間隔由Child JVM—>TaskTracker—>JobTracker匯聚。JobTracker將產生一個表明所有運行作業及其任務狀態的全局視圖。可以通過Web
UI查看。同時JobClient通過每秒查詢JobTracker來獲得最新狀態,並且輸出到控制檯上。
作業的完成
當JobTracker收到作業最後一個任務已完成的通知後,便把作業的狀態設置為”成功”。然後,在JobClient查詢狀態時,便知道作業已成功完成,於是JobClient打印一條消息告知用戶,最後從runJob()方法返回。
MapReduce 1.x的缺點
隨著分佈式系統集群的規模和其工作負荷的增長,原框架的問題逐漸浮出水面,主要的問題集中如下:
JobTracker 是 Map-reduce 的集中處理點,存在單點故障。
JobTracker 完成了太多的任務,造成了過多的資源消耗,當 map-reduce job 非常多的時候,會造成很大的內存開銷,潛在來說,也增加了
JobTracker fail 的風險,這也是業界普遍總結出老 Hadoop 的 Map-Reduce
只能支持 4000 節點主機的上限。
在 TaskTracker 端,以 map/reduce task 的數目作為資源的表示過於簡單,沒有考慮到
cpu/ 內存的佔用情況,如果兩個大內存消耗的 task 被調度到了一塊,很容易出現 OOM。
在 TaskTracker 端,把資源強制劃分為 map task slot 和 reduce task
slot, 如果當系統中只有 map task 或者只有 reduce task 的時候,會造成資源的浪。
源代碼層面分析的時候,會發現代碼非常的難讀,常常因為一個 class 做了太多的事情,代碼量達 3000
多行,,造成 class 的任務不清晰,增加 bug 修復和版本維護的難度。
從操作的角度來看,現在的 Hadoop MapReduce 框架在有任何重要的或者不重要的變化 (
例如 bug 修復,性能提升和特性化 ) 時,都會強制進行系統級別的升級更新。更糟的是,它不管用戶的喜好,強制讓分佈式集群系統的每一個用戶端同時更新。這些更新會讓用戶為了驗證他們之前的應用程序是不是適用新的
Hadoop 版本而浪費大量時間。
MapReduce 2.x/Yarn框架
從業界使用分佈式系統的變化趨勢和 hadoop 框架的長遠發展來看,MapReduce 的 JobTracker/TaskTracker
機制需要大規模的調整來修復它在可擴展性,內存消耗,線程模型,可靠性和性能上的缺陷。在過去的幾年中,hadoop
開發團隊做了一些 bug 的修復,但是最近這些修復的成本越來越高,這表明對原框架做出改變的難度越來越大。
YARN是Hadoop 2.0中的資源管理系統,它的基本設計思想是將MRv1中的JobTracker拆分成了兩個獨立的服務:一個全局的資源管理器ResourceManager和每個應用程序特有的ApplicationMaster。其中ResourceManager負責整個系統的資源管理和分配,而ApplicationMaster負責單個應用程序的管理。ApplicationMaster
承擔了以前的 TaskTracker 的一些角色,ResourceManager 承擔了 JobTracker
的角色。
YARN是一個資源管理、任務調度的框架,主要包含三大模塊:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。其中,ResourceManager負責所有資源的監控、分配和管理;ApplicationMaster負責每一個具體應用程序的調度和協調;NodeManager負責每一個節點的維護。對於所有的applications,RM擁有絕對的控制權和對資源的分配權。而每個AM則會和RM協商資源,同時和NodeManager通信來執行和監控task。
幾個模塊之間的關係如圖所示。
MapReduce 2.x重點概念
ResourceManager(RM)
RM是一個全局的資源管理器,負責整個系統的資源管理和分配。它主要由兩個組件構成:調度器(Scheduler)和應用程序管理器(Applications
Manager,AsM)。
調度器
調度器根據容量、隊列等限制條件(如每個隊列分配一定的資源,最多執行一定數量的作業等),將系統中的資源分配給各個正在運行的應用程序。
需要注意的是,該調度器是一個“純調度器”,它不再從事任何與具體應用程序相關的工作,比如不負責監控或者跟蹤應用的執行狀態等,也不負責重新啟動因應用執行失敗或者硬件故障而產生的失敗任務,這些均交由應用程序相關的ApplicationMaster完成。調度器僅根據各個應用程序的資源需求進行資源分配,而資源分配單位用一個抽象概念“資源容器”(Resource
Container,簡稱Container)表示,Container是一個動態資源分配單位,它將內存、CPU、磁盤、網絡等資源封裝在一起,從而限定每個任務使用的資源量。此外,該調度器是一個可插拔的組件,用戶可根據自己的需要設計新的調度器,YARN提供了多種直接可用的調度器,比如Fair
Scheduler和Capacity Scheduler等。
應用程序管理器
應用程序管理器負責管理整個系統中所有應用程序,包括應用程序提交、與調度器協商資源以啟動ApplicationMaster、監控ApplicationMaster運行狀態並在失敗時重新啟動它等。
NodeManager(NM)
NM是每個節點上的資源和任務管理器,一方面,它會定時地向RM彙報本節點上的資源使用情況和各個Container的運行狀態;另一方面,它接收並處理來自AM的Container啟動/停止等各種請求。
ApplicationMaster(AM)
用戶提交的應用程序均包含一個AM,負責為應用程序申請資源並分配給內部的任務,應用的監控,跟蹤應用執行狀態,重啟失敗任務等。ApplicationMaster是應用框架,它負責向ResourceManager協調資源,並且與NodeManager協同工作完成Task的執行和監控。MapReduce就是原生支持的一種框架,可以在YARN上運行Mapreduce作業。有很多分佈式應用都開發了對應的應用程序框架,用於在YARN上運行任務,例如Spark,Storm等。如果需要,我們也可以自己寫一個符合規範的YARN
application。
Container
Container 是 YARN 中的資源抽象,它封裝了某個節點上的多維度資源,如內存、CPU、磁盤、網絡等,當AM向RM申請資源時,RM為AM返回的資源便是用Container表示的。YARN會為每個任務分配一個Container,且該任務只能使用該Container中描述的資源。每個Container可以根據需要運行ApplicationMaster、Map、Reduce或者任意的程序。
YARN應用工作流程
用戶向YARN中提交應用程序,其中包括AM程序、啟動AM的命令、命令參數、用戶程序等;事實上,需要準確描述運行ApplicationMaster的unix進程的所有信息。提交工作通常由YarnClient來完成。
RM為該應用程序分配第一個Container,並與對應的NM通信,要求它在這個Container中啟動AM;
AM首先向RM註冊,這樣用戶可以直接通過RM査看應用程序的運行狀態,運行狀態通過 AMRMClientAsync.CallbackHandler的getProgress()
方法來傳遞給RM。 然後它將為各個任務申請資源,並監控它的運行狀態,直到運行結束,即重複步驟4?7;
AM採用輪詢的方式通過RPC協議向RM申請和領取資源;資源的協調通過 AMRMClientAsync異步完成,相應的處理方法封裝在AMRMClientAsync.CallbackHandler中。
—旦AM申請到資源後,便與對應的NM通信,要求它啟動任務;通常需要指定一個ContainerLaunchContext,提供Container啟動時需要的信息。
NM為任務設置好運行環境(包括環境變量、JAR包、二進制程序等)後,將任務啟動命令寫到一個腳本中,並通過運行該腳本啟動任務;
各個任務通過某個RPC協議向AM彙報自己的狀態和進度,以讓AM隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;ApplicationMaster與NM的通信通過NMClientAsync
object來完成,容器的所有事件通過NMClientAsync.CallbackHandler來處理。例如啟動、狀態更新、停止等。
應用程序運行完成後,AM向RM註銷並關閉自己。
|