指定时间戳和产生水位线

如果使用事件时间,那么流中的事件必须包含这个事件真正发生的时间。使用了事件时间的流必须携带水位线。

时间戳和水位线的单位是毫秒,记时从1970-01-01T00:00:00Z开始。到达某个操作符的水位线就会告知这个操作符:小于等于水位线中携带的时间戳的事件都已经到达这个操作符了。时间戳和水位线可以由SourceFunction产生,或者由用户自定义的时间戳分配器和水位线产生器来生成。

Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。一般来说,时间戳分配器需要在source操作符后马上进行调用。

因为时间戳分配器看到的元素的顺序应该和source操作符产生数据的顺序是一样的,否则就乱了。这就是为什么我们经常将source操作符的并行度设置为1的原因。

也就是说,任何分区操作都会将元素的顺序打乱,例如:并行度改变,keyBy()操作等等。

所以最佳实践是:在尽量接近数据源source操作符的地方分配时间戳和产生水位线,甚至最好在SourceFunction中分配时间戳和产生水位线。当然在分配时间戳和产生水位线之前可以对流进行map和filter操作是没问题的,也就是说必须是窄依赖。

以下这种写法是可以的。

DataStream<T> stream = env
  .addSource(...)
  .map(...)
  .filter(...)
  .assignTimestampsAndWatermarks(...)

下面的例子展示了首先filter流,然后再分配时间戳和水位线。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;

// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<SensorReading> readings = env
  .addSource(new SensorSource)
  .filter(r -> r.temperature > 25)
  .assignTimestampsAndWatermarks(new MyAssigner());

MyAssigner有两种类型

  • AssignerWithPeriodicWatermarks
  • AssignerWithPunctuatedWatermarks

以上两个接口都继承自TimestampAssigner。