Flink QuickStart

简介

  • Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。
  • Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

有界与无界数据流

  • 无界流: 有定义流开始, 未定义流的结束; 持续产生数据, 需要以特定顺序摄取事件 (如事件的发生顺序), 以便能够推断结果的完整性.
  • 有界流: 既定义流的开始, 又定义流的结束; 可以在摄取所有的数据后进行排序计算, 并需要有序摄取, 有界流处理通常被称为批处理.

特点

  1. 事件驱动型
  2. 流批一体
  3. 分层api
    • SQL/Table API (dynamic tables)
    • DataStream API (streams, windows)
    • ProcessFunction (events, state, time)

应用场景

  • 电商和市场营销
    • 数据报表, 广告投放, 业务流程需要
  • 物联网 (IOT)
    • 传感器实时数据采集和显示, 实时报警, 交通运输业
  • 电信业
    • 基站流量调配
  • 银行和金融业
    • 实时结算和通知推送, 实时监测异常行为
  • Spark Streaming 是微批 (micro-batching) 处理, 运行时需要指定批处理时间, 每次job处理一个批次的DStream (一组一组的小批数据RDD的集合)
  • Flink 是标准的流执行模式, 基本数据模型是DataStream 以及事件 (Event) 序列, 一个事件处理完成后可以直接发往下一个节点进行处理

架构概览

  • runtime架构: img

Job

  • 通过DataStream API, DataSet API, SQL和Table API 编写Flink任务, 会生成一个JobGraph (由 source, map, keyBy, window, apply和sink等算子组成)
  • 当JobGraph提交给Flink集群后, 能够以Local, Standalone, Yarn和K8s四种模式运行

JobManager

  • 概述:
    • 控制一个应用程序执行的主进程, 每个应用程序都会被一个不同的JobManager所控制执行
    • JobManger会先接收到要执行的应用程序, 这个应用包括: 作业图 (JobGraph), 逻辑数据流图 (logical dataflow graph) 和打包了所有的类库及其他资源jar包.
    • JobManager会把JobGraph转换成一个物理层面的数据流图 (ExecutionGraph), 该执行图包含了所有可以并发执行的任务.
    • JobManager会向资源管理器 (ResourceManager) 请求执行任务必要的资源, 也就是任务管理器 (TaskManager) 上的插槽 (slot). 一旦它获取到了足够的资源, 就会将执行图分发到真正运行它们的 TaskManager 上. 而在运行过程中, JonManager会负责所有需要中央协调的操作, 如检查点 (checkpoints) 的协调.
  • 功能:
    • 将JobGraph 转换成Execution Graph, 最终将Execution Graph拿来运行
    • Schedule组件负责Task的调度
    • Checkpoint Coordinator组件负责协调整个任务的Checkpoint (包括开始和完成)
    • 通过Actor System 与Task Manager进行通信
    • 其它一些功能, 如Recovery Metadata, 用于进行故障恢复, 可以从Metadaa里面读取数据

TaskManager

  • 概述:
    • Flink中的工作进程. 通常在Flink中会有多个TaskManager运行, 每一个TaskManager都包含一定数量的插槽 (slots), 插槽的数量限制了TaskManager能够执行的任务数量.
    • 启动之后, TaskManager会向资源管理器注册它的插槽, 收到资源管理器的指令后, TaskManager将会将一个或者多个插槽提供给JobManager调用, JobManager就可以向插槽分配任务 (Tasks) 来执行了.
    • 在执行过程中, 一个TaskManager可以跟其他运行同一应用程序的TaskManager交换数据.
  • 组件:
    • Memory & I/O Manager, 即内存I/O的管理
    • Network Manager, 用来对网络方面进行管理
    • Actor system, 用来负责网络的通信

ResourceManager

  • 主要负责管理任务管理器 (TaskManager) 的插槽 (slot), TaskManager插槽是Flink中定义的处理资源单元.
  • 提供YARN, Mesos, K8s及standlone部署.
  • 当JobManager申请插槽资源时, ResourceManager会将有空闲插槽的TaskManager分配给JobManager. 如果ResourceManager没有足够的插槽来满足JobManager的请求, 它还可以向资源提供平台发起会话, 以提供启动TaskManager进程的容器.

子组件

  • SlotManager

Dispatcher (分发器)

  • 可跨作业运行, 它为应用提交提供了REST接口.
  • 当一个应用被提交执行时, 分发器就会启动并将应用移交给一个JobManager.
  • Dispatcher也会启动一个Web UI, 用来方便地展示和监控作业执行的信息.
  • Dispatcher在架构中可能并不是必须的, 取决于应用程序提交运行的方式.

安装配置

版本选择

  • 1.9: 开始支持Hive集成, 并未完全兼容
  • 1.10:
    • 完成Blink向Flink的合并
    • 内存管理优化
    • Batch兼容Hive且生产可用
    • 对SQL的DDL进行增强
    • 支持Python UDF
    • 原生k8s初步集成
    • SQL Client CLI Beta版本, 仅支持embedded模式
  • 1.11:
    • 新的JobManager内存模型
    • Blink作为默认的planner

standlone

  • 下载
wget https://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.11.tgz
tar -zxf flink-1.10.1-bin-scala_2.11.tgz

单机standalone

  • 启动/停止
# 启动
./bin/start-cluster.sh

# 停止
./bin/stop-cluster.sh
  • web页面查看: http://localhost:8081

多机 Standalone 集群

  • 配置
    • conf/masters: host配置
    • conf/slaves: host配置
    • conf/flink-conf.yaml
    jobmanager.rpc.address: cdh00
    
  • 启动集群
./bin/start-cluster.sh

HighAvailability(HA)部署和配置

  • JobManager 是整个系统中最可能导致系统不可用的角色, 在生产业务使用 Standalone 模式,则需要部署配置 HA (flink 1.6+)
  • 配置
# conf/masters 增加 JobManager的host

# conf/flink-conf.yaml 
# 配置high-availability mode
high-availability: zookeeper

# 配置zookeeper quorum(hostname和端口需要依据对应zk的实际配置)
high-availability.zookeeper.quorum: cdh01:2181,cdh02:2181,cdh03:2181

# (可选)设置zookeeper的root目录
high-availability.zookeeper.path.root: /test_dir/test_standalone_root

# (可选)相当于是这个standalone集群中创建的zk node的namespace
high-availability.cluster-id: /test_dir/test_standalone

# JobManager的meta信息放在dfs,在zk上主要会保存一个指向dfs路径的指针
high-availability.storageDir: hdfs:///test_dir/recovery/
  • 分两种模式:
    • Job Mode: 每次提交Flink任务都会创建一个专用的Flink集群, 任务完成后资源释放
    • Session Mode: 在YARN中提前初始化一个Flink集群, 以后所有Flink任务都提交到这个集群
  • 优点:
    • 资源按需使用,提高集群的资源利用率
    • 任务有优先级,根据优先级运行作业
    • 基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover
      • JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
      • 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
      • 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager

在Yarn上启动Long Running的Flink集群 (Session CLuster模式)

  • 启动
# 查看命令参数
./bin/yarn-session.sh -h

# 创建一个 Yarn 模式的 Flink 集群
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
  • 提交任务
# 默认使用 /tmp/.yarn-properties-${user} 文件中保存的上一次创建 Yarn session 的集群信息
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
# 指定Yarn上的Application ID
./bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
  • 参数:
    • -n (--container): TaskManager 的数量
    • -jm (–jobManagerMemory): JobManager 的内存(单位 MB).
    • -tm (–taskManagerMemory): 每个 taskmanager 的内存 (单位 MB).
    • -qu: queue Specify YARN queue
    • -s (--slots): 每个 TaskManager 的 slot 数量, 默认一个 slot 一个 core, 默认每个 taskmanager 的 slot 的个数为 1, 有时候可以多一些 taskmanager, 做冗余.
    • -t (--ship): Ship files in the specified directory (t for transfer)
    • -nm: yarn 的appName yarn ui 上名字).
    • -d: 后台执行.
  • 运行单个 Flink Job 后就退出
./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

Yarn 模式下的 HA 配置

  • yarn-site.xml
<!-- Yarn 集群级别 AM 重启的上限 -->
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>100</value>
</property>
  • conf/flink-conf.yaml
yarn.application-attempts: 10     # 1+ 9 retries
  • conf/flink-conf.yaml
# 配置high-availability mode
high-availability: zookeeper

# 配置zookeeper quorum(hostname和端口需要依据对应zk的实际配置)
high-availability.zookeeper.quorum: cdh01:2181,cdh02:2181,cdh03:2181

# (可选)设置zookeeper的root目录
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root

# 删除这个配置
# high-availability.cluster-id: /test_dir/test_standalone2

# JobManager的meta信息放在dfs,在zk上主要会保存一个指向dfs路径的指针
high-availability.storageDir: hdfs:///test_dir/recovery2/

编译安装CDH版本

下载制作包

# 下载制作包
git clone https://github.com/pkeropen/flink-parcel.git

修改配置文件

  • flink-parcel.properties
#FLINK 下载地址
FLINK_URL=https://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.11.tgz

#flink版本号
FLINK_VERSION=1.10.1

#扩展版本号
EXTENS_VERSION=BIN-SCALA_2.11

#操作系统版本,以centos为例
OS_VERSION=7

#CDH 小版本
CDH_MIN_FULL=5.2
CDH_MAX_FULL=5.15

#CDH大版本
CDH_MIN=5
CDH_MAX=5

生成parcel文件

./build.sh  parcel

生成csd文件

./build.sh  csd_on_yarn

CDH 中安装flink服务

  • 配置flink parcel和csd文件
  • 激活flink
  • 添加flink服务

state

keyed state

  • 只能用于 KeyedStream的函数与操作中

operator state (non-keyed state)

  • 每一个operator state都仅与一个operator的实例绑定

checkpoint

  • state就是checkpoint所做的主要持久化备份的主要数据
  • 常见的source state, 例如记录当前source的offset

执行机制

  1. Checkpoint Coordinator向所有source节点trigger Check-point
  2. source 节点向下游广播barrier (实现Chandy-Lamp-ort分布式快照算法核心), 下游task只有接收到所有input的barrier才会执行相应的Checkpoint
  3. 当task完成state备份后, 会将备份数据的地址 (state handle) 通知给 Checkpoint coordinator
  4. 下游sink节点收集齐上游input barrier之后, 会执行本地快照, 同样sink节点完成自身的checkpoint之后, 会将 state haddle返回通知Coordinator
  5. 最后当Checkpoint Coordinator收集齐所有task的state handle, 就认为这一次的checkpoint全局完成了, 向持久化存储中再备份一个Checkpoint meta文件

程序与数据流

  • 所有的Flink程序都是由三部分组成: source, Transformation 和 Sink

流处理API

  • map/flatMap/filter
  • keyBy
  • 聚合操作:
    • 滚动聚合算子 (Rolling Aggregation)
      • sum
      • min
      • max
      • minBy
      • maxBy
    • reduce:
      • 合并当前的元素和上次聚合的结构, 产生一个新的值,返回的流中包含每一次聚合的结果,而不是 只返回最后一次聚合的最终结果。
  • split 和 select
  • connect 和 coMap/coFlatMap
  • Union

UDF

  • MapFunction, FilterFunction, ProcessFunction等
  • Lambda Functions
  • Rich Functions

Source

  • 定义数据源
  • 预定义:
    • 文件: readTextFile, readFile
    • 集合: fromElements, fromCollection, fromParallelCollection, generateSequence
    • Socket: socketTextStream
  • connectors: RabbitMQ/kafka, Apache Bahir
  • 自定义source:
    • 直接或间接实现SourceFunction接口

Sink

  • 对外的输出通过sink来完成
  • 预定义:
    • 文件: writeAsText(), writeAsCsv(), writeUsingOutputFormat, FileOutputFormat
    • socket: writeToSocket
    • console: print, printToErr
  • connectors: hdfs/RabbitMQ/kafka/redis/es/nifi, Apache Bahir
  • 自定义sink:
    • 直接或者间接实现SinkFunction接口

窗口

  • 分类:
    • CountWindow: 按照指定的数据条数生成一个 Window,与时间无关。
    • TimeWindow: 按照时间生成 Window。
  • 窗口分配器: window()方法, 必须在keyBy之后才能使用.

TimeWindow

滚动窗口(Tumbling Windows)

  • 概述: 将数据依据固定的窗口长度对数据进行切片.Flink的默认时间窗口, 根据Processing Time进行窗口划分.
  • 特点: 时间对齐, 窗口长度固定, 没有重叠.
  • 适用场景: 适合做 BI 统计等(做每个时间段的聚合计算).
  • API
    • timeWindow(window_size)

滑动窗口(Sliding Windows)

  • 概述: 滑动窗口是固定窗口的更广义的一种形式, 滑动窗口由固定的窗口长度和滑动间隔组成.
  • 特点: 窗口长度固定, 可以有重叠.
  • 适用场景:对最近一个时间段内的统计 (求某接口最近 5min 的失败率来决定是否要报警).
  • API
    • timeWindow(window_size, sliding_size)

会话窗口(Session Windows)

  • 概述: 由一系列事件组合一个指定时间长度的 timeout 间隙组成, 类似于 web 应用的session, 也就是一段时间没有接收到新数据就会生成新的窗口.
  • 特征: 时间无对齐

CountWindow

  • 按照指定的数据条数生成一个 Window, 与时间无关.

滚动窗口

  • 概述: 默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
  • API
    • countWindow(window_size)

滑动窗口

  • API
    • countWindow(window_size, sliding_size)

window function

类别

  • 增量聚合函数(incremental aggregation functions)
    • 每条数据到来就进行计算,保持一个简单的状态。
    • 典型的增量聚合函数有 ReduceFunction, AggregateFunction。
  • 全窗口函数(full window functions)
    • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。
    • ProcessWindowFunction 就是一个全窗口函数。

其他

  • trigger() —— 触发器
    • 定义 window 什么时候关闭,触发计算并输出结果
  • evitor() —— 移除器
    • 定义移除某些数据的逻辑
  • allowedLateness() —— 允许处理迟到的数据
  • sideOutputLateData() —— 将迟到的数据放入侧输出流
  • getSideOutput() —— 获取侧输出流

时间语义与Watermark

时间语义

  • Event Time: 是事件创建的时间.它通常由事件中的时间戳描述, 例如采集的日志数据中, 每一条日志都会记录自己的生成时间, Flink 通过时间戳分配器访问事件时间戳.
  • Ingestion Time: 是数据进入 Flink 的时间.
  • Processing Time: 是每一个执行基于时间操作的算子的本地系统时间, 与机器相关, 默认的时间属性就是 Processing Time.

Watermark

  • 作用:
    • 用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。
    • 遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口

Timestamp 分配和 Watermark生成

  • 方式:
    • 在SourceFunction中, 通过collectWithTimestamp方法发送一条数据或者emitWatermark去产生一条watermark (表示接下来不会再有时间戳小于等于这个数值记录)
    • 调用DataStream.assiginTimestampAndWatermarks

Watermark 传播

  • 策略
    1. watermark会以广播的形式在算子之间传播
    2. 若接受到Long.MAX_VALUE这个数值的watermark, 相当于是终止标志
    3. 对于有多个输入的算子watermark原则是: 单输入取其大, 多输入取小 (类似木桶效应)
  • 特点:
    1. 幂等
  • 不足:
    • 时钟不同步将会带来性能开销, TimeService设置过期

Table API 与 SQL

  • Flink 对批处理和流处理,提供了统一的上层 API
  • Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观 的方式组合来自一些关系运算符的查询
  • Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite (SQL 解析工具)

DataStream 与 Table 互转

// DataStream 转为 Table
tableEnv.fromDataStream(dataStream)

// Table 转为 DataStream (需要指定类型) 追加模式
tableEnv.toAppendStream[Row](resultTable)
// Table 转为 DataStream (需要指定类型) 撤回模式: 聚合操作后的方式
tableEnv .toRetractStream[(String, Long)](aggResultTable)

SQL client

  • 当前的SQL客户端仅支持嵌入式模式
# 启动, 暂不支持远程连接
./bin/sql-client.sh embedded
  • 简介:
    • CEP 的意思是复杂事件处理,例如:起床–>洗漱–>吃饭–>上班等一系列串联起来的事件流形成的模式称为 CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了.
  • 应用场景
    • 风险控制: 对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
    • 策略营销: 用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
    • 运维监控: 灵活配置多指标、多依赖来实现更复杂的监控模式。

原理

  • Flink CEP 内部是用 NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。
  • 点分为起始状态、中间状态、最终状态三种.
  • 边分为 take、ignore、proceed 三种.
    • take:必须存在一个条件判断,当到来的消息满足 take 边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
    • ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
    • proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态.
  • img

组件

  1. Event Stream
  2. pattern 定义
  3. pattern 检测

参考