时间特性
基于时间的操作(比如Table API和SQL中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
时间属性,可以是每个表schema的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。
时间属性的行为类似于常规时间戳,可以访问,并且进行计算。
处理时间
处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成watermark。
定义处理时间属性有三种方法:在DataStream转化时直接指定;在定义Table Schema时指定;在创建表的DDL中指定。
- DataStream转化成Table时指定
由DataStream转换成表时,可以在后面指定字段名来定义Schema。在定义Schema期间,可以使用.proctime,定义处理时间字段。
注意,这个proctime属性只能通过附加逻辑字段,来扩展物理schema。因此,只能在schema定义的末尾定义它。
代码如下:
val stream = env.addSource(new SensorSource)
val sensorTable = tableEnv
.fromDataStream(stream, $"id", $"timestamp", $"temperature", $"pt".proctime())
- 创建表的DDL中指定
在创建表的DDL中,增加一个字段并指定成proctime,也可以指定当前的时间字段。
代码如下:
val sinkDDL: String =
"""
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| pt AS PROCTIME()
|) with (
| 'connector.type' = 'filesystem',
| 'connector.path' = 'sensor.txt',
| 'format.type' = 'csv'
|)
""".stripMargin
tableEnv.sqlUpdate(sinkDDL) // 执行 DDL
注意:运行这段DDL,必须使用Blink Planner。
事件时间(Event Time)
事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果。
为了处理无序事件,并区分流中的准时和迟到事件;Flink需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。
- DataStream转化成Table时指定
在DataStream转换成Table,schema的定义期间,使用.rowtime可以定义事件时间属性。注意,必须在转换的数据流中分配时间戳和watermark。
在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime字段名是否存在于数据流的架构中,timestamp字段可以:
- 作为新字段追加到schema
- 替换现有字段
在这两种情况下,定义的事件时间戳字段,都将保存DataStream中事件时间戳的值。
代码如下:
val stream = env
.addSource(new SensorSource)
.assignAscendingTimestamps(r => r.timestamp)
// 将 DataStream转换为 Table,并指定时间字段
val sensorTable = tableEnv
.fromDataStream(stream, $"id", $"timestamp".rowtime(), 'temperature)
- 创建表的DDL中指定
事件时间属性,是使用CREATE TABLE DDL中的WARDMARK语句定义的。watermark语句,定义现有事件时间字段上的watermark生成表达式,该表达式将事件时间字段标记为事件时间属性。
代码如下:
val sinkDDL: String =
"""
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ),
| watermark for rt as rt - interval '1' second
|) with (
| 'connector.type' = 'filesystem',
| 'connector.path' = 'file:///D:\\..\\sensor.txt',
| 'format.type' = 'csv'
|)
""".stripMargin
tableEnv.sqlUpdate(sinkDDL) // 执行 DDL
这里FROM_UNIXTIME是系统内置的时间函数,用来将一个整数(秒数)转换成“YYYY-MM-DD hh:mm:ss”格式(默认,也可以作为第二个String参数传入)的日期时间字符串(date time string);然后再用TO_TIMESTAMP将其转换成Timestamp。