Flink Window Triggers: the Right Way to Use Them

Preamble

  • When doing streaming aggregation with the Flink DataStream API, you almost always end up using a Trigger somewhere before the sink.
  • Taking Flink 1.11.x as an example, the commonly used Trigger implementations are the following seven:
    • EventTimeTrigger: fires on event time
    • ProcessingTimeTrigger: fires on processing time
    • ContinuousEventTimeTrigger: fires continuously at a given event-time interval
    • ContinuousProcessingTimeTrigger: fires continuously at a given processing-time interval
    • CountTrigger: fires once the window holds a given number of elements
    • DeltaTrigger: fires when a delta function over the window's elements exceeds a threshold
    • PurgingTrigger: wraps another trigger and purges the window contents on firing
  • In real-world business scenarios, ContinuousEventTimeTrigger is easily the most used of these.

It Started with a Data-Latency Alert

  • A job that had been running stably in production for almost two weeks suddenly raised an alert: data processing was lagging by more than half an hour. Checking the job's UI and metrics pages showed checkpoints timing out. The root cause: two weeks earlier the top-level categories of the channel dimension table had been repartitioned, and one category's keyBy aggregation operator developed a data hot spot. The TaskManager became a processing bottleneck, severe backpressure built up at the Kafka source, and processing fell behind.

The Fix

  • The guiding principle is to spread the aggregation keys out as much as possible so that load lands evenly on every Task.
  • Going by past Spark Streaming tuning experience this looked easy: use a map operator to salt the key with a random component, reduce for a pre-aggregation, then aggregate the final result per window (a simplified version of this pipeline appears in the code below). The change took less than half an hour; it looked fine in pre-production, so it went live and the day's data was reprocessed. Watching the hourly dashboard curve that morning, numbers were up roughly 5% over the previous day, which looked plausible given an ongoing promotion.

Spotting the Problem

  • Checking the dashboard again in the afternoon, the day's cumulative order count had already exceeded the previous two days combined. Reconciling the detail layer hour by hour turned up nothing; looking at the reporting aggregation logic, the problem was pinned down to the pre-aggregation step.

Verifying the Pre-Aggregation Logic

  • To verify, start a local Kafka in Docker and feed it out-of-order data to exercise the pre-aggregation logic.

The Pre-Aggregation Job

  • Requirement: per-day order count / GMV / user count by top-level channel category, updated every second.
  • The production logic is simplified here: the per-day window is shrunk to a per-minute window to make the aggregation behavior easy to observe, i.e. per-minute order counts per category, updated every second.
  • Scala code below.
object TriggerTest extends App {

  val (env, tableEnv) = FlinkUtil.initStreamEnv()

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "com_patrick_stream_test")

  val kafkaConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String]("goods_cat", new SimpleStringSchema(), props)
  kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
  kafkaConsumer.setStartFromLatest() // consume from the latest offset to simplify testing; in production you could start from a given timestamp

  val goodsCatDS: DataStream[GoodsCat] = env
    .addSource(kafkaConsumer)
    .map(item => {
      val strArr: Array[String] = item.split(",")
      GoodsCat(strArr(0).trim, strArr(1).trim.toInt, DateUtil.formatDateToTimestamp(strArr(2).trim) * 1000)
    })
    .assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[GoodsCat](Duration.ofSeconds(20))
        .withTimestampAssigner(new SerializableTimestampAssigner[GoodsCat] {
          override def extractTimestamp(element: GoodsCat, recordTimestamp: Long): Long = {
            element.ts
          }
        })
    )

  goodsCatDS.print("source-goods-cat").setParallelism(1)

  // Pre-aggregation: salt the key with a random pKey in [0, 3) to break up hot keys
  val firstGoodsCatAggDS: DataStream[GoodsCatTmp] = goodsCatDS
    .map(item => GoodsCatTmp(DateUtil.formatKeyDayHourMinute(item.ts), item.catId, item.cnt, item.ts, Random.nextInt(3)))
    .keyBy(item => (item.catId, item.pKey))
    .timeWindow(Time.minutes(1))
    .trigger(ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(1)))
    .reduce(new ReduceFunction[GoodsCatTmp] {
      override def reduce(value1: GoodsCatTmp, value2: GoodsCatTmp): GoodsCatTmp = GoodsCatTmp(
        value1.periodId, value1.catId, value1.cnt + value2.cnt, Math.max(value1.ts, value2.ts), value1.pKey
      )
    })

  firstGoodsCatAggDS.print("cat-first-agg").setParallelism(1)

  // Final aggregation: merge the salted partial results per (periodId, catId)
  val secondGoodsCatAggDS: DataStream[GoodsCatTmp] = firstGoodsCatAggDS
    .keyBy(item => (item.periodId, item.catId))
    .reduce(new ReduceFunction[GoodsCatTmp] {
      override def reduce(value1: GoodsCatTmp, value2: GoodsCatTmp): GoodsCatTmp = GoodsCatTmp(
        value1.periodId, value1.catId, value1.cnt + value2.cnt, Math.max(value1.ts, value2.ts), value1.pKey
      )
    })

  secondGoodsCatAggDS.print("cat-second-agg").setParallelism(1)

  env.execute("trigger-test")
}

case class GoodsCat(
                     catId: String,
                     cnt: Int,
                     ts: Long
                   )

case class GoodsCatTmp(
                        periodId: String, // minute key in yyyyMMddHHmm format
                        catId: String,
                        cnt: Int,
                        ts: Long,
                        pKey: Int
                      )
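
  • The job above references FlinkUtil.initStreamEnv, DateUtil.formatDateToTimestamp, and DateUtil.formatKeyDayHourMinute, project utilities that were not shown. A minimal sketch of what they might look like, with the bodies inferred from the call sites (these are assumptions, not the original implementations):
import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDateTime, ZoneId}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Hypothetical sketch: builds the stream/table environments used by the job
object FlinkUtil {
  def initStreamEnv(): (StreamExecutionEnvironment, StreamTableEnvironment) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    (env, tableEnv)
  }
}

// Hypothetical sketch: date helpers, behavior inferred from how they are called
object DateUtil {
  private val tsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val minuteKeyFmt = DateTimeFormatter.ofPattern("yyyyMMddHHmm")

  // "2021-10-24 20:00:10" -> epoch seconds (the caller multiplies by 1000)
  def formatDateToTimestamp(s: String): Long =
    LocalDateTime.parse(s, tsFmt).atZone(ZoneId.systemDefault()).toEpochSecond

  // epoch millis -> minute key such as "202110242000"
  def formatKeyDayHourMinute(ts: Long): String =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()).format(minuteKeyFmt)
}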

Running the Test

  • Feed out-of-order test data into the local Kafka:
# category id, order count, time
2,1,2021-10-24 20:00:10
3,1,2021-10-24 20:00:29
1,1,2021-10-24 20:00:18
2,1,2021-10-24 20:00:46
1,1,2021-10-24 20:01:08
2,1,2021-10-24 20:00:55
3,1,2021-10-24 20:01:27
1,1,2021-10-24 20:01:39
2,1,2021-10-24 20:01:51
3,1,2021-10-24 20:02:03
1,1,2021-10-24 20:01:57
2,1,2021-10-24 20:02:26
3,1,2021-10-24 20:02:41
1,1,2021-10-24 20:02:53
2,1,2021-10-24 20:03:04
2,1,2021-10-24 20:02:58
2,1,2021-10-24 20:03:32
3,1,2021-10-24 20:03:57

Output

source-goods-cat> GoodsCat(2,1,1635076810000)
source-goods-cat> GoodsCat(3,1,1635076829000)
source-goods-cat> GoodsCat(1,1,1635076818000)
source-goods-cat> GoodsCat(2,1,1635076846000)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,2,2,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,4,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,6,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,8,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,10,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,12,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,14,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,16,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,18,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,20,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,22,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,24,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,26,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,28,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,2,30,1635076846000,1)
cat-second-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,2,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,3,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,4,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,5,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,6,1635076818000,0)
cat-second-agg> GoodsCatTmp(202110242000,1,7,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,0)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,1,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,2,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,3,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,4,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,5,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,6,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,7,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,8,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,9,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,10,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,11,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,12,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,13,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,14,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,15,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,16,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,17,1635076829000,1)
cat-second-agg> GoodsCatTmp(202110242000,3,18,1635076829000,1)
...

Digging In

  • Clearly, within the minute window from 2021-10-24 20:00:00 to 2021-10-24 20:01:00, the pre-aggregation output (cat-first-agg) printed the same record many times, so in the result layer (cat-second-agg) category 1's order count came out as 7 instead of the correct 1, category 2's as 30 instead of 3, and category 3's as 18 instead of 1.
  • So ContinuousEventTimeTrigger must be firing the same result repeatedly. The class's Javadoc says:
A Trigger that continuously fires based on a given time interval. This fires based on Watermarks.
See Also:
org.apache.flink.streaming.api.watermark.Watermark
Type parameters:
<W> – The type of Windows on which this trigger can operate.
  • Reading the comment: the trigger fires repeatedly at the given interval, driven by watermarks. The problem most likely lies in that "continuously fires" part, so let's read the source, focusing on onElement and onEventTime; onMerge is only called when the windows of two triggers merge, so it can be ignored here.
  /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
  
  // Called for every element that is added to the window
  @Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
      // the watermark has already passed the end of the window: fire, but keep the window contents
			return TriggerResult.FIRE;
		} else {
      // register the end-of-window timer
			ctx.registerEventTimeTimer(window.maxTimestamp());
		}

    // on the first element for this key/window, compute and register the first periodic fire time
		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
		if (fireTimestamp.get() == null) {
			long start = timestamp - (timestamp % interval);
			long nextFireTimestamp = start + interval;
      // register the first periodic timer
			ctx.registerEventTimeTimer(nextFireTimestamp);
			fireTimestamp.add(nextFireTimestamp);
		}
    // otherwise, no-op
		return TriggerResult.CONTINUE;
	}

  // Called when an event-time timer fires
	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

		if (time == window.maxTimestamp()){
      // the end-of-window timer: fire, but keep the window contents
			return TriggerResult.FIRE;
		}

		ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

		Long fireTimestamp = fireTimestampState.get();

		if (fireTimestamp != null && fireTimestamp == time) {
      // the timer matches the fire time in trigger state: advance it by one interval, register the next timer, and fire without purging the window contents
			fireTimestampState.clear();
			fireTimestampState.add(time + interval);
			ctx.registerEventTimeTimer(time + interval);
			return TriggerResult.FIRE;
		}
    // otherwise, no-op
		return TriggerResult.CONTINUE;
	}

  private static class Min implements ReduceFunction<Long> {
      private static final long serialVersionUID = 1L;

      @Override
      public Long reduce(Long value1, Long value2) throws Exception {
          log.debug("Min ===> value1={}, value2={}", value1, value2);
          return Math.min(value1, value2);
      }
  }
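
  • Putting the two callbacks together explains the duplicates: after every periodic fire, onEventTime re-registers time + interval, but the watermark only moves when new records arrive. When a record jumps the watermark forward by several intervals at once, each re-registered timer is already behind the watermark and fires immediately, so the chained timers go off back-to-back, each one emitting the unchanged window contents.
  • A rough trace against the test data (with the forBoundedOutOfOrderness strategy above, the watermark is max event time - 20 s - 1 ms): for category 2, the first record at 20:00:10 stores a fire time of 20:00:11; the watermark only passes it when 2,1,2021-10-24 20:00:46 arrives and pushes it to 20:00:25.999, at which point the chained timers for 20:00:11 through 20:00:25 all fire in a row, producing exactly the 15 identical cat-first-agg lines seen above. The same arithmetic yields the 7 repeats for category 1 (fire times 20:00:19 through 20:00:25).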

A Custom Trigger

  • How do we get rid of the duplicate pre-aggregation output from the Trigger? The most direct fix is to deduplicate in the result layer. Is there something better? After some thought, the custom Trigger below.

The Idea

  • The result layer keeps accumulating on top of the pre-aggregation layer, and since no window merging is involved, onMerge does not need to be implemented. The pre-aggregation window contents can therefore be purged as soon as each firing happens, so the same pane is never emitted twice.
class IntervalEventTimeTrigger(interval: Long) extends Trigger[Any, TimeWindow]{

  private final val log: Logger = LoggerFactory.getLogger(this.getClass)

  private final val fireTimeStateDesc = new ValueStateDescriptor[Long]("fireTimeState", classOf[Long])

  override def onElement(element: Any, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {

    if (window.maxTimestamp() <= ctx.getCurrentWatermark) {
      // if the watermark is already past the window fire immediately
      return TriggerResult.FIRE
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }

    val fireTimestamp: ValueState[Long] = ctx.getPartitionedState(fireTimeStateDesc)
    val fireTime: Long = fireTimestamp.value()

    if (fireTime == 0L) {
      val start: Long = timestamp - (timestamp % interval)
      val nextFireTimestamp: Long = start + interval
      fireTimestamp.update(nextFireTimestamp)
      ctx.registerEventTimeTimer(nextFireTimestamp)
    }

    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long,
                                window: TimeWindow,
                                ctx: Trigger.TriggerContext
                               ): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {

    if (time == window.maxTimestamp()) {
      // end-of-window timer: fire and, unlike ContinuousEventTimeTrigger, purge the pane
      return TriggerResult.FIRE_AND_PURGE
    }

    val fireTimestamp: ValueState[Long] = ctx.getPartitionedState(fireTimeStateDesc)
    val timestamp: Long = fireTimestamp.value()

    if (timestamp == time) {
      // periodic fire: schedule the next interval, then fire and purge so the same
      // partial aggregate can never be emitted twice downstream
      fireTimestamp.clear()
      fireTimestamp.update(time + interval)
      ctx.registerEventTimeTimer(time + interval)
      return TriggerResult.FIRE_AND_PURGE
    }

    TriggerResult.CONTINUE
  }

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {

    val fireTimestamp: ValueState[Long] = ctx.getPartitionedState(fireTimeStateDesc)
    val timestamp: Long = fireTimestamp.value()
    if (timestamp != 0) {
      ctx.deleteEventTimeTimer(timestamp)
      fireTimestamp.clear()
    }
  }
}

object IntervalEventTimeTrigger{

  def of(interval: Time): IntervalEventTimeTrigger = {
    new IntervalEventTimeTrigger(interval.toMilliseconds)
  }
}
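
  • Wiring it in only means swapping the trigger on the pre-aggregation window; the rest of the earlier job is unchanged:
  val firstGoodsCatAggDS: DataStream[GoodsCatTmp] = goodsCatDS
    .map(item => GoodsCatTmp(DateUtil.formatKeyDayHourMinute(item.ts), item.catId, item.cnt, item.ts, Random.nextInt(3)))
    .keyBy(item => (item.catId, item.pKey))
    .timeWindow(Time.minutes(1))
    .trigger(IntervalEventTimeTrigger.of(Time.seconds(1))) // was: ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(1))
    .reduce(new ReduceFunction[GoodsCatTmp] {
      override def reduce(value1: GoodsCatTmp, value2: GoodsCatTmp): GoodsCatTmp = GoodsCatTmp(
        value1.periodId, value1.catId, value1.cnt + value2.cnt, Math.max(value1.ts, value2.ts), value1.pKey
      )
    })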

Test and Results

  • Feed in the same out-of-order data again:
cat-first-agg> GoodsCatTmp(202110242000,2,2,1635076846000,2)
cat-first-agg> GoodsCatTmp(202110242000,1,1,1635076818000,2)
cat-first-agg> GoodsCatTmp(202110242000,3,1,1635076829000,2)
cat-first-agg> GoodsCatTmp(202110242000,2,1,1635076855000,2)

cat-second-agg> GoodsCatTmp(202110242000,2,2,1635076846000,2)
cat-second-agg> GoodsCatTmp(202110242000,1,1,1635076818000,2)
cat-second-agg> GoodsCatTmp(202110242000,3,1,1635076829000,2)
cat-second-agg> GoodsCatTmp(202110242000,2,3,1635076855000,2)

cat-first-agg> GoodsCatTmp(202110242001,1,2,1635076899000,1)
cat-first-agg> GoodsCatTmp(202110242001,3,1,1635076887000,1)
cat-first-agg> GoodsCatTmp(202110242001,1,1,1635076917000,2)
cat-first-agg> GoodsCatTmp(202110242001,2,1,1635076911000,1)

cat-second-agg> GoodsCatTmp(202110242001,1,2,1635076899000,1)
cat-second-agg> GoodsCatTmp(202110242001,3,1,1635076887000,1)
cat-second-agg> GoodsCatTmp(202110242001,1,3,1635076917000,1)
cat-second-agg> GoodsCatTmp(202110242001,2,1,1635076911000,1)
  • Within the window from 2021-10-24 20:00:00 to 2021-10-24 20:01:00 the final results are now correct: (1,1), (2,3), (3,1); within the window from 2021-10-24 20:01:00 to 2021-10-24 20:02:00 they are also correct: (1,3), (2,1), (3,1). The out-of-order records were handled properly as well; everything matches expectations.

Takeaways

On using ContinuousEventTimeTrigger

  1. When the gap between successive event times exceeds the periodic trigger interval, the same result will be emitted multiple times
    • roughly (next event time - previous event time - 1) / trigger interval times, rounded down
  2. Fire times can differ between keys within the same window: whenever new data arrives, every key is re-emitted on the periodic schedule, whether or not that key's aggregate actually changed
  3. When the trigger feeds the result layer directly this is usually harmless, because result layers typically persist idempotently (upsert), so nothing is double-counted; as an intermediate layer, however, it produces duplicates that the downstream layer has to deduplicate (see the sketch after this list)
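
  • For point 3, a minimal sketch of an idempotent (upsert) result sink, assuming a MySQL table uniquely keyed on (period_id, cat_id); the table name and columns are made up for illustration:
import java.sql.Connection

// Hypothetical upsert: repeated fires for the same (period_id, cat_id)
// overwrite one row instead of being counted twice
def upsertGoodsCat(conn: Connection, row: GoodsCatTmp): Unit = {
  val stmt = conn.prepareStatement(
    "INSERT INTO goods_cat_stat (period_id, cat_id, cnt) VALUES (?, ?, ?) " +
      "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)")
  try {
    stmt.setString(1, row.periodId)
    stmt.setString(2, row.catId)
    stmt.setInt(3, row.cnt)
    stmt.executeUpdate()
  } finally stmt.close()
}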
