Flink关注点

 

记录一下个人看了一些Flink文章后的理解与个人关注点,


Overview

基于Flink 1.4。 先来看看大数据计算引擎的发展路线

  1. 第一代,hadoop的MapReduce
  2. 第二代,DAG框架的Tez,Oozie
  3. 第三代,Job内部的DAG支持,以及强调实时计算,spark
  4. 第四代,迭代,流,批,SQL

基本概念

source -> transformation -> sink

  • stream是算子的中间结果数据
  • transformation是一个操作,它对一个或多个输入stream进行计算处理,输出一个或多个结果stream
  • streaming dataflow是一个执行中的flink程序,启动于一个或多个source,结束于一个或多个sink

a complete streaming dataflow (flink apploication)


并行Dataflow

一个stream可以被分成多个stream分区(stream partition)。 一个operator可以被分成多个operator subTask。

parallel


基本模块

flink类似spark,是一个基于master-slave风格的架构。 运行时runtime主要有2个进程,一个是JobManagers,另一个是TaskManagers;client不属于运行时和程序执行的一部分,而是用于准备dataflow并将其发送到JobManager。

flink生态部件

jobManager(master)是flink系统的协调者,负责接收flink job,调度组成job的多个task的执行;手机job的状态信息,管理flink集群中从节点taskManager,

  • registerTaskManager,在Flink集群启动的时候,TaskManager会向JobManager注册
  • submitJob,Flink程序内部通过Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息
  • cancelJob,请求取消一个Flink Job的执行,CancelJob消息中包含了Job的ID
  • updateTaskExecutionState,TaskManager向JobManager请求更新状态信息
  • requestNextInputSplit,运行在TaskManager上面的Task,请求获取下一个要处理的输入Split
  • jobStatusChanged,表示Flink Job的状态发生的变化

taskManager是一个actor(akka),负责执行计算的worker,在其上执行flink job的一组task。每个taskManager负责管理其所在节点上的资源信息,如mem, disk, network,在启动的时候将资源状态向jobManager汇报,

  • 注册阶段,TaskManager会向JobManager注册,发送registerTaskManager消息
  • 可操作阶段,接收并处理与Task有关的消息,如SubmitTask、CancelTask、FailTask

client,当用户提交一个flink程序时,会首先创建一个client,该client首先会对用户提交的flink程序进行预处理,并提交到flink集群中,

  • client需要从用户提交的flink程序配置中获取jobManager的地址,并建立到jobManager的连接,将flink job提交给jobManager
  • client会将用户提交的flink程序组装成一个jobGraph,并且是以jobGraph的形式提交。一个jobGraph是一个flink dataflow,它是由多个jobVertex组成的DAG。JobManager会将一个JobGraph转换映射为一个ExecutionGraph

组件栈

Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件,

flink组件栈

flink on yarn

启动flink yarn session的时候,

  1. 最左边的模块Flink YARN Client check requested resources (containers and memory) are available,检查资源可得性
  2. Client uploads a jar that contains Flink and the configuration to HDFS,上传代码和配置
  3. Client request a YARN container to start the ApplicationMaster(AM,单个作业的资源管理和任务监控模块,以前是一个全局的JobTracker负责的,现在每个作业都一个),启动yarn AM
  4. AM starts allocating the containers for Flink’s TaskManagers, which will download the jar file and the modified configuration from the HDFS

客户端client负责向ResourceManager(RM)提交ApplicationMaster,并查询应用程序运行状态,ApplicationMaster(AM)负责向ResourceManager申请资源(以Container形式表示),并与NodeManager(NM)通信以启动各个Container,此外,ApplicationMaster还负责监控各个任务运行状态,并在失败是为其重新申请资源。

flink RM Dispatcher,用于统一发布Job并监控实例的运行。但是可以选择是否使用Dispatcher。

without dispatch yarn

with dispatch yarn

with dispatch mesos

  • Runtime层,提供了支持Flink计算的全部核心实现
  • API层,实现了面向无界streaming的流处理和面向有界Batch的批处理接口
  • Libraries层,Flink应用框架层,CEP复杂事件处理、Table基于SQL-like的关系操作、FlinkML机器学习、Gelly图处理

内部原理

容错机制

Flink基于Checkpoint机制实现容错,它的原理是不断地生成分布式Streaming数据流Snapshot。在流处理失败时,通过这些Snapshot可以恢复数据流处理。

Barriers

checkpoint, snapshot, stream aligning, exactly once, at least once

调度机制

在jobManager,会接收到client提交的jobGraph形式的flink job,并将其转换映射为executionGraph

JobManager transforms the JobGraph into an ExecutionGraph

  • jobGraph是一个job的用户逻辑视图表示,将一个用户要对数据流进行的处理表示为单个DAG图
  • executionGraph是jobGraph的并行表示,也就是实际jobManager调度一个job在taskManager上运行的逻辑视图,也是一个DAG

Op

上图用户提交的Flink Job对各个Operator进行的配置(从下往上),即data source的并行度设置为4(最底层1个data source,但是其parallel=4),MapFunction的并行度也为4(中间层),ReduceFunction的并行度为3(顶层)。

迭代机制

机器学习和图计算应用,都会使用到迭代计算。flink通过迭代operator中定义step函数来实现迭代算法,包括Iterate和Delta Iterate两类,

iterate operator

delta iterate operator

反压机制

flink使用了高效有界的分布式阻塞队列,就像java通用的blockingQueue。一个较慢的接收者会降低发送者的发送速率,因为一旦有界队列满了发送者会被阻塞。

flink在网络传输场景下的内存管理

  • 当netty接收端发送数据时,为了将netty中的数据拷贝到task中(往task写入数据),InputChannel会向其对应的缓冲池localBufferPool申请内存块,
    • 如果localBufferPool也没有可用内存块且申请的数量还没到池子(队列)上限,则就向networkBufferPool申请内存块
    • 如果localBufferPool已申请的数量达到上限了,或者networkBufferPool也没有可用内存块,此时task的netty channel会暂停读取,上游的发送端会立即响应停止发送,拓扑进入反压状态
  • 当task线程写数据到resultPartition时(task数据往外写),也会向池子请求内存块,如果没有可用内存块时,也阻塞在请求内存块的地方,达到暂停写入的目的
  • 在一个内存块被消费完成之后(在输出端是指内存块中的字节写入到netty channel;在输入端是指内存块中的字节被反序列化成对象),会调用buffer.recycle()方法,将内存块还给localBufferPool,如果localBufferPool中当前申请的数量超过了池子容量,则localBufferPool会将该内存块回收给networkBufferPool。如果没超池子容量,则继续留在localBufferPool中,减少反复申请的开销

backPressure在流式计算系统中用于协调上、下游operator的处理速度。因为在一个stream上进行处理的多个operator之间,它们的处理速度和方式可能非常不同,所以就存在上游operator如果处理速度过快,下游operator可能会堆积stream记录。因此,对下游operator处理速度跟不上的情况,如果下游operator能够将自己处理状态传播给上游operator,使得上游operator处理速度慢下来,从而缓解上述问题。

堆栈跟踪Sampling线程

JobManager会反复调用Task运行所在线程的Thread.getStackTrace(),默认情况下,JobManager会每隔50ms触发对每个Task依次进行100次堆栈跟踪调用,根据调用调用结果来确定Backpressure,通过计算得到一个比值radio来确定当前运行Job的Backpressure状态。在Web界面上可以看到这个Radio值,它表示在一个内部方法调用中阻塞(Stuck)的堆栈跟踪次数,例如,radio=0.01,表示100次中仅有1次方法调用阻塞。Flink目前定义了如下Backpressure状态:

  • OK: 0 <= Ratio <= 0.10
  • LOW: 0.10 < Ratio <= 0.5
  • HIGH: 0.5 < Ratio <= 1

Reference