只使用Flink SQL实现TopN需求
代码
Scala version
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
/**
 * Hot-items Top-N implemented with Flink SQL only.
 *
 * Reads comma-separated user-behavior lines
 * (`userId,itemId,categoryId,behavior,timestamp`), keeps only `"pv"` records,
 * counts records per item in 1-hour windows that slide every 5 minutes
 * (`HOP(ts, slide, size)`), and prints the `TopN` highest-count items of every
 * window as a retract stream.
 */
object HotItemsSQL {

  /**
   * One parsed input line. `timestamp` holds the fifth CSV field multiplied by
   * 1000 (presumably seconds converted to epoch milliseconds — confirm against the data).
   */
  case class UserBehavior(userId: String, itemId: String, categoryId: String, behavior: String, timestamp: Long)

  /** Number of items kept per window by the Top-N query. */
  private val TopN = 3

  /** Input file used when no path is given on the command line. */
  private val DefaultInputPath = "UserBehavior.csv"

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional; `args(0)` overrides the input CSV path (default `UserBehavior.csv`)
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Before Flink 1.12 processing time is the default; event time must be enabled explicitly.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val pageViews = env
      .readTextFile(inputPath)
      .map(line => {
        val fields = line.split(",")
        UserBehavior(fields(0), fields(1), fields(2), fields(3), fields(4).toLong * 1000L)
      })
      .filter(_.behavior == "pv")
      // Assumes event timestamps in the file are non-decreasing.
      .assignAscendingTimestamps(_.timestamp)

    // stream => dynamic table: view `t` with columns itemId and ts, where ts is the
    // event-time (rowtime) attribute taken from the `timestamp` field.
    tableEnv.createTemporaryView("t", pageViews, $"itemId", $"timestamp".rowtime() as "ts")

    // Innermost query: count per item for each 1-hour window sliding every 5 minutes.
    // Outer queries: rank items inside each window by count and keep the first TopN.
    val topItems = tableEnv.sqlQuery(
      s"""
         |SELECT *
         |FROM (
         |  SELECT *, ROW_NUMBER() OVER (PARTITION BY windowEnd ORDER BY itemCount DESC) AS row_num
         |  FROM (
         |    SELECT itemId, COUNT(itemId) AS itemCount,
         |           HOP_END(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) AS windowEnd
         |    FROM t
         |    GROUP BY HOP(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR), itemId
         |  )
         |)
         |WHERE row_num <= $TopN
         |""".stripMargin)

    // A Top-N (ROW_NUMBER) result is an updating table, so it is converted as a retract stream.
    tableEnv.toRetractStream[Row](topItems).print()
    env.execute()
  }
}
Java version
/**
 * Hot-items Top-N implemented with Flink SQL only (Java counterpart of the Scala version).
 *
 * <p>Reads comma-separated user-behavior lines, keeps only {@code "pv"} records, counts records
 * per item in 1-hour windows that slide every 5 minutes ({@code HOP(ts, slide, size)}), and
 * prints the {@link #TOP_N} highest-count items of every window as a retract stream.
 *
 * <p>NOTE(review): this snippet shows no imports. With the Flink 1.11+ package layout it needs
 * {@code org.apache.flink.api.common.eventtime.WatermarkStrategy},
 * {@code org.apache.flink.api.common.functions.MapFunction},
 * {@code org.apache.flink.streaming.api.TimeCharacteristic},
 * {@code org.apache.flink.streaming.api.datastream.DataStream},
 * {@code org.apache.flink.streaming.api.environment.StreamExecutionEnvironment},
 * {@code org.apache.flink.table.api.EnvironmentSettings},
 * {@code org.apache.flink.table.api.Table},
 * {@code org.apache.flink.table.api.bridge.java.StreamTableEnvironment},
 * {@code org.apache.flink.types.Row} and
 * {@code static org.apache.flink.table.api.Expressions.$}.
 * It also assumes a {@code UserBehavior} POJO defined elsewhere with a
 * (String, String, String, String, long) constructor and public fields {@code behavior}
 * and {@code timestamp} — confirm.
 */
public class HotItemsSQL {

    /** Number of items kept per window by the Top-N query. */
    private static final int TOP_N = 3;

    /** Input file used when no path is given; machine-specific, so prefer passing args[0]. */
    private static final String DEFAULT_INPUT_PATH =
            "/home/zuoyuan/flink-tutorial/flink-scala-code/src/main/resources/UserBehavior.csv";

    /**
     * Builds and runs the streaming job.
     *
     * @param args optional; {@code args[0]} overrides the input CSV path
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Before Flink 1.12 processing time is the default; event time must be enabled explicitly.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        DataStream<UserBehavior> pageViews = env
                .readTextFile(inputPath)
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] fields = line.split(",");
                        // Fifth field scaled by 1000, presumably seconds -> epoch millis.
                        return new UserBehavior(fields[0], fields[1], fields[2], fields[3],
                                Long.parseLong(fields[4]) * 1000L);
                    }
                })
                // Constant on the left so a null behavior is filtered out instead of throwing.
                .filter(r -> "pv".equals(r.behavior))
                // Monotonous watermarks: assumes timestamps in the file are non-decreasing.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp));

        // stream => dynamic table: view `t` with columns itemId and ts (event-time attribute).
        tableEnv.createTemporaryView("t", pageViews, $("itemId"), $("timestamp").rowtime().as("ts"));

        // Innermost query: count per item for each 1-hour window sliding every 5 minutes.
        // Outer queries: rank items inside each window by count and keep the first TOP_N.
        Table result = tableEnv.sqlQuery(
                "SELECT *"
                        + " FROM ("
                        + "   SELECT *, ROW_NUMBER() OVER (PARTITION BY windowEnd ORDER BY itemCount DESC) AS row_num"
                        + "   FROM ("
                        + "     SELECT itemId, COUNT(itemId) AS itemCount,"
                        + "            HOP_END(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) AS windowEnd"
                        + "     FROM t"
                        + "     GROUP BY HOP(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR), itemId"
                        + "   )"
                        + " )"
                        + " WHERE row_num <= " + TOP_N);

        // A Top-N (ROW_NUMBER) result is an updating table, so it is converted as a retract stream.
        tableEnv.toRetractStream(result, Row.class).print();
        env.execute();
    }
}