这篇文章是系列文章的第一篇,数据工匠团队会在这里为大家展示一些Apache
Flink的核心功能。
流处理通常被大家与“动态数据”关联起来,相应的系统差不多会在数据被创造出来的那一刻就立刻对其进行处理或响应。像延迟、吞吐量、水印和处理迟到的数据等等都是大家讨论得最多的流处理话题,通常是关注现在,而不是过去。
可在实际项目中,却有许多种场景需要你的流处理程序把以前处理过的数据再重新处理一遍。这里有些例子:
为你的程序部署一个新版本,可能是有新功能、修复了问题、或者采用了更好的机器学习模型;
使用相同的源数据流对应用程序的不同版本进行A/B测试,两边都从同一个点开始测试,这样就不会牺牲之前的状态;
评估或开展将应用程序迁移到更新版本的处理框架上,或是一个不同的集群上;
Apache Flink的保存点(Savepoint)功能可以支持上面的所有场景,并且也是让Flink与其它分布式开源流处理器不同的一个显著区别点。
在本文中,我们会讲述如何使用保存点功能来重新处理数据,并一定程度地深入底层,讲述这个功能在Flink中是怎么实现的。
“重新处理”到底是什么意思?
为了保证大家对重新处理数据的理解是一致的,我们先讨论一个你可能需要重新处理数据的业务例子。想像一个社交媒体公司,她除了基本的发贴功能之外,还发布了一种付费的、或者说是推广发贴的功能。
公司的用户可以访问一个简单的、基于Flink实现的仪表板,显示他们的所有文章(不管是普通的还是付费的)被大家查看、点击等等的次数。几个星期之后,从用户的反馈中就可以清晰地看到,这个仪表板如果能把普通的发贴数据和付费的发贴数据区别开来,那就会更好用。
要实现这个功能,就有必要返回到付费发贴功能最初发布的那个时刻,然后从那个时刻开始,把所有数据全都重新处理一遍。这一次要把付费贴和普通贴的展示和交互全都拆开来。如果要把从公司创立伊始产生的数据全都重新处理一遍,这就实在有点强人所难,所以能够从付费发贴的功能发布的时候开始重新处理,同时还保留之前的计算结果,这个功能就很有必要了。
所以当我们用到“重新处理”这个词时,我们的意思就是回到一个系统以前的、一致的状态(按开发者的定义,不一定非要是流的最早状态),然后从那个状态开始再处理一遍,可能也要在更改了你的Flink程序之后。
读者们可以看到的好消息就是:Flink为大家免费提供了上述重新处理功能,相应的功能就叫保存点。我们说"免费",意思是只要你的程序是容错的,并且可以从错误中恢复,那你就可以在Flink中创建一个保存点并重新处理数据,花费的额外准备工作量几乎为零。
简单说说保存点到底是什么
简而言之,一个Flink程序的保存点就是关于以下两点的全局一致的镜像:
所有数据源的位置;
所有并行操作者的状态;
“全局一致”意味着所有并行的操作者的状态都在所有输入的相同的明确定义的位置处被记录下来了。
如果在过去的某个时刻,你为某个应用程序记下了保存点,那你就可以从那个保存点的位置开始启动一个新程序。新的程序将使用那个保存点位置保存下来的操作者的状态进行初始化,并且会从记录的保存点里各个数据源的相应位置开始,重新处理全部数据。
因为Flink的保存点之间是相互完全独立的,所以对每个程序你都可以有多个保存点,这样你就可以根据这些不同的保存点的信息,回到不同的位置,启动多次、甚至不同的程序(如下图所示)。这个功能对于派生你的流处理程序,或者为它们打不同的版本,是非常有用的。
我们应该注意,在从某个保存点开始重新处理数据时,对事件的时间处理是非常重要的。重新处理基本上就意味着从过去到现在进行快速回放,也就是说,是全速地从某些存储系统中读出数据,直到赶上了当前的状态,然后再继续实时地处理新到达的数据。
因为程序对于时间的处理或者插入时间都是要依赖当前的本地时间的,那么如果在根据保存点启动程序时不使用事件的时间,而使用别的时间,对程序的逻辑而言就很可能导致错误的结果。
听起来不错,那我该做什么?
不用做很多!事实上,所有支持故障恢复的程序都是自动支持保存点的。因此,大多数进行有状态计算的程序已经满足了需要的条件。如果没有,可以对它们进行快速更新,让它们具备:
启用检查点功能:在每种情况下,我们都推荐在构建Flink程序的同时,把检查点功能打开,事实上在你的Flink程序中加上检查点只是需要增加几行代码而已。
可以重置的数据源(即Apache Kafka、Amazon Kinesis,或者文件系统等):数据源必须能按照你想要重新处理的点开始,重放数据。
所有的状态都通过Flink的管理状态接口保存:所有具体的操作者的状态都必须保存在Flink的容错状态数据结构中,这让它可以按照某个之前的保存点位置被重置。
配置一个合适的状态后台:Flink提供了不同的状态后台来将检查点和保存点持久化。默认地,保存点都保存在JobManager中,但你要为你的程序配置一个适当的后台状态程序,比如RocksDB等。
如果你已经在运行一个容错的程序了,那就创建一个保存点,然后从保存点的位置开始重新启动程序,这只需要在Flink命令行里敲几个命令就可以了。咱们接下来挨个看看。
第一步:创建一个保存点
首先,获得所有运行中的Flink任务的列表:
user$ flink list ------------Running/Restarting Jobs------------ 10.10.2016 16:20:33 : job_id : Sample Job (RUNNING) |
(运行上面的命令时,你的真实任务ID会是一个包括字母和数字的字符串。)
然后,用相应的任务ID创建一个保存点:
user$ flink savepoint job_id
现在你的保存点就已经可用了。
如果你准备马上根据你的保存点来重新启动任务,你通常会想要把现在正在运行的任务先停掉。你已经有了相应任务的ID,那把它停掉只要几秒钟就够了:
user$ flink cancel job_id |
第二步:从一个保存点开始启动任务
当你更新完程序之后,就可以从你的保存点开始启动任务了。
user$ flink run -d -s hdfs://savepoints/1 directory/your-updated-application.jar |
如果你想在一个示例程序中自己重做这些步骤,我们推荐你看看一篇之前的博客文章,我们在那里讲了怎么做这件事。
如果我想升级我的程序,该怎样做?
如果你想从一个保存点开始启动一个修改过的程序,有几件事是要考虑的。我们可以区别下面这两种情况:
改变一个用户定义的函数的逻辑,比如MapFunction;
改变一个程序的架构,也就是增加或减少操作者等;
第一种情况很简单,不需要什么特别的准备。你可以按你的需要去修改函数代码。不过,如果你用一个修改了的架构从保存点开始启动程序,那么为了能够恢复操作者的状态,Flink必须能够将保存点程序的操作者与使用了新架构的新程序的操作者对应起来。
在这种情况下,你就要手动地将操作者ID分配给最初的和更新了的程序。因为如果没有操作者ID的话,是没办法修改程序的架构的。所以最佳实践经验就要求一定要分配操作者ID。
下面的代码段显示了如何为操作者们分配ID。
DataStream stream = env. // Stateful source (e.g. Kafka) with ID .addSource(new StatefulSource()) .uid(“source-id”) .shuffle() // The stateful mapper with ID .map(new StatefulMapper()) .uid(“mapper-id”)
// Stateless sink (no specific ID required)
stream.print() |
请查阅文档,了解更多关于升级程序和保存点的细节。
关于保存点的最佳实践
要更好的利用上文中描述的Flink的重新处理功能,你应该经常触发,生成新的保存点。我们建议要根据某些时刻表(比如每天一次,每周一次,等等)自动地生成保存点,而且每当你关闭某个任务或发布程序的新版本时,也最好先生成保存点。
依据你想用Flink做的事件不同,生成保存点的最佳方法也会不同,但总的来说,在构建你的程序时你应该花些时间考虑如何使用这些保存点。
这些东西是怎么工作的呢?
保存点事实上只是检查点的一个延伸,这就是Flink的容错机制。如果开启了检查点功能,Flink就会周期性地为所有的操作者状态生成一个一致的检查点。在文档中详细的描述了检查点的细节,如果你是个Flink新手,花些时间去读读是非常值得的。
你可能会以为要生成一个一致的检查点,就得暂停数据处理,因为Flink必须要等着,直到所有没处理完的记录全被处理掉了,然后做个镜像,镜像生成之后再回去继续处理数据。事实并非如此!Flink是持续处理数据的,即使在生成检查点的时候也是这样。文档中的“Barriers”一节讲了实现这个功能的原理。
两者之间的关键区别:检查点是基于某些规定的时间间隔自动生成的,而保存点是由用户显式地触发生成的,而且不会象检查点那样过了一定的时间之后就会被删掉。
总结
我们讨论了Apache Flink的保存点和数据重处理功能,因为我们相信这就是Flink与开源世界中其它流处理器之间的重要区别之一。而且最重要的,在容错的Flink程序中获得重处理功能几乎是不需要任何代价的,只需要很少的改动。
Flink社区现在还在积极地工作着,要把保存点功能做得更好,包括在改变并发度的情况下保存状态的解决方案等。有些相应的功能(比如Flink-3755)已经发布到主分支上了,而且会被包含到下一个小版本Flink
1.2.0中。
所以,当你需要把程序多部署一份,或者上个新版本,或者要做A/B测试,或者要让多个程序从同一个点开始处理数据时,你可以这么做了,而且不会丢失那些宝贵的状态数据。
当有真实的需求时,流处理基于实时的特性不应该阻挡你把时间调回过去的动作。 |