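APP 市场推广数据统计（不分渠道）
下面的程序过滤掉卸载（UNINSTALL）行为，将其余所有用户行为映射到同一个 key 上，按事件时间做滑动窗口计数（窗口长度 5 秒，每 1 秒滑动一次），因此统计结果不区分推广渠道。完整代码如下：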
package com.atguigu
import com.atguigu.AppMarketingByChannel.SimulatedEventSource
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Channel-agnostic APP marketing statistics.
 *
 * Reads simulated user-behaviour events, drops those whose behaviour is
 * "UNINSTALL", and counts the remaining events in a sliding event-time window
 * (size 5 seconds, slide 1 second). One `(key, count, windowEnd)` record is
 * printed per fired window.
 */
object AppMarketingStatistics {

  /**
   * Builds and runs the Flink job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Scala binding syntax; the Java-style `StreamExecutionEnvironment env = ...`
    // is not valid Scala and prevents the file from compiling.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Makes `timeWindow` below create event-time windows driven by `ts`.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .addSource(new SimulatedEventSource)
      // NOTE(review): assumes `ts` is an epoch-millisecond timestamp that never
      // decreases, as `assignAscendingTimestamps` requires; confirm in SimulatedEventSource.
      .assignAscendingTimestamps(_.ts)
      .filter(_.behavior != "UNINSTALL")
      // Every event gets the same constant key, so all events fall into a single
      // key group and each window counts the total, with no per-channel split.
      .map(_ => ("dummyKey", 1L))
      .keyBy(_._1)
      // Sliding window: 5 s long, advancing every 1 s.
      // NOTE(review): `timeWindow` is deprecated in newer Flink releases; the
      // replacement there is `.window(SlidingEventTimeWindows.of(...))`.
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .process(new MarketingCountTotal)

    stream.print()
    env.execute()
  }

  /**
   * Window function that emits, for each fired window, the key, the number of
   * elements collected in that window, and the window's end timestamp.
   */
  class MarketingCountTotal
      extends ProcessWindowFunction[(String, Long), (String, Long, Long), String, TimeWindow] {

    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[(String, Long, Long)]): Unit = {
      // `size` is an Int; widen explicitly to match the Long count field.
      val count: Long = elements.size.toLong
      out.collect((key, count, context.window.getEnd))
    }
  }
}