如何产生不规则的水位线
有时候输入流中会包含一些用于指示系统进度的特殊元组或标记。Flink为此类情形以及可根据输入元素生成水位线的情形提供了AssignerWithPunctuatedWatermarks
接口。该接口中的checkAndGetNextWatermark()
方法会在针对每个事件的extractTimestamp()
方法后立即调用。它可以决定是否生成一个新的水位线。如果该方法返回一个非空、且大于之前值的水位线,算子就会将这个新水位线发出。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
val bound = 60 * 1000
// 每来一条数据就调用一次
// 紧跟`extractTimestamp`函数调用
override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long) {
if (r.id == "sensor_1") {
// 抽取的时间戳 - 最大延迟时间
new Watermark(extractedTS - bound)
} else {
null
}
}
// 每来一条数据就调用一次
override def extractTimestamp(r: SensorReading, previousTS: Long) {
r.timestamp
}
}
现在我们已经知道如何使用 TimestampAssigner
来产生水位线了。现在我们要讨论一下水位线会对我们的程序产生什么样的影响。
水位线用来平衡延迟和计算结果的正确性。水位线告诉我们,在触发计算(例如关闭窗口并触发窗口计算)之前,我们需要等待事件多长时间。基于事件时间的操作符根据水位线来衡量系统的逻辑时间的进度。
完美的水位线永远不会错:时间戳小于水位线的事件不会再出现。在特殊情况下(例如非乱序事件流),最近一次事件的时间戳就可能是完美的水位线。启发式水位线则相反,它只估计时间,因此有可能出错,即迟到的事件(其时间戳小于水位线标记时间)晚于水位线出现。针对启发式水位线,Flink提供了处理迟到元素的机制。
设定水位线通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过5秒,就可以将水位线标记时间设为收到的最大时间戳减去5秒。另一种做法是,采用一个Flink作业监控事件流,学习事件的迟到规律,并以此构建水位线生成模型。
如果最大延迟时间设置的很大,计算出的结果会更精确,但收到计算结果的速度会很慢,同时系统会缓存大量的数据,并对系统造成比较大的压力。如果最大延迟时间设置的很小,那么收到计算结果的速度会很快,但可能收到错误的计算结果。不过Flink处理迟到数据的机制可以解决这个问题。上述问题看起来很复杂,但是恰恰符合现实世界的规律:大部分真实的事件流都是乱序的,并且通常无法了解它们的乱序程度(因为理论上不能预见未来)。水位线是唯一让我们直面乱序事件流并保证正确性的机制; 否则只能选择忽视事实,假装错误的结果是正确的。
- 思考题一:实时程序,要求实时性非常高,并且结果并不一定要求非常准确,那么应该怎么办?
- 回答:直接使用处理时间。
- 思考题二:如果要进行时间旅行,也就是要还原以前的数据集当时的流的状态,应该怎么办?
- 回答:使用事件时间。使用Hive将数据集先按照时间戳升序排列,再将最大延迟时间设置为0。