代码练习(以分组滚动窗口为例)

我们可以综合学习过的内容,用一段完整的代码实现一个具体的需求。例如,可以开一个10秒的滚动窗口,统计每个窗口内每个sensor出现的次数。(注意:下面的示例代码通过 filter 只保留了 sensor_1 的数据。)

代码如下:

scala version

/** Counts sensor readings per 10-second tumbling window, once with the
  * Table API and once with the equivalent SQL query.
  *
  * The window is defined on `pt`, a processing-time attribute appended to the
  * schema via `.proctime()`. Both results are converted to retract streams
  * and printed.
  *
  * `SensorSource` is defined elsewhere in the project; the schema used below
  * implies that its records expose `id`, `timestamp` and `temperature`.
  */
object TumblingWindowTempCount {

  /** Builds both pipelines and runs the streaming job.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Only readings whose id is "sensor_1" are kept.
    val stream = env
      .addSource(new SensorSource)
      .filter(r => r.id.equals("sensor_1"))

    // Rename `timestamp` to `ts` and append the processing-time attribute `pt`.
    val table = tableEnv.fromDataStream(stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())

    // Table API: group by sensor id and a 10-second tumbling window on `pt`,
    // then count the rows in each group.
    val tableResult = table
      .window(Tumble over 10.seconds() on $"pt" as $"w")
      .groupBy($"id", $"w") // DataStream analogue: .keyBy(r => r.id).timeWindow(Time.seconds(10))
      .select($"id", $"id".count())

    tableEnv.toRetractStream[Row](tableResult).print()

    // SQL: register the same stream as the view "sensor" and run the same
    // aggregation, additionally selecting the window start and end.
    tableEnv.createTemporaryView("sensor", stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())

    val sqlResult = tableEnv
      .sqlQuery("SELECT id, count(id), TUMBLE_START(pt, INTERVAL '10' SECOND), TUMBLE_END(pt, INTERVAL '10' SECOND) FROM sensor GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)")

    tableEnv.toRetractStream[Row](sqlResult).print()

    env.execute()
  }
}

java version