代码练习(以分组滚动窗口为例)

我们可以综合学习过的内容,用一段完整的代码实现一个具体的需求。例如,可以开一个10秒的滚动窗口,统计每个窗口内每个sensor出现的次数(下面的示例代码只保留了 sensor_1 的数据)。

代码如下:

scala version

/**
  * Counts sensor readings per id in 10-second tumbling processing-time
  * windows, once with the Table API and once with an equivalent SQL query.
  *
  * Only readings whose id is "sensor_1" pass the filter. Both results are
  * converted to retract streams and printed. `SensorSource` is not defined
  * in this snippet and must be provided elsewhere.
  */
object TumblingWindowTempCount {

  /**
    * Builds and runs the streaming job.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallel task so the printed output is not interleaved across subtasks.
    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)

    // `==` is null-safe in Scala, unlike calling `.equals` on a possibly-null id.
    val stream = env
      .addSource(new SensorSource)
      .filter(r => r.id == "sensor_1")

    // Rename `timestamp` to `ts` and append a processing-time attribute `pt`
    // that both windowed queries below use as their time field.
    val table = tableEnv.fromDataStream(stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())

    // table api
    val tableResult = table
      .window(Tumble over 10.seconds() on $"pt" as $"w")
      .groupBy($"id", $"w") // .keyBy(r => r.id).timeWindow(Time.seconds(10))
      .select($"id", $"id".count())

    tableEnv.toRetractStream[Row](tableResult).print()

    // sql
    // Register the table built above as the view, so the schema (including
    // the `pt` processing-time attribute) is declared in exactly one place.
    tableEnv.createTemporaryView("sensor", table)

    val sqlResult = tableEnv
      .sqlQuery("SELECT id, count(id), TUMBLE_START(pt, INTERVAL '10' SECOND), TUMBLE_END(pt, INTERVAL '10' SECOND) FROM sensor GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)")

    tableEnv.toRetractStream[Row](sqlResult).print()

    env.execute()
  }
}

java version