编辑推荐: |
本文介绍为什么使用Kafka,如何消息传递保证的相关内容,传递保证语义的三个级别,通过一个示例进行分析ISR集合,HW和LEO是如何协调工作的。
本文来自于简书,由火龙果软件Alice编辑、推荐。 |
|
1 Kafka介绍
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源
项目。
2 为什么使用Kafka
2.1 解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2.2 冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
2.3 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
2.4 灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
2.5 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
2.6 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka
保证一个 Partition 内的消息的有序性)
2.7 缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
2.8 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
3 Kafka架构
3.1 Broker
kafka 集群中包含的服务器。一个单独的Kafka server就是一个Broker。Broker的主要工作就是接受生产者发过来的消息,分配offset,之后保存到磁盘中,同时,接收消费者、其他Borker的请求,根据请求类型进行相应处理并返回响应。多个Broker可以做成一个Cluster对外提供服务,每个Cluster当中会选举出一个Broker来担任Controller,Controller是Kafka集群的指挥中心,而其他Broker则听从Controller指挥实现相应的功能。Controller负责管理分区的状态、管理每个分区的副本的状态、监听Zookeeper中数据的变化等工作。Controller也是一主多从的实现,所有的Broker都会监听Controller
Leader的状态,当Controller Leader出现故障时则重新选举新的Controller
Leader。
3.2 Producer
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个
partition。其路由机制为:
1 指定了 patition,则直接使用。
2 未指定 patition 但指定 key,通过对 key 的
value 进行hash 选出一个 patition。
3 patition 和 key 都未指定,使用轮询选出一个 patition。
流程说明:
1 producer 先从 zookeeper 的 "/brokers/.../state"
节点找到该 partition 的 leader。
2 producer 将消息发送给该 leader。
3 leader 将消息写入本地 log。
4 followers 从 leader pull 消息,写入本地
log 后 leader 发送 ACK。
5 leader 收到所有 ISR 中的 replica 的 ACK
后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer
发送 ACK。
下面来介绍消息传递的保证(Delivery guarantee semantic)的相关内容,传递保证语义有以下三个级别。
1 At most once:消息可能会丢,但绝不会重复传递。
2 At least once:消息绝不会丢,但可能会重复传递。
3 Exactly once:每条消息都只被传递一次。
当 producer 向 broker 发送消息时,一旦这条消息被
commit,由于 replication 的存在,它就不会丢。但是如果 producer 发送数据给
broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然
Kafka 无法确定网络故障期间发生了什么。为了实现Exactly once语义,这里提供两种可选方案:
1 每个分区只有一个生产者写入消息,当出现异常或超时的情况时,生产者就要查询此分区的最后一个消息,用来决定后续操作的消息重传还是继续发送。
2 为每个消息添加一个全局唯一主键,生产者不做其他特殊处理,按照之前分析的方式进行重传,由消费者进行去重。
3.3 Topic&Log
每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向
Topic的。每个Topic可以划分成多个分区,同一个Topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配到一个offset,它是消息在此分区中的唯一编号,Kafka通过offset保证消息的分区内的顺序,offset的顺序性不夸分区,即Kafka只保证在同一个分区内的消息是有序的。
同一Topic的不同分区会分配在不同的Broker上。分区是Kafka水平扩展性的基础,我们可以通过增加服务器并在其上分配Partition的方式来增加Kafka的并行处理能力。
分区在逻辑上对应着一个Log,当生产者将消息写入分区时,实际上是写入到了分区对应的Log中。Log是一个逻辑概念,可以对应到磁盘上的一个文件夹。Log有多个Segment组成,每个Segment对应一个日志文件和索引文件。在面对海量数据时,为避免Segment出现超大文件,每个日志文件的大小是有限制的,当超出限制后会创建新的Segment继续对外提供服务。这里要注意,因为Kafka采用顺序IO,所以只向最新的Segment追加数据。为了权衡文件大小、索引速度、占用内存大小等多方面因素,索引文件采用稀疏索引的方式,大小并不会很大,在运行时会将其内容映射到内存,提高索引速度。
3.4 Partition
每一个Topic都可以划分成多个Partition(每一个Topic都至少有一个Partition),不同的Partition会分配在不同的Broker上以对Kafka进行水平扩展从而增加Kafka的并行处理能力。同一个Topic下的不同Partition包含的消息是不同的。每一个消息在被添加到Partition的时候,都会被分配一个offset,他是消息在此分区中的唯一编号,此外,Kafka通过offset保证消息在Partition中的顺序,offset的顺序性不跨Partition,也就是说在Kafka的同一个Partition中的消息是有序的,不同Partition的消息可能不是有序的。
3.5 Consumer
消费者(Consumer)的主要工作是从Topic中拉取消息,并对消息进行消费。某个消息消费到Partition的哪个位置(offset)的相关信息,是Consumer自己维护的。
这样设计非常巧妙。避免了Kafka Server端维护消费者消费位置的开销,尤其是在消费数量较多的情况下。另一方面,如果是由Kafka
Server端管理每个Consumer消费状态,一旦Kafka Server端出现延时或是消费状态丢失,将会影响大量的Consumer。同时,这一设计也提高了Consumer的灵活性,Consumer可以按照自己需要的顺序和模式拉取消息进行消费。例如:Consumer可以通过修改其消费者的位置实现针对某些特殊key的消息进行反复消费,或是跳过某些消息的需求。
3.6 Consumer group
high-level consumer API 中,每个 consumer
都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer
消费,但可以被多个 consumer group 消费。
3.7 Replica
Kafka对消息进行了冗余备份,每个Partition分区都可以有多个副本,每一个副本中包含的消息是相同的(但不保证同一时刻下完全相同)。每个分区至少有一个副本,当分区只有一个副本的时候,就只有Leader副本,没有
Follower。在每个副本集合中,都会选举出一个副本作为Leader副本,Kafka在不同的场景中会采用不同的选举策略。Kafka中所有的读写请求都由选举出的Leader副本处理,其他的都作为Follower副本,Follower副本仅仅是从Leader副本中把数据拉取到本地之后,同步更新到自己的Log中。
3.8 ISR
ISR(In-Sync-Replica)集合表示的是目前可用(alive)且消息量与Leader相差不多的副本集合,这是整个副本集合的一个子集。ISR集合中的副本必须满足下面两个条件:
(1)副本所在节点必须维持着与ZooKeeper的链接。
(2)副本最后一条消息的offset与Leader副本的最后一条消息的offset之间的差值不能超出指定的阈值。
每个分区中的Leader都会维护此分区的ISR集合,写请求首先由Leader副本处理,之后Follower副本都会从Leader上拉取写入的消息,这个过程会有一定的延迟,导致Follower副本中保存的消息略少于Leader副本,只要未超出阈值都是可以容忍的。如果一个Follower副本出现异常,比如:宕机,发生长时间GC而导致Kafka僵死或是网络断开连接导致长时间没有拉取消息进行同步,就回违反上面两个条件,从而被Leader副本踢出ISR集合。当Follower副本从异常中恢复之后,会继续与Leader副本进行同步,当Follower副本追上(即最后一条消息的offset的差值小于指定阈值)Leader副本的时候,此Follower副本会被Leader副本重新加入到ISR中。
3.9 HW&LEO
HW(High Watermark)和LEO与ISR集合紧密相关。HW标记了一个特殊的offset,当消费者处理消息的时候,只能拉取到HW之前的消息,HW之后的消息对消费者来说是不可见的。与ISR集合类似,HW也是由Leader副本管理的。当ISR集合中全部的Follower副本都拉取HW指定消息进行同步后,Leader副本会递增HW的值。Kafka官方网站将HW之前的消息的状态称为commit,其含义是这些消息在多个副本中同时存在,即使Leader副本损坏,也不会出现数据丢失。
LEO(Log End Offset)是所有的副本都会有的一个offset标记,它指向追加到当前副本的最后一个消息的offset。当生产者向Leader副本追加消息的时候,Leader副本的LEO标记会递增;当Follower副本成功从Leader副本拉取消息并更新到本地的时候,Follower副本的LEO就会增加。
为了让读者更好的理解HW和LEO之间的关系,下面通过一个示例进行分析ISR集合,HW和LEO是如何协调工作的。
1 Poducer向此Partition推送消息。
2 Leader副本将消息追加到Log中,并递增其LEO。
3 Follower副本从Leader副本拉取消息并进行同步。
4 Follower副本将拉取到的消息更新到本地Log中,并递增其LEO。
5 当ISR集合中所有副本都完成了对offset消息的同步,Leader副本会递增HW。
3.10 ZooKeeper
kafka 通过 zookeeper 来存储集群的 meta 信息。
|