编辑推荐: |
来源网络,Flink
是一个针对流数据和批数据的分布式处理引擎。主要是由 Java 代码实现。其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。 |
|
关Flink出现的背景 我们知道目前流处理的主要流行的计算引擎有,Storm,SparkStreaming。但是这个两个计算引擎都有自己的局限性。Storm实现了低延迟,但是目前还没有实现高吞吐,也不能在故障发生的时候准确的处理计算状态(将数据从一个事件保存到另一个事件的,这些保留下来的是数据较计算状态),同时也不能实现exactly-once。SparkStreaming通过微批处理方法实现了高吞吐和容错性,但是牺牲了低延迟和实时处理的能力,也不能使用窗口与自然时间相匹配。Flink的出现完美的解决了以上问题,这也是flink出现的原因,flink不仅能提供同时支持高吞吐和exactly-once语义的实时计算,还能够提供批量数据的处理,并且和其他的计算引擎相比,flink能够区分出不同的类型的时间。
Flink 简介 Flink 的前身已经是柏林理工大学一个研究性项目, 在 2014 被 Apache 孵化器所接受,然后迅速地成为了
ASF(Apache Software Foundation)的顶级项目之一。Flink 是一个针对流数据和批数据的分布式处理引擎。主要是由
Java 代码实现。其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。Flink
可以支持本地的快速迭代,以及一些环形的迭代任务。并且 Flink 可以定制化内存管理。在这点,如果要对比
Flink 和 Spark 的话,Flink 并没有将内存完全交给应用层。这也是为什么 Spark
相对于 Flink,更容易出现 OOM 的原因(out of memory)。就框架本身与应用场景来说,Flink
更相似与 Storm。下面让我们先来看下 Flink 的架构图。
如图 所示,我们可以了解到 Flink 几个最基础的概
Client、JobManager 和 TaskManager:
Client 用来提交任务给 JobManager,JobManager 分发任务给 TaskManager
去执行,然后 TaskManager 会心跳的汇报任务状态。从架构图去看,JobManager 很像当年的
JobTracker,TaskManager 也很像当年的 TaskTracker。然而有一个最重要的区别就是
TaskManager 之间是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce
之间的 Shuffle,而对 Flink 而言,可能是很多级而不像 Hadoop,是固定的 Map
到 Reduce。
Flink 的生态圈(技术栈)
Flink 首先支持了 Scala 和 Java 的 API,Python
也正在测试中。Flink 通过 Gelly 支持了图操作,还有机器学习的 FlinkML。Table
是一种接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和执行。值的一提的是flink分别提供了面向流处理接口(DataStream
API)和面向批处理的接口(DataSet API),同时flink支持拓展库设计机器学习,FlinkML,复杂时间处理(CEP)以及图计算,还有分别针对流处理和批处理的Table
API
执行配置
flink执行环境包括批处理和流出,所以要分两种情况进行执行配置
Flink 批处理环境
val env = ExecutionEnvironment
.getExecutionEnvironment |
Flink 流处理环境
val env = StreamExecutionEnvironment
.getExecutionEnvironment |
接下来我可以在env进行相关的设置
StreamExecutionEnvironment包含ExecutionConfig允许为运行时设置工作的具体配置值。要更改影响所有作业的默认值。
val env = StreamExecutionEnvironment .getExecutionEnvironment
var executionConfig = env.getConfig |
可以使用以下配置选项:
enableClosureCleaner() /disableClosureCleaner()。
默认情况下启用闭包清理器。闭包清理器删除Flink程序中对周围类匿名函数的不需要的引用。禁用闭包清除程序后,可能会发生匿名用户函数引用周围的类(通常不是Serializable)。这将导致序列化程序出现异常。
getParallelism() /setParallelism(int
parallelism)
设置作业的默认并行度。
getMaxParallelism() /setMaxParallelism(int
parallelism)
设置作业的默认最大并行度。此设置确定最大并行度并指定动态缩放的上限
还有其他的配置项可以配置,就不一一列举,可以参考flink官方网站
设置并行性
Flink程序由多个任务(转换/运算符,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为并行性。如果要使用保存点,还应考虑设置最大并行度(或max
parallelism)。从保存点恢复时,您可以更改特定运算符或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为Flink在内部将状态划分为密钥组,并且我们不能拥有+Inf多个密钥组,因为这会对性能产生不利影响。
操作级别
可以通过调用其setParallelism()方法来定义单个运算符,数据源或数据接收器的并行性。例如
final StreamExecutionEnvironment
env = StreamExecutionEnvironment .getExecutionEnvironment();
DataStream<String> text = [...]DataStream<Tuple2<String,
Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow (Time.seconds(5))
.sum(1).setParallelism(5) ; wordCounts.print()
;env.execute("Word Count Example");
|
执行环境级别
Flink程序在执行环境的上下文中执行。执行环境为其执行的所有操作符,数据源和数据接收器定义默认并行性。可以通过显式配置运算符的并行性来覆盖执行环境并行性。可以通过调用setParallelism()方法来指定执行环境的默认并行性。要以并行方式执行所有运算符,数据源和数据接收器,请3按如下方式设置执行环境的默认并行度:
final StreamExecutionEnvironment
env = StreamExecutionEnvironment .getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]DataStream<Tuple2<String,
Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
|
客户级别
在向Flink提交作业时,可以在客户端设置并行性。客户端可以是Java或Scala程序。这种客户端的一个例子是Flink的命令行界面(CLI)。
对于CLI客户端,可以使用指定parallelism参数-p。例如:
./bin/flink run -p 10 ../examples/ *WordCount-java*.jar
基本API(流处理和批处理)
批处理是流处理的一种非常特殊的情况。Flink的特殊之处就在于既可以把数据当做流进行处理也可以把数据当作有限流进行批处理。可以理解为:
DataSet PI用于批处理:相当于spark core
DataStream API用于流式处理:相当于spark streaming
DataSet和DataStream
Flink具有特殊类DataSet和DataStream在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet数据有限,对于一个DataStream元素的数量可以是无界的。这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。你也不能简单地检查里面的元素。集合最初通过在弗林克程序添加源创建和新的集合从这些通过将它们使用API方法如衍生map,filter等等。
Flink计划的剖析
Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:
获得一个execution environment,
加载/创建初始数据,
指定此数据的转换,
指定放置计算结果的位置,
触发程序执行
我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala
DataSet API的所有核心类都可以在org.apache.flink.api.scala包中找到,而Scala
DataStream API的类可以在org.apache.flink.streaming.api.scala中找到。
这StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态方法获取一个StreamExecutionEnvironment:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int,
jarFiles: String*) |
通常,您只需要使用getExecutionEnvironment(),因为这将根据上下文做正确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通过命令行调用它,则Flink集群管理器将执行您的main方法并getExecutionEnvironment()返回一个执行环境,以便在集群上执行您的程序。
读取数据
对于指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:
val env = StreamExecutionEnvironment .getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file") |
这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生DataStream。
您可以通过使用转换函数调用DataSet上的方法来应用转换。
例如,map转换如下所示:
val input:DataSet[String]= ...val
mapped=input.map {x=>x.toInt}
这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。
数据输出
一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:
writeAsText(path:String)print()
一旦您指定的完整程序,你需要触发执行程序调用execute()上StreamExecutionEnvironment。根据执行的类型,ExecutionEnvironment将在本地计算机上触发执行或提交程序以在群集上执行。
该execute()方法返回一个JobExecutionResult,包含执行时间和累加器结果。
flink编程模型
DataSet和DataStream相关算子太多就不一一列举了,使用时可以参考官方文档。在这举两个例子进行展示flink的编程模型
案例一:基于文件(本地,hdfs)的wordcount
public class
FunctionTest {
public static void main(String[] args) throws
Exception {
//创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文本文件中的数据
DataStreamSource<String> streamSource =
env.readTextFile ("C:/flink_data/1.txt");
//进行逻辑计算
SingleOutputStreamOperator< Tuple2<String,
Integer>> dataStream = streamSource
.flatMap(new Splitter())
.keyBy(0)
.sum(1);
dataStream.print();
//设置程序名称
env.execute("Window WordCount");
}
}
|
实现 FlatMapFunction
public class
Splitter implements FlatMapFunction< String,
Tuple2<String, Integer>> {
@Override
public void flatMap (String sentence, Collector< Tuple2<String,
Integer>> out) throws Exception {
for (String word: sentence.split (" "))
{
out.collect(new Tuple2<String, Integer>(word,
1));
}
}
}
|
案例二:读取kafak中的数据保存到hdfs中
添加maven依赖
<dependency>
<groupId> org.apache.flink< /groupId>
<artifactId> flink-connector-kafka-0.9_2.10 </artifactId>
<version>1.1.3</version>
</dependency>
|
程序代码
object DataFkafka
{
def main(args: Array[String]): Unit = {
//设置kafka连接参数
val properties = new Properties()
properties.setProperty ("bootstrap.servers",
"10.10.4.11:9092, 10.10.49.183:9092,10.10.49.207:9092");
properties.setProperty ("zookeeper.connect",
"10.10.4.11:2181, 10.10.49.183:2181");
properties.setProperty ("group.id", "res");
//获取流执行环境
val env = StreamExecutionEnvironment .getExecutionEnvironment
//设置时间类型
env.setStreamTimeCharacteristic (TimeCharacteristic.EventTime)
//设置检查点时间间隔
env.enableCheckpointing(1000)
//设置检查点模式
env.getCheckpointConfig.setCheckpointingMode (CheckpointingMode.EXACTLY_ONCE)
//创建kafak消费者,获取kafak中的数据
val myConsumer: FlinkKafkaConsumer010[String]
= new FlinkKafkaConsumer010 [String]("flink",
new SimpleStringSchema(), properties)
val kafkaData: DataStream[String] = env.addSource(myConsumer)
kafkaData.print()
//数据保存到hdfs
kafkaData.writeAsText ("hdfs://10.10.4.11:9000/output/flink.txt")
print("kafka")
//设置程序名称
env.execute("data_from_kafak_wangzh")
}
}
|
java和scala对比可以看出 还是scala比较简洁。
检查点 checkpoint
Flink的检查点特性在流处理器中是独一无二的,程序运行时有flink自动生成,
它使得flink可以准确的维持状态,实现数据的一致性(exactly-once),并且高效的重新处理数据。
检查点介绍
Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。屏障作为数据流的一部分随着记录被注入到数据流中。屏障永远不会赶超通常的流记录,它会严格遵循顺序。屏障将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。每一个屏障携带着快照的ID,快照记录着ID并且将其放在快照数据的前面。屏障不会中断流处理,因此非常轻量级。来自不同快照的多个屏障可能同时出现在流中,这意味着多个快照可能并发地发生。
举例说明:就像多个人一起数一串项链的珠子数量,几个人在说话,可能某一时刻,忘记数量是多少了,此时如果我们每五个珠子就栓一条不同的颜色,并且提前设置好规则。比如红的代表数五个,黄色的代表数了10珠子,以次类推,那么当我们忘记数了个珠子的时候多少时,就可以看一下绳子的颜色,就知道最新的绳子代表的珠子说,重新从绳子哪里继续数珠子的个数。
下图是checkpoint的整体逻辑图,其中ckpt是检查点屏障。在数据流中,每一天数据都会严格按照检查点前和检查点后的规定,被处理。检查点屏障也会像数据一样在算子之前流动。当flink算子遇到检查点屏障时,它会将检查点在数据流的位置记录下来,如果数据来自kafak那么位置就是偏移量。
当检查点操作完成,结果状态和位置会备份到稳定的存储介质中如下图。需要注意的是:如果检查点操作失败了,flink会丢弃该检查点继续正常执行,因为之后的某一个检查点很大程度会成功,虽然这样恢复时间有点长,但是对状态的保障依旧很有力,只有在一系列连的检查点操作失败flink才会报错。
故障紧跟检查点的情况
当检查点操作已经完成,但是故障紧随其后。这种情况下,flink会重新拓扑,将输入流倒回到上一个检查点,然后恢复状态值并从该出重新继续计算,可以保证在剩下的记录被处理后,得到的map算子的状态与没有发生故障的状态一致,值得注意的是有些数据会重复计算,也就是数据可能会出现局部的重复。但是我们可以将数据流写入到特殊的系统中(比如文件系统,数据库)来解决这个问题。
启用和配置检查点
默认情况下,禁用检查点。为了使检查点在StreamExecutionEnvironment上,调用
enableCheckpointing(n),其中是以毫秒为单位的检查点间隔。
检查点的其他参数包括:
完全一次与至少一次:您可以选择将模式传递给enableCheckpointing(n)方法,以在两个保证级别之间进行选择。对于大多数应用来说,恰好一次是优选的。至少一次可能与某些超低延迟(始终为几毫秒)的应用程序相关。
checkpoint timeout(检查点超时):如果当前检查点未完成,则中止检查点的时间。
minimum time between checkpoints检查点之间的最短时间:为确保流应用程序在检查点之间取得一定进展,可以定义检查点之间需要经过多长时间。如果将此值设置为例如5000,则无论检查点持续时间和检查点间隔如何,下一个检查点将在上一个检查点完成后不迟于5秒启动。请注意,这意味着检查点间隔永远不会小于此参数。
通过定义“检查点之间的时间”而不是检查点间隔来配置应用程序通常更容易,因为“检查点之间的时间”不易受检查点有时需要比平均时间更长的事实的影响(例如,如果目标存储系统暂时很慢)。
请注意,此值还表示并发检查点的数量为一。
number of concurrent checkpoints并发检查点数:默认情况下,当一个检查点仍处于运行状态时,系统不会触发另一个检查点。这可确保拓扑不会在检查点上花费太多时间,也不会在处理流方面取得进展。可以允许多个重叠检查点,这对于具有特定处理延迟的管道(例如,因为函数调用需要一些时间来响应的外部服务)而感兴趣,但是仍然希望执行非常频繁的检查点(100毫秒)
)在失败时重新处理很少。
当定义检查点之间的最短时间时,不能使用此选项。
externalized checkpoints外部化检查点:您可以将外围检查点配置为外部持久化。外部化检查点将其元数据写入持久存储,并且在作业失败时不会自动清除。这样,如果您的工作失败,您将有一个检查点可以从中恢复。有关外部化检查点的部署说明中有更多详细信息。
fail/continue task on checkpoint errors关于检查点错误的失败/继续任务:这确定如果在执行任务的检查点过程中发生错误,任务是否将失败。这是默认行为。或者,当禁用此选项时,任务将简单地拒绝检查点协调器的检查点并继续运行
StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing (1000);
// advanced options: // set mode to exactly-once
(this is the default)
env.getCheckpointConfig() .setCheckpointingMode (CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between
checkpoints
env.getCheckpointConfig() .setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute,
or are discarded
env.getCheckpointConfig() .setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress
at the same time
env.getCheckpointConfig () .setMaxConcurrentCheckpoints (1); //
enable externalized checkpoints which are retained
after job cancellation
env.getCheckpointConfig() .enableExternalizedCheckpoints (ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION);
|
窗口
窗口是一种机制。允许许多事件按照时间或者其他特征进行分组,将每一组作为整体去分析计算。Flink中的窗口主要有时间窗口,计数窗口,回话窗口。并且我们要知道flink是唯一一个支持回话窗口的开源流处理器,这里主要介绍用处组多的时间窗口。
时间窗口
时间窗口是最简单,最有用的一种窗口,它支持滚动和滑动,几个简单的例子,对传感器的发出的数据进行求和
一分钟滚动窗口收集最近一分钟的数值,并在一分钟结束时输出总和,如下图
一分钟滑动窗口计算最近一分钟的数值总和,但是每半分钟滑动一次并输出结果,如下图
第一个滑动窗口对?3,2,5,7求和得到17,半分钟后窗口滑动,然后对2,5,7,1求和得到结果15以此类推。
时间窗口代码
一分钟的滑动窗口:
Stream.timeWindows(Time.minute(1))
每半分钟(30秒)滑动一次的一分钟滑动窗口
Stream.timeWindows (Time.minute(1), Time.second(30))
计数窗口
计数窗口的分组依据不再是时间,而是元素的数量。例如在上面的图-2也可以解释为由4个元素组成的计数窗口,并且每两个元素滑动一次,滚动和滑动计数窗口定义如下
Stream.countWindow(4)
Stream.countWindow(4,2)
注意;
计数窗口不如时间窗口那么严谨,要谨慎使用,比如其定义的元素数量为100,然而某一个key对应的元素永远达不到100个,那么计数窗口就会永远不关闭,则被该窗口占用的内存就浪费了,一种解决办法就是用时间窗口触发超时。
会话窗口
会话指的是活动阶段,其前后都是非活动阶段,例如某用户在与网站进行一系列的交互之后,关闭浏览器或者不在交互(非活动阶段)。会话需要有自己的处理机制,因为他们通常没有固定的持续时间,或者说固定的交互次数(有的可能点击3次就购买了物品,有的可能点击40次才购买物品)。
在flink中。会话窗口由时间设定。既希望等待多久认为会话已经结束。举例来说,以下代码表示,用户处于非活动时间超过五分钟既认为会话结束
Stream.window (sessionWindow.withGap (Time.minutes(5)))
水印
现在有一个问题就是:如何判断所有的事件是否都已经到达,以及何时计算和输出窗口的结果?换言之就是:如何追踪事件时间,并知晓输入数据已经流入到某个事件时间呢?为了追踪事件时间,需要依靠由数据驱动的时钟,而不是系统时间。
Flink通过水印来推进事件时间。水印是嵌入在流中的常规记录。计算程序通常通过水获知某个时间点已到。比如对于一分钟的滚动窗口,假设水印标记时时间为:10:01:00,那么收到水印的窗口就知道不会再有早于该时间的记录出现,因为所有时间戳小于或等于该时间的事件都已经到达。这时,窗口就可以安全的计算并给出结果。水印使得事件时间和处理时间完全无关。迟到的水印并不会影响到结果的正确性,而会影响到结果的速度。
水印如何生成
在flink中,水印的生成由开发人员生成,这通常需要对相应的领域有一定的了解。完美的水印:时间戳小于水印标记时间的事件不会再出现。在特殊情况下(如非乱序事件流),最近一次事件的时间戳就可能是完美的水印。启发式水印则相反,它只估计时间,因此有可能出错,既迟到的时间:晚于水印出现。如果知道时间的迟到时间不会超过5秒,就可以将水印时间设为收到最大时间戳减去5秒。另一种做法是,采用一个flink作业的监控事件流,学习事件的迟到规律,并以此构成水印的生成模型。
有状态的计算
流失计算分为有状态计算和无状态计算。无状态计算是观察每一个独立时间,并根据最后一个时间输出时间结果,有状态计算则是根据多个事件输出结果。
例如:
计算过去一个小时的平均温度就是有状态的计算,需要涉及多个事件共同计算出的结果。
广播变量
广播变量允许您为操作的所有并行实例提供数据集。这对于辅助数据集或与数据相关的参数化非常有用。然后,操作员可以将数据集作为集合访问。
广播:广播集通过名称注册withBroadcastSet(DataSet, String)
访问:可通过getRuntimeContext() .getBroadcastVariable(String)目标运营商访问。
val data = env.fromElements("a",
"b")
data.map (new RichMapFunction[String, String]()
{
var broadcastSet: Traversable [String] = null
override def open (config: Configuration): Unit
= {
// 3. Access the broadcast DataSet as a Collection
broadcastSet =getRuntimeContext().
getBroadcastVariable[String("broadcastSetName").asScala
}
def map(in: String): String = {
}}).withBroadcastSet (toBroadcast, "broadcastSetName")
|
注意:由于广播变量的内容保存在每个节点的内存中,因此不应该变得太大。对于标量值之类的简单事物,您可以简单地将参数作为函数闭包的一部分,或者使用该withParameters(...)方法传递配置。
|