如何产生不规则的水位线

有时候输入流中会包含一些用于指示系统进度的特殊元组或标记。Flink为此类情形以及可根据输入元素生成水位线的情形提供了AssignerWithPunctuatedWatermarks接口。该接口中的checkAndGetNextWatermark()方法会在针对每个事件的extractTimestamp()方法后立即调用。它可以决定是否生成一个新的水位线。如果该方法返回一个非空、且大于之前值的水位线,算子就会将这个新水位线发出。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
  val bound = 60 * 1000

  // 每来一条数据就调用一次
  // 紧跟`extractTimestamp`函数调用
  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long) {
    if (r.id == "sensor_1") {
      // 抽取的时间戳 - 最大延迟时间
      new Watermark(extractedTS - bound)
    } else {
      null
    }
  }

  // 每来一条数据就调用一次
  override def extractTimestamp(r: SensorReading, previousTS: Long) {
    r.timestamp
  }
}

现在我们已经知道如何使用 TimestampAssigner 来产生水位线了。现在我们要讨论一下水位线会对我们的程序产生什么样的影响。

水位线用来平衡延迟和计算结果的正确性。水位线告诉我们,在触发计算(例如关闭窗口并触发窗口计算)之前,我们需要等待事件多长时间。基于事件时间的操作符根据水位线来衡量系统的逻辑时间的进度。

完美的水位线永远不会错:时间戳小于水位线的事件不会再出现。在特殊情况下(例如非乱序事件流),最近一次事件的时间戳就可能是完美的水位线。启发式水位线则相反,它只估计时间,因此有可能出错,即迟到的事件(其时间戳小于水位线标记时间)晚于水位线出现。针对启发式水位线,Flink提供了处理迟到元素的机制。

设定水位线通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过5秒,就可以将水位线标记时间设为收到的最大时间戳减去5秒。另一种做法是,采用一个Flink作业监控事件流,学习事件的迟到规律,并以此构建水位线生成模型。

如果最大延迟时间设置的很大,计算出的结果会更精确,但收到计算结果的速度会很慢,同时系统会缓存大量的数据,并对系统造成比较大的压力。如果最大延迟时间设置的很小,那么收到计算结果的速度会很快,但可能收到错误的计算结果。不过Flink处理迟到数据的机制可以解决这个问题。上述问题看起来很复杂,但是恰恰符合现实世界的规律:大部分真实的事件流都是乱序的,并且通常无法了解它们的乱序程度(因为理论上不能预见未来)。水位线是唯一让我们直面乱序事件流并保证正确性的机制; 否则只能选择忽视事实,假装错误的结果是正确的。

  • 思考题一:实时程序,要求实时性非常高,并且结果并不一定要求非常准确,那么应该怎么办?
  • 回答:直接使用处理时间。
  • 思考题二:如果要进行时间旅行,也就是要还原以前的数据集当时的流的状态,应该怎么办?
  • 回答:使用事件时间。使用Hive将数据集先按照时间戳升序排列,再将最大延迟时间设置为0。