编辑推荐: |
本文来自于bradhedlund.com,这篇文章将会逐步介绍
Hadoop 集群的实现原理以及Hadoop集群的拓扑结构。
|
|
文章让大家通过拓扑图的形式直观的了解 Hadoop 集群是如何搭建、运行以及各个节点之间如何相互调用、每个节点是如何工作以及各个节点的作用是什么。明白这一点将会对学习
Hadoop 有很大的帮助。首先,我们开始了解 Hadoop 的基础知识,以及 Hadoop 集群的工作原理。
在Hadoop部署中,有三种服务器角色,他们分别是客户端、Masters节点以及Slave 节点。Master
节点,Masters 节点又称主节点,主节点负责监控两个核心功能:大数据存储(HDFS)以及数据并行计算(Map
Reduce)。其中,Name Node 负责监控以及协调数据存储(HDFS)的工作,Job Tracker
则负责监督以及协调 Map Reduce 的并行计算。 而Slave 节点则负责具体的工作以及数据存储。每个
Slave 运行一个 Data Node 和一个 Task Tracker 守护进程。这两个守护进程负责与
Master 节点通信。Task Tracker 守护进程与 Job Tracker 相互作用,而
Data Node 守护进程则与 Name Node 相互作用。
所有的集群配置都会存在于客户端服务器,但是客户端服务器不属于 Master 以及 Salve,客户端服务器仅仅负责提交计算任务给
Hadoop 集群,并当 Hadoop 集群完成任务后,客户端服务器来拿走计算结果。在一个较小的集群中(40个节点左右),可能一台服务器会扮演多个角色,例如通常我们会将
Name Node 与 Job Tracker安置在同一台服务器上。(由于 Name Node对内存开销非常大,因此不赞成将
Name Node 与 Secondary Name Node 安置在同一台机器上)。而在一个大型的集群中,请无论如何要保证这三者分属于不同的机器。
在真实的集群环境中,Hadoop 最好运行在 Linux 服务器上。当然,Hadoop 也可以运行在虚拟机中,但是,这仅仅是用来学习的一种方法,而不能将其用在生产环境中。
上图是一个典型的 Hadoop 集群架构。这张图中,Hadoop 集群以机架为单位存在(而不是刀片机),而每个机架顶部都会有一个交换机通过千兆网与外部关联,如果你的服务器比较给力,请确保带宽足够数据的传输以免带宽影响运算(例如万兆以太网)。我们通过上行链路将所有的机架链接在一起形成一个集群。在一个机架中,有些服务器作为
Master 节点,例如 Name Node等等,而更多的则是 Slave Node。
到这里,我们没有讨论更加详细的设计方案,我们暂且放在一边,下面开始看看程序是如何运行的。
从集群中加载数据(HDFS writes)
分析数据(Map Reduce)
在集群中为结果排序(HDFS writes)
从集群运算中读取结果(HDFS reads)
应用场景:
每天都有海量的邮件被客服收到,但是我怎么才能知道这些邮件中究竟有多少邮件包含了“退款”这个词?
Hadoop 的存在价值是什么?Hadoop 解决的是哪些问题?简单来讲,大型企业和政府都可能会包含有大量数据,
(我们可以看做是一块巨大的豆腐)例如马路卡口监控视频拍摄的机动车号牌,我们如果要对如此海量的数据进行复杂的分析,还要非常快速的得到结果,如果使用一台计算机,根本无法胜任这个工作。如果能将这个庞然大物分割成许多小的数据块,并将其分发给许许多多的服务器来协同计算,那么这个效率自然是很快的,所以,Hadoop
的存在价值就体现在这里。
例如上面那个邮件的例子,经过日积月累,我们的服务器存有大量的邮件,我们可以将这些邮件打包成文本发送给Hadoop
集群,只需要编写一个简单的计算单词量的函数,并提交给集群,集群通过相互协调,在短时间内计算完毕之后返回一个结果。我就可以得到我想要的结果了。
Hadoop 集群有了数据之后,便开始工作。我们在此的目的是大量数据的快速并行处理,为了实现这个目标,我们应当利用尽可能多的机器,为此,客户端需要将数据分拣分成较小的块,然后将这些快在集群中不同的机器上并行计算。但是,这些服务器中的某些服务器可能会出现故障,因此,应当将每个数据块做几次拷贝,以确保数据不会被丢失。默认的拷贝次数是3次,但是我们可以通过
hdfs-site.xml 配置文件中 dfs.replication 这个参数来控制拷贝次数。
客户端将 File.txt 切割成三块,并与Name Node协调,Name Node告知客户端将这些数据分发到哪些
Data Node 上,收到数据块的 Data Node 将会对收到的数据块做几次复制分发给其他的
Data Node 让其他的 Data Node 也来计算同样的数据(确保数据完整性)。此时,Name
Node 的作用仅仅是负责管理数据,例如:哪些数据块正在哪个Data Node上计算,以及这些数据将会运行到哪里。(文件系统的元数据信息)
Hadoop 有“机架意识”的概念,作为 Hadoop 的管理者,你可以手动的在你的集群中为每一个Slave
Data Node定义机架号。你可能会问,我为什么要做这个工作?这里有两个关键点:数据丢失防护以及网络性能。
记住,每一个数据块将会被复制到多态服务器上以确保数据不会因某个服务器的宕机而造成数据丢失,但是如果不幸的是,所有的拷贝都存放于一台机架上,而这个机架由于种种原因造成了整个机架与外部断开连接或整体宕机。例如一个严重的错误:交换机损坏。因此,为了避免这种情况的发生,需要有人知道每一个
Data Node 在整个网络拓扑图中的位置,并智能的将数据分配到不同的机架中。这个就是 Name
Node 的作用所在。
还有一种假设,即两台机器在同一个机架和分属于不同的机架比起来有更好的带宽和更低的延迟。机架交换机上行链路带宽通常比下行带宽更少,此外,在机架内延迟通常比机架外的延迟要底。如果
Hadoop 有了相同机架优化的意识(提高网络性能),同时能够保护数据,这不是很酷吗?
这里的客户端已经准备好将 FILE.txt 分成三块添加到集群中。客户端会先告诉 Name Node
它将要把数据写入到集群中,从 Name Node处得到允许后,并受到 Name Node 为其分配的
Data Node 清单,Name Node分配 Data Node 的时候会有一个智能决策的步骤,以默认的拷贝次数来讲(3次),
Name Node 会将其中两个副本放在同一个机架中,而剩下的一个副本会放在另外一个机架中,并将分配结果告诉给客户端,客户端将会遵循这个分配结果将数据分配给三个
Data Node。
当客户端接收到 Name Node 给出的任务分配清单后,开始将数据传输给 Data Node,例如:Client
选择 Block A 打开 TCP 50010 端口,告诉 Data Node 1 说:“嘿,给你一个数据块
Block A,然后你去确保 Data Node 5 也准备好了,并让它问问 Data Node 6
是不是也准备好了”。如此,Data Node们会通过TCP 50010 端口原路返回并逐层告知自己准备好了,最终让客户端得知清单上所列出的
Data Node 都准备好了。当客户端得知都准备好之后,开始准备写数据块到集群中。
数据块被第一个 Data Node 接收后,会将其复制给下一个 Data Node 以此类推(复制次数由
dfs.replication 参数控制)。
此处我们可以看到,Data Node 1 之所以在不同的机架上,是为了避免数据丢失,而 Data
Node 5 和 Data Node 6 存在于同一个机架是为了保证网络性能和低延迟。直到 Block
A 被成功的写入到第三个节点,Block B 才会开始继续写入。
当所有的 Data Node 已经成功的接收到了数据块,它们将会报告给 Name Node,同时也会告知客户端一切准备就绪并关闭回话,此时客户端也会返回一个成功的信息给
Name Node。Name Node 开始更新元数据信息将 Block A 在 File.txt
中的位置记录下来。
然后重复以上操作,直到剩下的两个数据块 Block B和 Block C 也分别写入到其他的 Data
Node 中。
通过以上步骤我们可以得知,如果我们有一个1TB的数据要做分析,那么我们所占用的网络流量和磁盘空间将如下:
使用流量= 磁盘空间 = dfs.replication*数据大小
例如我们默认的设置是拷贝三次,那么我们就需要消耗3TB的网络流量和3TB的磁盘空间。
这样,就如我们预期的那样,将一个大的数据分割成无数小的数据提交给多个Data Node 进行并行计算。在这里,我们可以通过横向扩展增加服务器的数量来提高计算性能,但同时,网络I/O
的吞吐也成为了计算性能瓶颈之一,因为如果横向扩展,会给网络吞吐带来巨大的压力,如何将 Hadoop
过渡到万兆以太网是即将到来的难题。(而且北京这房价,擦擦擦……)
还有一种方法则是通过提高单个 Data Node 的配置来提高计算性能。此为纵向扩展。但通常不认为这样做是一个明智的选择(除非您的机房费用真的很高)。
Name Node 在整个 HDFS 中处于关键位置,它保存了所有的文件系统的元数据信息用来监控每个
Data Node 的健康状况。只有 Name Node 知道数据从哪里来、将要被分配到哪里去,最后返回给谁。
Data Node 会每3秒钟一次通过 TCP 9000端口发送心跳给 Name Node。每10次心跳生成一个健康报告,心跳数据都会包含关于该Data
Node所拥有的数据块信息。该报告让 Name Node 知道不同的机架上不同的Data Node上存在的数据快的副本,并为此建立元数据信息。
Name Node的重要性不言而喻,没有它,客户端将不知道如何向HDFS写入数据和读取结果,就不可能执行
Map Reduce 工作,因此,Name Node 所在的服务器应当是一个比较牛逼的服务器(热插拔风扇、冗余网卡连接、双电源等)。
如果 Name Node 没有接收到 Data Node 发送过来的心跳,那么它将会假定该 Data
Node 已经死亡。因为有前面的心跳报告,因此 Name Node 知道该死亡的 Data Node
目前的工作内容以及进度,它将会将该 Data Node 所负责的内容分发给其他的 Data Node去完成。(同样根据机架意识来分发该任务)。
Secondary Name Node 在国内通常被称为辅助 Name Node 因为它并不是一个完整备份,
Secondary Name Node 的存在虽然是为了确保 Name Node 在宕机后能够接手其职责,但是它与
Name Node 之间的元数据交互不是实时的。默认为每隔一小时,Secondary Name Node
会主动请求 Name Node,并从 Name Node 中拿到文件系统的元数据信息(同步)。这个间隔可以通过配置项来设置。
因此,如果万一 Name Node 宕机,虽然 Secondary Name Node 能够接手参加工作,但是依然会造成部分的数据丢失。因此,如果数据非常重要,默认的一小时同步一次可能远远不足以保护数据的计算进度,我们可以缩短其同步时间来增加数据的安全性例如:每分钟同步一次。
当客户端打算从 HDFS 中取数据的时候,例如一个作业的结果,同样需要首先与 Name Node
打交道,的值想取的数据被存放在哪里,Name Node 同样会给客户端一个清单,然后客户端去 Name
Node 指定的某个 Data Node 中拿数据(通过TCP 50010 端口)。
客户端不会逐个 Data Node 去拿数据,而是由 Name Node 指定的那个 Data
Node 分别去其他的 Data Node 那里拿数据。好像客户端在说:“Name Node,告诉我数据都在哪儿?”,Name
Node 说“他们在 Data Node x、y、z,你去 Data Node x 拿数据吧”,客户端于是告诉
Data Node X,你把 y 和 z 的数据一起拿来并送到我这里来。
还有一种情况,其中一个 Data Node 的守护进程本身将需要读取HDFS的某些数据块,例如Data
Node 被要求来处理数据,但是他本地没有,因此它必须从另一个数据节点中解锁数据并通过网络传输到本地开始处理。
在这里,Name Node 同样会考虑“机架意识”,并通过机架意识来找到最近的 Data Node
并将数据传输过去。
现在,我们已经知道 File.txt 被分发到不同的 Data Node上,也知道了他们是如何运转的了,现在我们就开始看看
Hadoop 是具体如何使用 Map Reduce 框架来进行快速运算的吧。
第一步是 Map 阶段,在我们的例子中,我们要求我们的 Hadoop 集群计算出邮件中“Refund”单词的数量。
在一开始,客户端会提交一个 Map Reduce 作业,例如:“在 File.txt 中有多少个
Refund?”当然,这句话是翻译自 Java 语言,通过 Java 编写好一个 Map Reduce
作业并提交作业给 Job Tracker。Job Tracker 与 Name Node 协调得到哪些
Data Node 包含 File.txt 的数据块。 然后 Job Tracker 开始在这些 Data
Node 上激活 Task Tracker。 Task Tracker 开始 Map 作业并监视任务进度。Task
Tracker同时也提供心跳和任务状态给 Job Tracker。
当所有的 Map 任务完成后,每个节点都会存放一个结果在其本地物理磁盘上,这被称为“中间数据”。接下来,就是将这些中间数据通过网络发送给一个执行
Reduce 任务的节点。
Job Tracker 总会尽量挑选处理 Map 任务的节点来处理 Reduce 任务,因为 Map
处理的数据就在本地,不需要通过网络来传输,但是,Job Tracker 并不保证一定会挑选本地节点,因为可能
Map 任务较多,没有资源来运行 Reduce 任务,这样,就会挑选其他的节点来完成 Reduce
任务。
同样,Job Tracker 会根据机架意识来挑选同一机架内的其他节点来完成 Reduce 任务。
第二步就是 Reduce,当 Map 生成了中间数据后,现在我们需要将所有的这些中间数据汇总计算生成最终的数据。
Job Tracker 可能会挑选集群中任意一个节点来作为 Reduce 任务的处理服务器,此时可能会一次性有大量的数据涌向
Reduce 任务所在的节点服务器,这种情况通常被称为“Incast”或“fan-in”。这需要牛逼点的交换机以及内部流量管理和足够的缓冲区(不要太大,也不要太小)。缓冲区的大小最终可能会造成不必要的附带损害(流量相关)。但这是另外一个话题。
现在 Reduce 已经收集到了所有从 Map 计算得到的中间数据,并可以开始最后阶段的计算,在本文的例子中,我们仅仅是简单的将各个结果相加即可得到“Refund”的数量。然后将结果写入到
Results.txt 文件。客户端可以从HDFS中读取Results.txt 文件,并将工作视为完成。
这个例子并没有造成大量的中间数据和流量占用问题,但是实际生产环境可能会造成大量的中间数据和带宽瓶颈。这就是
Map Reduce 作业中代码如何计算的一门学问,如果用最优化的代码来完成作业是需要我们来决定的。
如果在 Hadoop 集群运算过程中添加了新的机架,那么数据的负载均衡就成为了一个问题所在,如何让新增加的机架迅速的融入到计算环境中来成为了一个很迫切的问题。
是的,Hadoop的确包含一个名为 Balancer 的工具。Balancer作业与节点,并试图提供计算平衡。但是默认的
Balancer 可利用的网络带宽是非常低的 1mb/s。不过此设置可以通过hdfs-site.xml
中的 dfs.balance.bandwidthPerSec 参数来设置。
Balancer 可以很好的照顾你的集群。鉴于 Balancer 的默认带宽设置,他可能需要较长的时间来完成均衡的操作,可能几天,也可能几周。 |