前言
Twitter每秒会产生亿级的事件数据。如何实时、高效地持久化存储和传递这些数据成为一个巨大的挑战。为此,Twitter设计部署了新一代的实时数据平台。新的实时数据平台由Apache
DistributedLog 来支撑。Apache DistributedLog是一个低延时(毫秒级)、高吞吐的分布式复制日志流系统。
DistributedLog已经在Twitter的线上运行了三四年,支持从分布式数据库、实时搜索引擎、跨机房数据同步到实时流计算等多种业务。DistributedLog每天为Twitter传递1.5
trillion条记录(合17.5 PB数据),并且于2016年5月开源,引起了社区的广泛关注。
为什么研发DistributedLog?
Twitter的实时数据在2012年的时候主要有两套系统来支持,一个是Kestrel(Twitter自研的分布式消息队列),另一个是Kafka
0.7。Kestrel作为低延时的消息队列,主要用在像Tweets、Timeline、Fanout等在线核心服务,以及Twitter的数据库中间件(Gizzard)中;
Kafka主要用在离线的日志、事件的收集和分析,与Twitter Storm结合使用于实时流计算。除了Kestrel和Kafka之外,还有一些系统使用MySQL作为Append-Only的数据表来进行消息的投递。
随着Twitter规模的增加,Kestrel和Kafka带来了一些问题。首先,这两个系统都使用简单的文件存储消息队列或者Topic中的数据,其性能严重依赖于文件系统的Page
Cache。随着队列或者Topic数量的增加,或者在消费者落后于生产者的场景下,随机I/O以及Page
Cache的换入换出会严重影响整个系统的性能,从而将整个集群带入一个很难恢复的状态。
其次,这两个系统都不支持严格意义上的持久化(也就是,数据不落盘写)和多副本复制,存在丢数据的可能性。最后,多套系统也带来了相对比较繁重的维护代价。
也就在那时,我们开始使用Apache BookKeeper来构建复制式日志来实现Mahattan(Twitter的分布式数据库)的强一致性操作。Apache
BookKeeper是一个针对于高吞吐低延时设计的日志分块存储,它的多副本、强一致性和低延时等特性很好地满足了Manhattan的需求。
我们在此基础上,构建了Apache DistributedLog。随着不断的演化,Apache
DistributedLog逐渐演变成Twitter实时数据的基础架构,覆盖了数据库事务日志、实时消息发布订阅(PubSub)系统、实时流计算(Heron)和跨数据中心数据复制等多个应用。
项目开发中的技术挑战和遇到的坑
如何在保证强一致性和严格持久化(落盘写)的同时还能做到低延时和高吞吐,这是我们在开发Apache
DistributedLog的时候遇到的最大挑战,也是相对其他同类系统做得比较好的地方。好的I/O分离是在保证持久化的同时做到低延时的关键。
此外,像多租户配额管理、Speculative Read、读写分离、计算和存储扩展分离、跨交换机以及跨机房数据放置策略都是很有趣的设计和实现。感兴趣的同学可以来Apache
DistributedLog的邮件列表来讨论:
http://distributedlog.incubator.apache.org/community/#mailing-lists
当然,我们也趟过一些坑。
比如,在Apache DistributedLog中,我们尝试将服务和存储进行分离,使用Apache
Mesos/Aurora来运行和管理DistributedLog的读、写代理。在早些时候,Mesos的网络隔离不是很理想,我们的服务经常被同在一个物理机上的其他服务影响,丢包、网卡被打爆等严重影响服务的时延。我们需要通过改善资源的分配算法,改善流属主的错误检测来提高系统的稳定性。
除此之外,以下一些经验我们认为是做系统架构设计需要考虑的。
文件系统简单易用。但是如果要使用得漂亮,请考虑Page Cache的副作用。
I/O隔离和多租户是一个系统的首要考虑对象。否则,随着系统的规模增加,它们将变得不可控。
考虑服务和存储的分离,考虑读写的分离。
跨机房和跨机架本质上没有差别。合理的存储和复本抽象可以让一个系统更容易地进行跨机房的设计。
推荐使用场景
Apache DistributedLog在Twitter内部的应用场景,主要包括以下三类。
复制状态机(Replicated State Machine)- 这一类应用主要使用Apache
DistributedLog作为事务日志。
在Twitter内部,它包括 - Manhattan使用DL作为强一致性操作的事务日志,DeferredRPC(Twitter的持久化RPC系统)使用DL作为内存队列的操作日志,Graph
Store使用DL作为各种mutation的操作日志等。我们目前也在跟Heron团队合作,使用DL作为Heron的State
Store来实现Stateful Processing和Exactly-Once。
消息队列、消息发布订阅、流计算 - 这一类应用主要使用Apache DistributedLog来存储和传递消息。在Twitter内部,我们基于Apache
DistributedLog实现了一套类似于Kafka的分区式的消息发布订阅系统;同时也作为Heron的输入和输出,用于实时流计算的场景。
数据复制 - 这一类应用主要使用Apache DistributedLog来进行数据的复制。这个数据复制可能发现在本地机房,也可能是跨机房。
Apache DistributedLog适用于上述三种应用场景,尤其对于一致性、持久化(不丢数据)、可用性等要求比较高的应用场景。
DistributedLog的发展规划
在开发Apache DistributedLog的过程中,我们从未简单地把它定位为一个消息系统。复制日志(Replicated
Log)是复制状态机以及一致性算法的一个核心抽象。一个复制日志流把系统的状态变化从旧到新完整的记录下来。这里面既包含了过去的历史数据的存储,也包含了最新的实时数据的流动。我们主要是按照实时数据存储的定位来发展Apache
DistributedLog,包括以下几个方面 。
安全特性 - SSL、Authentication、Authorization等。
事务的支持 - 可以让用户更容易地构建基于状态复制机的数据服务,或者更容易做数据的复制。
多语言客户端的支持 - 可以让不同语言的应用可以访问和使用DistributedLog。
元数据存储插件化 - 可以使用除ZooKeeper以外的系统作为元数据存储,比如Etcd、Consul等。
层级存储(Tier Storage)- 在热数据放在内存或者Apache
BookKeeper中,随着日志分块变老,日志分块被无缝迁移到HDFS、S3、GCS等。
Stream Transformation - 比如filtering和projection。
想要了解更多的Apache DistributedLog的Project
Idea,可以访问这里:
https://cwiki.apache.org/confluence/display/DL/Project+Ideas
DistributedLog和Kafka技术对比
大家常常问起的问题之一就是Apache DistributedLog (incubating)与Apache
Kafka相对比,各有什么优劣。从技术上来讲DistributedLog并不是一个像Apache Kafka那么成熟的、有分区机制的广播/订阅系统。
DistributedLog是一个复制日志流存储,它用Apache BookKeeper来做日志分块(Log
Segment)的存储。它关注的是构建可靠的实时系统所需要的持久性、多副本和强一致性。可以把DistributedLog用于构建或尝试各种不同的消息通信模型,比如队列、广播/订阅等。
因为两者都是处理日志,数据模型也类似,所以这篇文章主要从技术角度讨论Apache
Kafka与DistributedLog的不同点。我们会尽量做到客观,但由于我们不是Apache Kafka的专家,因此我们可能会对Apache
Kafka存在误解。如果发现有错,也请大家直接指出。
首先,让我们简单地介绍一下Kafka和DistributedLog的概况。
Kafka是什么
Kafka是最初由Linkedin开源出来的一套分布式消息系统,现在由Apache软件基金会管理。这是一套基于分区的发布/订阅系统。Kafka中的关键概念就是Topic。一个Topic下面会有多个分区,每个分区都有备份,分布在不同的代理服务器(Broker)上。生产者会把数据记录发布到一个Topic下面的分区中,具体方式是轮询或者基于主键做分区,而消费者会处理Topic中发布出来的数据记录。
所有数据都是发布给相应分区的主代理进程,再复制到从代理进程,所有的读数据请求也都是依次由主代理处理的。从代理仅仅用于数据的冗余备份,并在主代理无法继续提供服务时顶上。图一的左边部分显示了Kafka中的数据流。
DistributedLog是什么
与Kafka不同,DistributedLog并不是一个基于分区的发布/订阅系统,它是一个复制日志流仓库。DistributedLog中的核心概念是持续的复制日志流。一个日志流会被分段成多个日志片段。每个日志片段都在Apache
BookKeeper中存储成Apache BooKeeper中的一个Ledger,其中的数据会在多个Bookie(Bookie就是Apache
BookKeeper的存储节点)之间复制和均衡分布。
一个日志流的所有数据记录都由日志流的属主排序,由许多个写入代理来管理日志流的属主关系。应用程序也可以使用核心库来直接追加日志记录。这对于复制状态机一类对于顺序和排他写有着非常高要求的场景非常有用。每个追加到日志流末尾的日志记录都会被赋予一个序列号。
读者可以从任何指定的序列号开始读日志流的数据。读请求也会在那个流的所有存储副本上做负载均衡。图1的右半部分显示了DistributedLog中的数据流。
图1 Apache Kafka与Apache DistributedLog
Kafka与DistributedLog有什么不同
因为同类事物才有可比较的基础,所以我们只在本文中把Kafka分区和DistributedLog流相对比。下表列出了两套系统之间最显著的不同点。
数据模型
Kafka分区是存储在代理服务器磁盘上的以若干个文件形式存在的日志。每条记录都是一个键-值对,但对于轮询式的数据发布可以省略数据的主键。主键用于决定该条记录会被存储到哪个分区上以及用于日志压缩功能。一个分区的所有数据只存储在若干个代理服务器上,并从主代理服务器复制到从代理服务器。
DistributedLog流是以一系列日志分片的形式存在的虚拟流。每个日志分片都以一条BookKeeper
Ledger的形式存在,并被复制到多个Bookie上。在任意时刻都只有一个活跃的日志分片接受写入请求。
在特定的时间段过后,或者旧日志分片达到配置大小(由配置的日志分片策略决定)之后,或者日志的属主出故障之后,旧的日志分片会被封存,一个新的日志分片会被开启。
Kafka分区和DistributedLog流在数据分片和分布的不同点决定了它们在数据持久化策略和集群操作(比如集群扩展)上的不同。
图2显示了DistributedLog和Kafka数据模型的不同点。
图2 Kafka分区与DistributedLog流
数据持久化
一个Kafka分区中的所有数据都保存在一个代理服务器上(并被复制到别的代理服务器上)。在配置的有效期过后数据会失效并被删除。另外,也可以配置策略让Kafka的分区保留每个主键的最新值。
与Kafka相似,DistributedLog也可以为每个流配置有效期,并在超时之后将相应的日志分片失效或删除。除此之外,DistributedLog还提供了显示的截断机制。应用程序可以显式地将一个日志流截断到流的某个指定位置。这对于构建可复制的状态机非常有用,因为可复制的状态机需要在删除日志记录之前先将状态持久化。Manhattan就是一个用到了这个功能的典型系统。
操作
数据分片和分布机制的不同也导致了维护集群操作上的不同,扩展集群操作就是一个例子。
扩展Kafka集群时,通常现有分区都要做重新分布。重新分布操作会将Kafka分区挪动到不同的副本上,以此达到均衡分布。这就要把整个流的数据从一个副本拷到另一个副本上。我们也说过很多次了,执行重新分布操作时必须非常小心,避免耗尽磁盘和网络资源。
而扩展DistributedLog集群的工作方式则截然不同。DistributedLog包含两层:存储层(Apache
BooKeeper)和服务层(写入和读出代理)。在扩展存储层时,我们只需要添加更多的Bookie就好了。新的Bookie马上会被写入代理发现,并立刻用于写入新的日志分片。
在扩展数据存储层时不会有任何的重新分布操作。只在增加服务层时会有重新分布操作,但这个重新分布也只是移动日志流的属主权,以使网络代宽可以在各个代理之间均衡分布。
这个重新分布的过程只与属主权相关,没有数据迁移操作。这种存储层和服务层的隔离不仅仅是让系统具备了自动扩展的机制,更让各种不同类型的资源可以独立扩展。
写与生产者
如图1所示,Kafka生产者把数据一批批地写到Kafka分区的主代理服务器上。而ISR(同步复制)集合中的从代理服务器会从主代理上把记录复制走。只有在主代理从所有的ISR集合中的副本上都收到了成功的响应之后,一条记录才会被认为是成功写入的。可以配置让生产者只等待主代理的响应,还是等待ISR集合中的所有代理的响应。
DistributedLog中则有两种方式把数据写入DistributedLog流,一是用一个Thrift的瘦客户端通过写代理(众所周知的多写入)写入,二是通过DistributedLog的核心库来直接与存储节点交互(众所周知的单独写入)。第一种方式很适合于构建消息系统,第二种则适用于构建复制状态机。你可以查阅DistributedLog文档的相关章节来获取更多的信息和参考,以找到你需要的方式。
日志流的属主会并发地以BookKeeper条目的形式向Bookie中写入一批记录,并等待多个Bookie的Quorum结果。Quorum的大小取决于BookKeeper账目的ack_quorum_size参数,并且可以配置到DistributedLog流的级别。它提供了和Kafka生产者相似的在持久性上的灵活性。在接下来的“复制”一节我们会对比两者在复制算法上的更多不同之处。
Kafka和DistributedLog都支持端到端的批量操作和压缩机制。但两者之间的一点微妙区别是对DistributedLog的写入操作都是在收到响应之前都先通过fsync刷到硬盘上的,而我们并没发现Kafka也提供了类似的可靠性保证。
读与消费者
Kafka消费者从主代理服务器上读出数据记录。这个设计的前提就是主代理上在大多数情况下最新的数据都还在文件系统页缓存中。从充分利用文件系统页缓存和获得高性能的角度来说这是一个好办法。
DistributedLog则采用了完全不同的方法。因为各个存储节点之间没有明确的主从关系,DistributedLog可以从任意存储着相关数据的存储节点上读出数据。为了获得可预期的低延迟,DistributedLog引入了一个推理式读机制,即在超出了配置的读操作时限之后,它会在不同的副本上再次尝试获取数据。
这就可能会对存储节点导致比Kafka更高的读压力。不过,如果将读超时时间配成可以让99%的存储节点的读操作都不会超时,那就可以极大程度地解决延迟问题,只带来1%的额外读压力。
对于读的考虑和机制上的不同主要源于复制机制和存储节点的I/O系统的不同,在下文会继续讨论。
复制
Kafka用的是ISR复制算法:将一个代理服务器选为主。所有写操作都被发送到主代理上,所有处于ISR集合中的从代理都从主代理上读取和复制数据。主代理会维护一个高水位线(HW,High
Watermark),即每个分区最新提交的数据记录的偏移量。
高水位线会不断同步到从代理上,并周期性地在所有代理上记录检查点,以备恢复之用。在所有ISR集合中的副本都把数据写入了文件系统(并不必须是磁盘)并向主代理发回了响应之后,主代理才会更新高水位线。
ISR机制让我们可以增加或减少副本的数量,在可用性和性能之间做出权衡。可是扩大或缩小副本的集合的副作用是增大了丢失数据的可能性。
DistributedLog使用的是Quorum投票复制算法,这在Zab、Raft以及Viewstamped
Replication等一致性算法中都很常见。日志流的属主会并发地把数据记录写入所有存储节点,并在得到超过配置数量的存储节点投票确认之后,才认为数据已成功提交。
存储节点也只在数据被显式地调用flush操作刷入磁盘之后才会响应写入请求。日志流的属主也会维护一个日志流的最新提交的数据记录的偏移量,就是大家知道的Apache
BookKeeper中的LAC(LastAddConfirmed)。LAC也会保存在数据记录中(来节省额外的RPC调用开销),并不断复制到别的存储节点上。
DistributedLog中复本集合的大小是在每个流的每个日志分片级别可配置的。改变复制参数只会影响新的日志分片,不会影响已有的。
存储
每个Kafka分区都以若干个文件的形式保存在代理的磁盘上。它利用文件系统的页缓存和I/O调度机制来得到高性能。Kafka也是因此利用Java的sendfile
API来高效地从代理中写入读出数据的。不过,在某些情况下(比如消费者处理不及时、随机读写等),页缓存中的数据淘汰很频繁,它的性能也有很大的不确性性。
DistributedLog用的则是不同的I/O模型。图3表示了Bookie(BookKeeper的存储节点)的I/O机制。写入(蓝线)、末尾读(红线)和中间读(紫线)这三种常见的I/O操作都被隔离到了三种物理上不同的I/O子系统中。所有写入都被顺序地追加到磁盘上的日志文件,再批量提交到硬盘上。
在写操作持久化到磁盘上之后,它们就会放到一个Memtable中,再向客户端发回响应。Memtable中的数据会被异步刷新到交叉存取的索引数据结构中:记录被追加到日志文件中,偏移量则在分类账目的索引文件中根据记录ID索引起来。
最新的数据肯定在Memtable中,供末尾读操作使用。中间读会从记录日志文件中获取数据。由于物理隔离的存在,Bookie节点可以充分利用网络流入带宽和磁盘的顺序写入特性来满足写请求,以及利用网络流出代宽和多个磁盘共同提供的IOPS处理能力来满足读请求,彼此之间不会相互干扰。
图3 BookKeeper的I/O隔离
小结
Kafka和DistributedLog都是设计来处理日志流相关问题的。它们有相似性,但在存储和复制机制上有着不同的设计理念,因此有了不同的实现方式。希望这篇文章能从技术角度解释清楚它们的区别,回答一些问题。我们接下来也会再多写一些文章来讲讲DistributedLog的性能指标。
DistributedLog和雅虎Pulsar技术对比
从产品和架构的角度来说,雅虎Pulsar是一个更类似于Apache Kafka的消息发布订阅系统。它跟Apache
Kafka最大的两个差别是在其消息消费模型和消息的存储方式。
消息的消费模型:Apache Kafka提供的是基于分区的Pub/Sub模式,一个消息分区只能严格地由一个消费者进行消费。Kafka的消费者可以通过消费者群(Consumer
Group)来共同消费多个分区。
不同于Apache Kafka,雅虎的Pulsar提供更加灵活的消费模型
- 对于同一个Topic,消费者可以指定使用独占(Exclusive)、分享(Shared/Round-Robin)和Failover等三个不同方式消费。
独占式的消费与Kafka的消费是相同的,分享式的消费更接近于传统的Produer-Consumer
Queue消费模式。虽然理论上Kafka可以通过增加分区的数量和消费者的数量,来达到类似的效果,但是无法做到消息级别的细粒度。
消息的存储模型:Apache Kafka的存储方式在与DL的比较中已经有所介绍。与DL类似的是,雅虎Pulsar也使用Apache
BookKeeper进行消息的存储(具体的存储方式有所不同,在下面会稍加讨论)。
所以Pulsar的Broker相对于Kafka的Broker是一个无状态的Broker,所以更容易进行Failover。因为Pulsar也是用Apache
BookKeeper进行存储,所以此处Pulsar和Kafka的区别可以参考上面DL和Kafka在数据存储和多副本复制上的差别。
Pulsar和DistributedLog有很多相似地地方,也有不同之处。如果要将DistributedLog跟Pulsar进行比较,应该比较的的对象是Pulsar中基于BookKeeper做的ManagedLedger。ManagedLedger跟DistributedLog
Stream类似,也是由一组BookKeeper Ledger组成。但其核心的区别在于:
ManagedLedger是一个典型的日志实现,它提供Single-Writer-Single-Reader的语义。一个ManagedLedger只能有一个Broker打开进行读写。而DistributedLog的Stream是一个面向流的实现,提供的是Single-Writer-Multiple-Readers的语义。
也是因为在这个日志实现的差异,导致了架构的一个差异:在Pulsar中,读写都有Topic的属主Broker进行服务;而在DistributedLog中,读写被分离在读代理和写代理中进行服务。此外,ManagedLedger也进行记录消费的偏移;而DistributedLog将偏移量的记录交由上层应用管理。
这样设计上的差异导致了产品理念上的不同,Yahoo Pulsar是一个消息系统,而DistributedLog则是一个基于复制日志的实时数据存储。Yahoo和Twitter作为Apache
BookKeeper社区的两个比较大的使用者,在改进Apache BookKeeper方面也做了很大的贡献。 |