The Right Way to Use Flink Window Triggers
Preamble
- When doing streaming aggregation with the Flink DataStream API and emitting results to a sink, you inevitably use a Trigger.
- Taking Flink 1.11.x as an example, these are the seven commonly used Trigger implementations:
EventTimeTrigger
: event-time trigger
ProcessingTimeTrigger
: processing-time trigger
ContinuousEventTimeTrigger
: continuous event-time trigger
ContinuousProcessingTimeTrigger
: continuous processing-time trigger
CountTrigger
: fires on the element count in the window
DeltaTrigger
: fires on a delta metric between window elements
PurgingTrigger
: wraps another trigger and purges the window when it fires
- In real business scenarios, ContinuousEventTimeTrigger is by far the most frequently used of these.
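- For orientation, this is the shape of a job that uses it. A minimal sketch, not from the original post: orders, catId, and cnt are placeholder names for a keyed stream of order events.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Tumbling one-day event-time window that emits an updated aggregate every second
val dailyCounts = orders
  .keyBy(_.catId)
  .timeWindow(Time.days(1))
  .trigger(ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(1)))
  .reduce((a, b) => a.copy(cnt = a.cnt + b.cnt)) // additive rollup on the placeholder cnt field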
It Started with a Data-Latency Alert
- A job that had been running stably in production for nearly two weeks suddenly raised an alert: data processing was delayed by more than half an hour. The job UI and monitoring pages showed checkpoint timeouts. The cause was traced back two weeks, to a repartitioning of the top-level categories in the channel dimension table: one channel category's keyBy aggregation operator developed a data hot spot, the TaskManager hit a processing bottleneck, the Kafka source suffered severe backpressure, and processing fell behind.
The Fix
- The guiding principle: scatter the aggregation keys as much as possible so the load lands evenly on every task.
- Going by past Spark Streaming tuning experience this looked easy: add a random key with a map operator, pre-aggregate with reduce, then roll up per window (the simplified pipeline appears in the code below). The change was done within half an hour, the data flow looked fine on the pre-release environment, so it went to production and the day's data was reprocessed. The hourly dashboard trend that morning was up roughly 5% over the previous day, which seemed plausible given a running promotion.
Spotting the Problem
- Checking the dashboard again in the afternoon, the day's cumulative order count had already exceeded the previous two days combined. Reconciling the detail layer hour by hour showed nothing wrong; looking at the reporting aggregation layer, the problem was pinned on the pre-aggregation logic.
Verifying the Pre-Aggregation Logic
- To verify this, start a local Kafka in Docker and feed it out-of-order data to exercise the pre-aggregation logic.
The Pre-Aggregation Logic
- Requirement: per-day order count / GMV / user count per top-level channel category, updated every second.
- The production logic is simplified here: the daily window is modeled as a one-minute window to make the aggregation easy to observe, i.e. per-minute order counts per category, updated every second.
- Scala code:
object TriggerTest extends App {
  val (env, tableEnv) = FlinkUtil.initStreamEnv()

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "com_patrick_stream_test")
  val kafkaConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String]("goods_cat", new SimpleStringSchema(), props)
  kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
  kafkaConsumer.setStartFromLatest() // consume from the latest offset to simplify testing; in production you can start from a given timestamp

  val goodsCatDS: DataStream[GoodsCat] = env
    .addSource(kafkaConsumer)
    .map(item => {
      val strArr: Array[String] = item.split(",")
      GoodsCat(strArr(0).trim, strArr(1).trim.toInt, DateUtil.formatDateToTimestamp(strArr(2).trim) * 1000)
    })
    .assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[GoodsCat](Duration.ofSeconds(20))
        .withTimestampAssigner(new SerializableTimestampAssigner[GoodsCat] {
          override def extractTimestamp(element: GoodsCat, recordTimestamp: Long): Long = {
            element.ts
          }
        })
    )
  goodsCatDS.print("source-goods-cat").setParallelism(1)

  // Phase 1: salt the hot key with a random pKey and pre-aggregate per minute window,
  // emitting an updated partial result every second
  val firstGoodsCatAggDS: DataStream[GoodsCatTmp] = goodsCatDS
    .map(item => GoodsCatTmp(DateUtil.formatKeyDayHourMinute(item.ts), item.catId, item.cnt, item.ts, Random.nextInt(3)))
    .keyBy(item => (item.catId, item.pKey))
    .timeWindow(Time.minutes(1))
    .trigger(ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(1)))
    .reduce(new ReduceFunction[GoodsCatTmp] {
      override def reduce(value1: GoodsCatTmp, value2: GoodsCatTmp): GoodsCatTmp = GoodsCatTmp(
        value1.periodId, value1.catId, value1.cnt + value2.cnt, Math.max(value1.ts, value2.ts), value1.pKey
      )
    })
  firstGoodsCatAggDS.print("cat-first-agg").setParallelism(1)

  // Phase 2: merge the partial results by (minute key, category)
  val secondGoodsCatAggDS: DataStream[GoodsCatTmp] = firstGoodsCatAggDS
    .keyBy(item => (item.periodId, item.catId))
    .reduce(new ReduceFunction[GoodsCatTmp] {
      override def reduce(value1: GoodsCatTmp, value2: GoodsCatTmp): GoodsCatTmp = GoodsCatTmp(
        value1.periodId, value1.catId, value1.cnt + value2.cnt, Math.max(value1.ts, value2.ts), value1.pKey
      )
    })
  secondGoodsCatAggDS.print("cat-second-agg").setParallelism(1)

  env.execute("trigger-test")
}
case class GoodsCat(
  catId: String,
  cnt: Int,
  ts: Long
)

case class GoodsCatTmp(
  periodId: String, // minute key in yyyyMMddHHmm format
  catId: String,
  cnt: Int,
  ts: Long,
  pKey: Int // random salt key
)
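- The FlinkUtil and DateUtil helpers are not shown in the original post. A hypothetical sketch of what DateUtil must look like, given how it is called above:

import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical stand-in for the helper the job references; not from the original post.
object DateUtil {
  private val inputFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val minuteKeyFmt = DateTimeFormatter.ofPattern("yyyyMMddHHmm")
  private val zone = ZoneId.systemDefault()

  /** "2021-10-24 20:00:10" -> epoch seconds (the caller multiplies by 1000). */
  def formatDateToTimestamp(s: String): Long =
    LocalDateTime.parse(s, inputFmt).atZone(zone).toInstant.getEpochSecond

  /** Epoch milliseconds -> minute key such as "202110242000". */
  def formatKeyDayHourMinute(ts: Long): String =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), zone).format(minuteKeyFmt)
}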
Running the Test
- Feed out-of-order test data into the local Kafka:
# category, order count, event time
2,1,2021-10-24 20:00:10
3,1,2021-10-24 20:00:29
1,1,2021-10-24 20:00:18
2,1,2021-10-24 20:00:46
1,1,2021-10-24 20:01:08
2,1,2021-10-24 20:00:55
3,1,2021-10-24 20:01:27
1,1,2021-10-24 20:01:39
2,1,2021-10-24 20:01:51
3,1,2021-10-24 20:02:03
1,1,2021-10-24 20:01:57
2,1,2021-10-24 20:02:26
3,1,2021-10-24 20:02:41
1,1,2021-10-24 20:02:53
2,1,2021-10-24 20:03:04
2,1,2021-10-24 20:02:58
2,1,2021-10-24 20:03:32
3,1,2021-10-24 20:03:57
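- For reference, tallying the input above by minute window gives the expected per-category order counts (the numbers each window should converge to):
  20:00 window: cat 1 = 1, cat 2 = 3, cat 3 = 1
  20:01 window: cat 1 = 3, cat 2 = 1, cat 3 = 1
  20:02 window: cat 1 = 1, cat 2 = 2, cat 3 = 2
  20:03 window: cat 2 = 2, cat 3 = 1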
Run Results
source-goods-cat> GoodsCat(2,1,1635076810000)
source-goods-cat> GoodsCat(3,1,1635076829000)
source-goods-cat> GoodsCat(1,1,1635076818000)
source-goods-cat> GoodsCat(2,1,1635076846000)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,4,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,6,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,8,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,10,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,12,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,14,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,16,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,18,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,20,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,22,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,24,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,26,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,28,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,30,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,2,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,3,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,4,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,5,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,6,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,7,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,2,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,3,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,4,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,5,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,6,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,7,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,8,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,9,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,10,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,11,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,12,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,13,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,14,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,15,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,16,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,17,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,18,1635076829000,1)
...
Investigation
- Clearly, within the minute window from 2021-10-24 20:00:00 to 2021-10-24 20:01:00, the pre-aggregation output (cat-first-agg) printed the same record over and over, so in the result layer (cat-second-agg) category 1's order count reached 7 instead of the correct 1, category 2's reached 30 instead of 3, and category 3's reached 18 instead of 1.
- So ContinuousEventTimeTrigger must be firing the same result repeatedly. The class comment in the source reads:
A Trigger that continuously fires based on a given time interval. This fires based on Watermarks.
See Also:
org.apache.flink.streaming.api.watermark.Watermark
Type parameters:
<W> – The type of Windows on which this trigger can operate.
- In plain terms: the trigger fires continuously at a given time interval, and the firing is driven by watermarks. The problem most likely hides in that "continuously", so on to the source code, focusing on onElement and onEventTime; onMerge is only invoked when two triggers' windows are merged, so it can be ignored here.
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
private final ReducingStateDescriptor<Long> stateDesc =
        new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

// Invoked for every element that enters the window
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately,
        // but do NOT purge the window contents
        return TriggerResult.FIRE;
    } else {
        // register a timer for the end of the window
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    // look up the next interval-fire timestamp in trigger state
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    if (fireTimestamp.get() == null) {
        long start = timestamp - (timestamp % interval);
        long nextFireTimestamp = start + interval;
        // register the first interval-fire timer
        ctx.registerEventTimeTimer(nextFireTimestamp);
        fireTimestamp.add(nextFireTimestamp);
    }

    // otherwise do nothing
    return TriggerResult.CONTINUE;
}

// Invoked when a registered event-time timer fires
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
    if (time == window.maxTimestamp()){
        // the timer marks the end of the window: fire, but do not purge
        return TriggerResult.FIRE;
    }

    ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
    Long fireTimestamp = fireTimestampState.get();

    if (fireTimestamp != null && fireTimestamp == time) {
        // the timer matches the stored fire time: roll the state forward by one interval,
        // register the next timer, and fire WITHOUT purging the window contents
        fireTimestampState.clear();
        fireTimestampState.add(time + interval);
        ctx.registerEventTimeTimer(time + interval);
        return TriggerResult.FIRE;
    }

    // otherwise do nothing
    return TriggerResult.CONTINUE;
}

private static class Min implements ReduceFunction<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        log.debug("Min ===> value1={}, value2={}", value1, value2); // debug log added for tracing; not in the stock Flink source
        return Math.min(value1, value2);
    }
}
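- Tracing the test run through this code explains the duplicates. The first category-2 record (event time 20:00:10) registers a per-second timer at 20:00:11. Nothing fires until the 20:00:46 record pushes the watermark to 20:00:25.999 (the maximum event time seen, minus the 20 s bounded out-of-orderness, minus 1 ms). Now the 20:00:11 timer is due: onEventTime returns FIRE without purging and registers 20:00:12, which is also already behind the watermark and fires immediately, and so on through 20:00:25. That is 15 consecutive FIREs of the identical window contents, matching the 15 duplicate cat-first-agg lines for category 2 above; category 1's first timer lands at 20:00:19, which yields its 7 duplicate lines the same way.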
A Custom Trigger
- How do we stop the Trigger from emitting duplicate records during pre-aggregation? The most direct fix is to deduplicate in the result layer. Is there a better way? After some thought, the custom Trigger below.
Approach
- Since the result layer re-accumulates the pre-aggregated outputs anyway, and since no window merging is involved here (so onMerge need not be implemented), the pre-aggregation window can simply be purged every time it fires.
class IntervalEventTimeTrigger(interval: Long) extends Trigger[Any, TimeWindow] {
  private final val log: Logger = LoggerFactory.getLogger(this.getClass)
  // holds the timestamp of the next interval fire for this key/window
  private final val fireTimeStateDesc = new ValueStateDescriptor[Long]("fireTimeState", classOf[Long])

  override def onElement(element: Any, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark) {
      // if the watermark is already past the window fire immediately
      return TriggerResult.FIRE
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }

    val fireTimestamp: ValueState[Long] = ctx.getPartitionedState(fireTimeStateDesc)
    val fireTime: Long = fireTimestamp.value()
    if (fireTime == 0L) {
      // first element for this key/window: register the first interval timer
      val start: Long = timestamp - (timestamp % interval)
      val nextFireTimestamp: Long = start + interval
      fireTimestamp.update(nextFireTimestamp)
      ctx.registerEventTimeTimer(nextFireTimestamp)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long,
                                window: TimeWindow,
                                ctx: Trigger.TriggerContext
                               ): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if (time == window.maxTimestamp()) {
      // end of the window: fire AND purge, so nothing can be re-emitted
      return TriggerResult.FIRE_AND_PURGE
    }

    val fireTimestamp: ValueState[Long] = ctx.getPartitionedState(fireTimeStateDesc)
    val timestamp: Long = fireTimestamp.value()
    if (timestamp == time) {
      // interval timer: roll the fire time forward, then fire AND purge
      fireTimestamp.clear()
      fireTimestamp.update(time + interval)
      ctx.registerEventTimeTimer(time + interval)
      return TriggerResult.FIRE_AND_PURGE
    }
    TriggerResult.CONTINUE
  }

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    val fireTimestamp: ValueState[Long] = ctx.getPartitionedState(fireTimeStateDesc)
    val timestamp: Long = fireTimestamp.value()
    if (timestamp != 0) {
      ctx.deleteEventTimeTimer(timestamp)
      fireTimestamp.clear()
    }
  }
}

object IntervalEventTimeTrigger {
  def of(interval: Time): IntervalEventTimeTrigger = {
    new IntervalEventTimeTrigger(interval.toMilliseconds)
  }
}
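- Wiring it into the test is a one-line change in the first-stage aggregation; a sketch, assuming everything except the trigger stays as in the pipeline shown earlier:

val firstGoodsCatAggDS: DataStream[GoodsCatTmp] = goodsCatDS
  .map(item => GoodsCatTmp(DateUtil.formatKeyDayHourMinute(item.ts), item.catId, item.cnt, item.ts, Random.nextInt(3)))
  .keyBy(item => (item.catId, item.pKey))
  .timeWindow(Time.minutes(1))
  .trigger(IntervalEventTimeTrigger.of(Time.seconds(1))) // was: ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(1))
  .reduce(new ReduceFunction[GoodsCatTmp] {
    override def reduce(value1: GoodsCatTmp, value2: GoodsCatTmp): GoodsCatTmp = GoodsCatTmp(
      value1.periodId, value1.catId, value1.cnt + value2.cnt, Math.max(value1.ts, value2.ts), value1.pKey
    )
  })

- FIRE_AND_PURGE only works here because the second stage is purely additive: each fire now carries only the delta accumulated since the previous fire, and the downstream reduce sums those deltas back up.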
Test and Run Results
- Feed the same out-of-order data in again:
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,2)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,2)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,2)
cat-first-agg> GoodsCatTmp(202110242000,2,1,1635076855000,2)
cat-second-agg> GoodsCatTmp(202110242000,2,2,1635076846000,2)
cat-second-agg> GoodsCatTmp(202110242000,1,1,1635076818000,2)
cat-second-agg> GoodsCatTmp(202110242000,3,1,1635076829000,2)
cat-second-agg> GoodsCatTmp(202110242000,2,3,1635076855000,2)
cat-first-agg> GoodsCatTmp(202110242001,1,2,1635076899000,1)
cat-first-agg> GoodsCatTmp(202110242001,3,1,1635076887000,1)
cat-first-agg> GoodsCatTmp(202110242001,1,1,1635076917000,2)
cat-first-agg> GoodsCatTmp(202110242001,2,1,1635076911000,1)
cat-second-agg> GoodsCatTmp(202110242001,1,2,1635076899000,1)
cat-second-agg> GoodsCatTmp(202110242001,3,1,1635076887000,1)
cat-second-agg> GoodsCatTmp(202110242001,1,3,1635076917000,1)
cat-second-agg> GoodsCatTmp(202110242001,2,1,1635076911000,1)
- Within the window 2021-10-24 20:00:00 to 2021-10-24 20:01:00 the final results are now correct: (1,1), (2,3), (3,1); within 2021-10-24 20:01:00 to 2021-10-24 20:02:00 they are also correct: (1,3), (2,1), (3,1). The out-of-order records were handled properly as well; everything matches expectations.
Summary
On using ContinuousEventTimeTrigger
- When the gap between successive event times is larger than the firing interval, the same result is emitted multiple times.
- Number of repeats: (next event time - previous event time - 1) / timer interval, rounded down.
- Fire times can differ across the keys of a window: whenever new data arrives and advances the watermark, each key whose timers come due is re-emitted at the configured interval, whether or not that key's data actually changed.
- When it drives the final result layer, the duplicates are harmless because results are usually persisted idempotently (upsert); as an intermediate layer, however, it emits duplicates that the downstream result layer would have to deduplicate.