编辑推荐: |
本文主要介绍了Flink的容错机制状态的一致性、Checkpoint原理、Savepoint原理、checkpoint和savepoint的区别、Kafka+Flink+Kafka
实现端到端严格一次等相关内容。
本文来自csdn,由火龙果软件Anna编辑、推荐。 |
|
一、状态的一致性
当在分布式系统中引入状态时,自然也引入了一致性问题。
一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
一致性级别
在流处理中,一致性可以分为3个级别:
at-most-once(最多一次):
这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
at-least-once(至少一次):
这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
exactly-once(严格一次):
这指的是系统保证在发生故障后得到的计数结果与正确值一致.既不多算也不少算
曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二:
保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性
流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。
最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。
Flink的一个重大价值在于,它既保证了exactly-once,又具有低延迟和高吞吐的处理能力。
从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。
端到端的状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如
Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。
具体划分如下:
source端
需要外部源可重设数据的读取位置.目前我们使用的Kafka Source具有这种特性: 读取数据的时候可以指定offset
flink内部
依赖checkpoint机制
sink端
需要保证从故障恢复时,数据不会重复写入外部系统. 有2种实现形式:
a)幂等(Idempotent)写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
b)事务性(Transactional)写入
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到
checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)
二、Checkpoint原理
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。
假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢?
如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。
于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开;
当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。
Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的;
这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。
Flink的检查点算法
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如
异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性.
快照的实现算法:
a)简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用
b)Flink的改进Checkpoint算法. Flink的checkpoint机制原理来自"Chandy-Lamport
algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照(asynchronous
barrier snapshotting)
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
理解Barrier
流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中,
然后作为数据流的一部分随着数据流动(有点类似于Watermark).这些barrier不会跨越流中的数据.
每个barrier会把数据流分成两部分: 一部分数据进入当前的快照
, 另一部分数据进入下一个快照 . 每个barrier携带着快照的id. barrier 不会暂停数据的流动,
所以非常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照.
Flink的检查点制作过程
第一步: Checkpoint Coordinator 向所有 source
节点 trigger Checkpoint. 然后Source Task会在数据流中安插CheckPoint
barrier
第二步: source 节点向下游广播 barrier,这个 barrier
就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input
的 barrier 才会执行相应的 Checkpoint
第三步: 当 task 完成 state 备份后,会将备份数据的地址(state
handle)通知给 Checkpoint coordinator。
第四步: 下游的 sink 节点收集齐上游两个 input 的 barrier
之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint
的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。
第五步: 同样的,sink 节点在完成自己的 Checkpoint
之后,会将 state handle 返回通知 Coordinator。
第六步: 最后,当 Checkpoint coordinator 收集齐所有
task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个
Checkpoint meta 文件。
严格一次语义: barrier对齐
在多并行度下, 如果要实现严格一次, 则要执行barrier对齐.
当 job graph 中的每个 operator 接收到barriers
时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行barrier
对齐(barrier alignment)以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有
events 而产生的状态。
1.当operator收到数字流的barrier n时, 它就不能处理(但是可以接收)来自该流的任何数据记录,直到它从字母流所有输入接收到
barrier n 为止。否则,它会混合属于快照 n 的记录和属于快照 n + 1 的记录。
2.接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区,
不会被处理。
3.图一中的 Checkpoint barrier n之后的数据
123已结到达了算子, 存入到输入缓冲区没有被处理, 只有等到字母流的Checkpoint barrier
n到达之后才会开始处理.
4.一旦最后所有输入流都接收到 barrier n,Operator
就会把缓冲区中 pending 的输出数据发出去,然后把 CheckPoint barrier n
接着往下游发送。这里还会对自身进行快照。
至少一次语义: barrier不对齐
前面介绍了barrier对齐, 如果barrier不对齐会怎么样?
会重复消费, 就是至少一次语义.
假设不对齐, 在字母流的Checkpoint barrier n到达前,
已经处理了1 2 3. 等字母流Checkpoint barrier n到达之后, 会做Checkpoint
n. 假设这个时候程序异常错误了, 则重新启动的时候会Checkpoint n之后的数据重新计算.
1 2 3 会被再次被计算, 所以123出现了重复计算.
三、Savepoint原理
Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
Flink不会自动创建保存点,因此用户(或外部调度程序)必须明确地触发创建操作
保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等
四、checkpoint和savepoint的区别
五、Kafka+Flink+Kafka 实现端到端严格一次
我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?
内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证部的状态一致性
source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction
内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。
具体的两阶段提交步骤总结如下:
第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到
barrier 的算子将状态存入状态后端,并通知 jobmanagerr
sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
外部kafka关闭事务,提交的数据可以正常消费了
六、在代码中测试Checkpoint
package com.flink.charpter07.state;
import org.apache.flink.api .common.functions.FlatMapFunction;
import org.apache.flink.api .common.functions.MapFunction;
import org.apache.flink.api .common.serialization.SimpleStringSchema;
import org.apache.flink.api .java.tuple.Tuple2;
import org.apache.flink.runtime .state.filesystem.FsStateBackend;
import org.apache.flink.streaming .api.CheckpointingMode;
import org.apache.flink.streaming .api.environment.CheckpointConfig;
import org.apache.flink.streaming .api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming .connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming .connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector; import java.io.IOException;
import java.util.Properties; public class S04_CheckPoint {
public static void main(String[] args) throws
IOException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment();
env.setParallelism(2); Properties properties = new Properties();
properties.setProperty ("bootstrap.servers", "hadoop162:9092,
hadoop163:9092");
properties.setProperty ("group.id", "S04_CheckPoint");
properties.setProperty ("auto.offset.reset", "latest");
env.setStateBackend(new FsStateBackend ("hdfs://hadoop162:8020/ flink/checkpoints/fs"));
//设置checkpoint的时间间隔
env.enableCheckpointing(2000);
//设置模式为精准一次(默认值)
env.getCheckpointConfig().setCheckpointingMode
(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint之间的时间间隔
env.getCheckpointConfig( ).setMinPauseBetweenCheckpoints(1000);
//checkpoint必须在一分钟内完成,否则被抛弃
env.getCheckpointConfig( ).setCheckpointTimeout(60000);
//同一时间只允许一个checkpoint
env.getCheckpointConfig( ).setMaxConcurrentCheckpoints(1);
//开启在job中终止后任然保留的externalized checkpoints
env.getCheckpointConfig( ).enableExternalizedCheckpoints
(CheckpointConfig .ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION);
env.addSource(new FlinkKafkaConsumer<String>
("test10",new SimpleStringSchema(
),properties))
.flatMap(new FlatMapFunction<String, Tuple2<String,Long>>()
{
@Override
public void flatMap(String value, Collector<Tuple2< String,Long>>
out) throws Exception {
for (String word : value.split(" "))
{
out.collect(Tuple2.of(word,1l));
}
}
})
.keyBy(t->t.f0)
.sum(1)
//.map(t->"("+t.f0+":"+t.f1+")")
.map(new MapFunction<Tuple2<String, Long>,
String>() {
@Override
public String map(Tuple2<String, Long>
value) throws Exception {
StringBuffer bs = new StringBuffer();
bs.append("(").append(value.f0).append(" :").append(value.f1).append(")");
return bs.toString();
}
})
.addSink(new FlinkKafkaProducer<String>
("hadoop162:9092","test11",new
SimpleStringSchema()));
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
|
|