将表转换成DataStream
表可以转换为DataStream或DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API或SQL查询的结果上运行了。
将表转换为DataStream或DataSet时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。
表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式。
Table API中表到DataStream有两种模式:
- 追加模式(Append Mode)
用于表只会被插入(Insert)操作更改的场景。
- 撤回模式(Retract Mode)
用于任何场景。有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作。
得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据,Delete)。
代码实现如下:
val resultStream: DataStream[Row] = tableEnv
.toAppendStream[Row](resultTable)
val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv
.toRetractStream[(String, Long)](aggResultTable)
resultStream.print("result")
aggResultStream.print("aggResult")
所以,没有经过groupby之类聚合操作,可以直接用toAppendStream来转换;而如果经过了聚合,有更新操作,一般就必须用toRetractDstream。