分组窗口

分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。

Table API中的Group Windows都是使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。

val table = input
  .window([w: GroupWindow] as $"w") // 定义窗口,别名 w
  .groupBy($"w", $"a")  // 以属性a和窗口w作为分组的key
  .select($"a", $"b".sum)  // 聚合字段b的值,求和

或者,还可以把窗口的相关信息,作为字段添加到结果表中:

val table = input
  .window([w: GroupWindow] as $"w")
  .groupBy($"w", $"a")
  .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count)

Table API提供了一组具有特定语义的预定义Window类,这些类会被转换为底层DataStream或DataSet的窗口操作。

Table API支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding)和会话(Session)。

滚动窗口

滚动窗口(Tumbling windows)要用Tumble类来定义,另外还有三个方法:

  • over:定义窗口长度
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的groupBy中

代码如下:

// Tumbling Event-time Window(事件时间字段rowtime
.window(Tumble over 10.minutes on $"rowtime" as $"w")
// Tumbling Processing-time Window(处理时间字段proctime)
.window(Tumble over 10.minutes on $"proctime" as $"w")
// Tumbling Row-count Window (类似于计数窗口,按处理时间排序,10行一组)
.window(Tumble over 10.rows on $"proctime" as $"w")

滑动窗口

滑动窗口(Sliding windows)要用Slide类来定义,另外还有四个方法:

  • over:定义窗口长度
  • every:定义滑动步长
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的groupBy中

代码如下:

// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
// Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")
// Sliding Row-count window
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")

会话窗口

会话窗口(Session windows)要用Session类来定义,另外还有三个方法:

  • withGap:会话时间间隔
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的groupBy中

代码如下:

// Session Event-time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
// Session Processing-time Window
.window(Session withGap 10.minutes on $"proctime" as $"w")