代码练习（以分组滚动窗口为例）
我们可以综合前面学过的内容，用一段完整的代码实现一个具体的需求。例如，开一个长度为 10 秒的滚动窗口，统计每个窗口内各个 sensor 出现的次数（示例代码中先过滤出了 sensor_1 的数据，去掉该过滤条件即可统计所有 sensor）。
代码如下：
Scala version
/**
 * Counts sensor readings per 10-second processing-time tumbling window, grouped by sensor id.
 *
 * The same aggregation is computed twice, once with the Table API and once with SQL, and both
 * results are printed (labelled "table" and "sql") so the two approaches can be compared.
 *
 * Only readings whose id is "sensor_1" are kept (see the filter below); remove the filter to
 * count every sensor.
 */
object TumblingWindowTempCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val stream = env.addSource(new SensorSource).filter(r => r.id.equals("sensor_1"))

    // Schema shared by the Table API and SQL paths so the two conversions cannot drift apart:
    // `timestamp` is renamed to `ts`, and `pt` is appended as a processing-time attribute.
    val sensorFields = Seq($"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())

    // Table API: 10-second tumbling window on processing time, grouped by sensor id.
    // Window start/end are selected so the row shape matches the SQL query below.
    val table = tableEnv.fromDataStream(stream, sensorFields: _*)
    val tableResult = table
      .window(Tumble over 10.seconds() on $"pt" as $"w")
      .groupBy($"id", $"w") // DataStream equivalent: .keyBy(r => r.id).timeWindow(Time.seconds(10))
      .select($"id", $"id".count(), $"w".start, $"w".end)
    // Labelled so these lines can be told apart from the SQL output on stdout.
    tableEnv.toRetractStream[Row](tableResult).print("table")

    // SQL: the same aggregation expressed with a TUMBLE group window.
    tableEnv.createTemporaryView("sensor", stream, sensorFields: _*)
    val sqlResult = tableEnv.sqlQuery(
      """SELECT id, count(id),
        |       TUMBLE_START(pt, INTERVAL '10' SECOND),
        |       TUMBLE_END(pt, INTERVAL '10' SECOND)
        |FROM sensor
        |GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)""".stripMargin)
    tableEnv.toRetractStream[Row](sqlResult).print("sql")

    env.execute()
  }
}
Java version