使用Process Function实现订单超时需求
scala version
object OrderTimeoutWithoutCep {
case class OrderEvent(orderId: String,
eventType: String,
eventTime: String)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.fromElements(
OrderEvent("1", "create", "2"),
OrderEvent("2", "create", "3"),
OrderEvent("2", "pay", "4")
)
.assignAscendingTimestamps(_.eventTime.toLong * 1000L)
.keyBy(_.orderId)
.process(new OrderMatchFunc)
stream.print()
env.execute()
}
class OrderMatchFunc extends KeyedProcessFunction[String, OrderEvent, String] {
lazy val orderState = getRuntimeContext.getState(
new ValueStateDescriptor[OrderEvent]("saved order", Types.of[OrderEvent])
)
override def processElement(value: OrderEvent,
ctx: KeyedProcessFunction[String, OrderEvent, String]#Context,
out: Collector[String]): Unit = {
if (value.eventType.equals("create")) {
if (orderState.value() == null) { // 为什么要判空?因为可能出现`pay`先到的情况
// 如果orderState为空,保存`create`事件
orderState.update(value)
}
} else {
// 保存`pay`事件
orderState.update(value)
}
ctx.timerService().registerEventTimeTimer(value.eventTime.toLong * 1000 + 5000L)
}
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, OrderEvent, String]#OnTimerContext,
out: Collector[String]): Unit = {
val savedOrder = orderState.value()
if (savedOrder != null && savedOrder.eventType.equals("create")) {
out.collect("超时订单的ID为:" + savedOrder.orderId)
}
orderState.clear()
}
}
}
java version
OrderEvent的POJO类实现
public class OrderEvent {
public String orderId;
public String eventType;
public Long eventTime;
public OrderEvent() {
}
public OrderEvent(String orderId, String eventType, Long eventTime) {
this.orderId = orderId;
this.eventType = eventType;
this.eventTime = eventTime;
}
@Override
public String toString() {
return "OrderEvent{" +
"orderId='" + orderId + '\'' +
", eventType='" + eventType + '\'' +
", eventTime=" + eventTime +
'}';
}
}
public class OrderTimeoutDetectWithoutCEP {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
SingleOutputStreamOperator<OrderEvent> stream = env
.fromElements(
new OrderEvent("order_1", "create", 1000L),
new OrderEvent("order_2", "create", 2000L),
new OrderEvent("order_1", "pay", 3000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
@Override
public long extractTimestamp(OrderEvent element, long recordTimestamp) {
return element.eventTime;
}
})
);
stream
.keyBy(r -> r.orderId)
.process(new MyKeyed())
.print();
env.execute();
}
public static class MyKeyed extends KeyedProcessFunction<String, OrderEvent, String> {
private ValueState<OrderEvent> orderState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
orderState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("order", OrderEvent.class));
}
@Override
public void processElement(OrderEvent orderEvent, Context context, Collector<String> collector) throws Exception {
if (orderEvent.eventType.equals("create")) {
// 为什么判空?因为pay事件可能先到
if (orderState.value() == null) {
orderState.update(orderEvent);
context.timerService().registerEventTimeTimer(orderEvent.eventTime + 5000L);
}
} else {
orderState.update(orderEvent);
collector.collect("订单ID为 " + orderEvent.orderId + " 支付成功!");
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
if (orderState.value() != null && orderState.value().eventType.equals("create")) {
out.collect("没有支付的订单ID是 " + orderState.value().orderId);
}
orderState.clear();
}
}
}