Real-Time Hot Items Statistics
The first thing we will implement is real-time hot items statistics, based on the UserBehavior dataset.
Basic requirements
- Every 5 minutes, output the top N items with the most clicks over the previous hour
- Clicks are measured by page views ("pv")
Solution approach
- From all user behavior data, filter out the page-view ("pv") events for counting
- Build sliding windows with a length of 1 hour and a slide of 5 minutes (the sketch after this list shows which windows a single event falls into)
- Combine an incremental aggregation function with a full-window function for the window computation
- Use the window end time as the key and apply keyBy() to the resulting DataStream
- Buffer the elements of each KeyedStream in ListState; once the watermark passes the window end time, sort and emit the results
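As a side note on window assignment: with a 1-hour window and a 5-minute slide, every event belongs to 60 / 5 = 12 overlapping windows. A minimal sketch of this assignment logic (plain Scala, not part of the job; it mirrors what Flink's sliding window assigner does internally, ignoring window offsets):
// start timestamps of all sliding windows that contain an event at time ts;
// with size = 1 hour and slide = 5 minutes this always yields 12 starts
def windowStartsFor(ts: Long, size: Long, slide: Long): Seq[Long] =
  Iterator.iterate(ts - ts % slide)(_ - slide).takeWhile(_ > ts - size).toSeq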
Data preparation
Copy the data file UserBehavior.csv into the resources directory src/main/resources.
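Each line of UserBehavior.csv is a comma-separated record with the fields userId, itemId, categoryId, behavior, and a timestamp in seconds, along these lines (illustrative values):
1,2268318,2520377,pv,1511544070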
Main program
Scala version
// ETL the raw data into the UserBehavior type
case class UserBehavior(userId: Long,
                        itemId: Long,
                        categoryId: Int,
                        behavior: String,
                        timestamp: Long)

// output type of the full-window aggregation function
case class ItemViewCount(itemId: Long,
                         windowEnd: Long,
                         count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    // create a StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // set the global parallelism to 1 so that the printed results are not
    // interleaved; changing the parallelism does not affect correctness here
    env.setParallelism(1)

    val stream = env
      // Windows-style path shown as an example; replace with the absolute path of the dataset
      .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
      .map(line => {
        val linearray = line.split(",")
        UserBehavior(linearray(0).toLong,
                     linearray(1).toLong,
                     linearray(2).toInt,
                     linearray(3),
                     linearray(4).toLong)
      })
      // keep only page-view events
      .filter(_.behavior == "pv")
      // assign timestamps and watermarks; we know the timestamps in this
      // dataset are monotonically increasing (seconds, hence * 1000 for ms)
      .assignAscendingTimestamps(_.timestamp * 1000)
      // partition by item id
      .keyBy(_.itemId)
      // sliding window: 1 hour length, 5 minute slide
      .timeWindow(Time.minutes(60), Time.minutes(5))
      // window computation
      .aggregate(new CountAgg(), new WindowResultFunction())
      // partition by window end time
      .keyBy(_.windowEnd)
      // compute the top 3 items by view count
      .process(new TopNHotItems(3))

    // print the result
    stream.print()

    // don't forget to execute
    env.execute("Hot Items Job")
  }
}
Real business data is almost always out of order, so in practice one would generally not use assignAscendingTimestamps, but BoundedOutOfOrdernessTimestampExtractor instead.
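A minimal sketch of that variant, assuming events arrive at most 5 seconds late (the 5-second bound is an assumption for illustration; the extractor class lives in org.apache.flink.streaming.api.functions.timestamps):
.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[UserBehavior](Time.seconds(5)) {
    override def extractTimestamp(element: UserBehavior): Long = element.timestamp * 1000
  }
)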
Implementing the incremental aggregation function
// COUNT aggregation: add one for every incoming record
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
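To see what this buys us: Flink folds every incoming record into a single Long accumulator, so the window state stays constant-size instead of buffering all events. An illustrative hand-rolled run over three made-up pv events (not part of the job; Flink performs the equivalent calls internally):
val agg = new CountAgg()
val events = Seq(
  UserBehavior(1L, 100L, 10, "pv", 0L),
  UserBehavior(2L, 100L, 10, "pv", 1L),
  UserBehavior(3L, 100L, 10, "pv", 2L))
val acc = events.foldLeft(agg.createAccumulator())((a, e) => agg.add(e, a))
println(agg.getResult(acc)) // prints 3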
Implementing the full-window function
It simply wraps the incremental aggregation result with the window and key information.
The code is as follows:
// emits the result for each window; the key type must be Long,
// because the stream is keyed by itemId, which is a Long
class WindowResultFunction extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def process(key: Long,
                       context: Context,
                       elements: Iterable[Long],
                       out: Collector[ItemViewCount]): Unit = {
    // `elements` holds exactly one value: the pre-aggregated count
    out.collect(ItemViewCount(key, context.window.getEnd, elements.iterator.next()))
  }
}
We now have a data stream of each item's view count in each window.
Computing the top N hot items
class TopNHotItems(topSize: Int)
    extends KeyedProcessFunction[Long, ItemViewCount, String] {
  // lazily initialized state variable
  lazy val itemState = getRuntimeContext.getListState(
    new ListStateDescriptor[ItemViewCount]("items", Types.of[ItemViewCount])
  )

  // invoked once for every incoming record
  override def processElement(value: ItemViewCount,
                              ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
                              out: Collector[String]): Unit = {
    itemState.add(value)
    // fires once the watermark passes the window end time
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  // timer callback
  override def onTimer(
      ts: Long,
      ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    val allItems: ListBuffer[ItemViewCount] = ListBuffer()
    // implicit conversions between Java and Scala collections
    import scala.collection.JavaConversions._
    for (item <- itemState.get) {
      allItems += item
    }
    // clear the state to free up space
    itemState.clear()
    // sort in descending order of count and keep the top entries
    val sortedItems = allItems.sortBy(-_.count).take(topSize)
    val result = new StringBuilder
    result.append("====================================\n")
    result.append("time: ").append(new Timestamp(ts - 1)).append("\n")
    for (i <- sortedItems.indices) {
      val currentItem = sortedItems(i)
      result.append("No")
        .append(i + 1)
        .append(":")
        .append(" item ID=")
        .append(currentItem.itemId)
        .append(" views=")
        .append(currentItem.count)
        .append("\n")
    }
    result.append("====================================\n\n")
    // slow down the output so it is easier to read
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
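Each fired timer emits one ranking, so the console output looks roughly like the following (the time, item IDs, and counts here are made up for illustration):
====================================
time: 2017-11-26 09:05:00.0
No1: item ID=2338453 views=27
No2: item ID=812879 views=18
No3: item ID=4443059 views=14
====================================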
Switching to Kafka as the data source
In a real production environment, the data stream usually comes from Kafka. To bring the code closer to production practice, all we need to do is swap the source for Kafka:
The code:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val stream = env
  .addSource(new FlinkKafkaConsumer[String](
    "hotitems",
    new SimpleStringSchema(),
    properties)
  )
Of course, depending on actual needs, we could also point the sink at Kafka, Elasticsearch, Redis, or another store; we will not spell each of these out here.
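As one example, a minimal sketch of a Kafka sink for the result stream (the output topic name "hotitems-output" is an assumption for illustration):
stream.addSink(new FlinkKafkaProducer[String](
  "localhost:9092",
  "hotitems-output",
  new SimpleStringSchema()
))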
Kafka producer program
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerUtil {
  def main(args: Array[String]): Unit = {
    writeToKafka("hotitems")
  }

  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put(
      "key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer"
    )
    props.put(
      "value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer"
    )
    val producer = new KafkaProducer[String, String](props)
    // replace with the absolute path of UserBehavior.csv
    val bufferedSource = io.Source.fromFile("absolute path to UserBehavior.csv")
    for (line <- bufferedSource.getLines) {
      val record = new ProducerRecord[String, String](topic, line)
      producer.send(record)
    }
    bufferedSource.close()
    producer.close()
  }
}
Java version
The UserBehavior POJO
public class UserBehavior {
    public String userId;
    public String itemId;
    public String categoryId;
    public String behavior;
    public Long timestamp;

    public UserBehavior() {
    }

    public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId='" + userId + '\'' +
                ", itemId='" + itemId + '\'' +
                ", categoryId='" + categoryId + '\'' +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
The ItemViewCount POJO
import java.sql.Timestamp;

public class ItemViewCount {
    public String itemId;
    public Long count;
    public Long windowStart;
    public Long windowEnd;

    public ItemViewCount() {
    }

    public ItemViewCount(String itemId, Long count, Long windowStart, Long windowEnd) {
        this.itemId = itemId;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "ItemViewCount{" +
                "itemId='" + itemId + '\'' +
                ", count=" + count +
                ", windowStart=" + new Timestamp(windowStart) +
                ", windowEnd=" + new Timestamp(windowEnd) +
                '}';
    }
}
Main program
public class UserBehaviorAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        SingleOutputStreamOperator<UserBehavior> stream = env
                .readTextFile("UserBehavior.csv")
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String s) throws Exception {
                        String[] arr = s.split(",");
                        // raw timestamps are in seconds; convert to milliseconds
                        return new UserBehavior(arr[0], arr[1], arr[2], arr[3], Long.parseLong(arr[4]) * 1000L);
                    }
                })
                // keep only page-view events
                .filter(r -> r.behavior.equals("pv"))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                                    @Override
                                    public long extractTimestamp(UserBehavior userBehavior, long l) {
                                        return userBehavior.timestamp;
                                    }
                                })
                );

        stream
                .keyBy(r -> r.itemId)
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new CountAgg(), new WindowResult())
                .keyBy(r -> r.windowEnd)
                .process(new TopN(3))
                .print();

        env.execute();
    }
    public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }
    public static class WindowResult extends ProcessWindowFunction<Long, ItemViewCount, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx, Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
            // the iterable holds exactly one element: the pre-aggregated count
            collector.collect(new ItemViewCount(key, iterable.iterator().next(), ctx.window().getStart(), ctx.window().getEnd()));
        }
    }
    public static class TopN extends KeyedProcessFunction<Long, ItemViewCount, String> {
        private ListState<ItemViewCount> itemState;
        private Integer threshold;

        public TopN(Integer threshold) {
            this.threshold = threshold;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            itemState = getRuntimeContext().getListState(
                    new ListStateDescriptor<ItemViewCount>("item-state", ItemViewCount.class)
            );
        }

        @Override
        public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
            itemState.add(itemViewCount);
            // fires once the watermark passes the window end time
            context.timerService().registerEventTimeTimer(itemViewCount.windowEnd + 100L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            Iterable<ItemViewCount> itemViewCounts = itemState.get();
            ArrayList<ItemViewCount> itemList = new ArrayList<>();
            for (ItemViewCount iter : itemViewCounts) {
                itemList.add(iter);
            }
            // clear the state to free up space
            itemState.clear();
            // sort in descending order of count
            itemList.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount t0, ItemViewCount t1) {
                    return t1.count.intValue() - t0.count.intValue();
                }
            });
            StringBuilder result = new StringBuilder();
            result
                    .append("===========================================\n")
                    .append("time: ")
                    .append(new Timestamp(timestamp - 100L))
                    .append("\n");
            // guard against windows with fewer than `threshold` distinct items
            for (int i = 0; i < Math.min(this.threshold, itemList.size()); i++) {
                ItemViewCount currItem = itemList.get(i);
                result
                        .append("No.")
                        .append(i + 1)
                        .append(" : ")
                        .append(currItem.itemId)
                        .append(" count = ")
                        .append(currItem.count)
                        .append("\n");
            }
            result
                    .append("===========================================\n\n\n");
            // slow down the output so it is easier to read
            Thread.sleep(1000L);
            out.collect(result.toString());
        }
    }
}
Writing the user behavior data to Kafka
public class UserBehaviorProduceToKafka {
    public static void main(String[] args) throws Exception {
        writeToKafka("hotitems");
    }

    public static void writeToKafka(String topic) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // pass the path to the file as a parameter
        File file = new File("UserBehavior.csv");
        Scanner sc = new Scanner(file);
        while (sc.hasNextLine()) {
            producer.send(new ProducerRecord<String, String>(topic, sc.nextLine()));
        }
        sc.close();
        // flush buffered records and release resources
        producer.close();
    }
}
Don't forget to add the Kafka dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
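This artifact covers the standalone producer program above; for the Flink job itself to consume from Kafka, the Flink Kafka connector is also needed, along the lines of the following (assuming a Scala 2.11 build; match the version to your Flink version):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>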