实时对帐:实现两条流的Join

完整代码如下:

scala version

object TwoStreamJoin {

  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  case class PayEvent(orderId: String, eventType: String, eventTime: Long)

  val unmatchedOrders = new OutputTag[String]("unmatched-orders")
  val unmatchedPays   = new OutputTag[String]("unmatched-pays")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val orderStream = env
      .fromElements(
        OrderEvent("order_1", "pay", 1000L),
        OrderEvent("order_2", "pay", 2000L)
      )
      .assignAscendingTimestamps(r => r.eventTime)
      .keyBy(r => r.orderId)

    val payStream = env
      .fromElements(
        PayEvent("order_1", "weixin", 3000L),
        PayEvent("order_3", "weixin", 4000L)
      )
      .assignAscendingTimestamps(r => r.eventTime)
      .keyBy(r => r.orderId)

    val result = orderStream
      .connect(payStream)
      .process(new MatchFunction)

    result.print()

    result.getSideOutput(unmatchedOrders).print()

    result.getSideOutput(unmatchedPays).print()

    env.execute()
  }

  class MatchFunction extends CoProcessFunction[OrderEvent, PayEvent, String] {

    lazy val orderState = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("order", Types.of[OrderEvent])
    )

    lazy val payState = getRuntimeContext.getState(
      new ValueStateDescriptor[PayEvent]("pay", Types.of[PayEvent])
    )

    override def processElement1(order: OrderEvent, context: CoProcessFunction[OrderEvent, PayEvent, String]#Context, collector: Collector[String]): Unit = {
      val pay = payState.value()

      if (pay != null) {
        payState.clear()
        collector.collect("order id: " + order.orderId + " matched success!")
      } else {
        orderState.update(order)
        context.timerService().registerEventTimeTimer(order.eventTime + 5000L)
      }
    }

    override def processElement2(pay: PayEvent, context: CoProcessFunction[OrderEvent, PayEvent, String]#Context, collector: Collector[String]): Unit = {
      val order = orderState.value()

      if (order != null) {
        orderState.clear()
        collector.collect("order id: " + pay.orderId + " match success!")
      } else {
        payState.update(pay)
        context.timerService().registerEventTimeTimer(pay.eventTime + 5000L)
      }
    }

    override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
      if (orderState.value() != null) {
        ctx.output(unmatchedOrders, "order id: " + orderState.value().orderId + " fail match")
        orderState.clear()
      }
      if (payState.value() != null) {
        ctx.output(unmatchedPays, "order id: " + payState.value().orderId + " fail match")
        payState.clear()
      }
    }
  }
}

java version

public class TwoStreamsJoin {

    private static OutputTag<String> unmatchedOrders = new OutputTag<String>("order"){};
    private static OutputTag<String> unmatchedPays   = new OutputTag<String>("pay"){};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        KeyedStream<OrderEvent, String> orderStream = env
                .fromElements(
                        new OrderEvent("order_1", "pay", 1000L),
                        new OrderEvent("order_2", "pay", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
                                    @Override
                                    public long extractTimestamp(OrderEvent orderEvent, long l) {
                                        return orderEvent.eventTime;
                                    }
                                })
                )
                .keyBy(r -> r.orderId);

        KeyedStream<PayEvent, String> payStream = env
                .fromElements(
                        new PayEvent("order_1", "weixin", 3000L),
                        new PayEvent("order_3", "weixin", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<PayEvent>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<PayEvent>() {
                                    @Override
                                    public long extractTimestamp(PayEvent payEvent, long l) {
                                        return payEvent.eventTime;
                                    }
                                })
                )
                .keyBy(r -> r.orderId);

        SingleOutputStreamOperator<String> result = orderStream
                .connect(payStream)
                .process(new CoProcessFunction<OrderEvent, PayEvent, String>() {
                    private ValueState<OrderEvent> orderState;
                    private ValueState<PayEvent> payState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        orderState = getRuntimeContext().getState(
                                new ValueStateDescriptor<OrderEvent>("order", OrderEvent.class)
                        );
                        payState = getRuntimeContext().getState(
                                new ValueStateDescriptor<PayEvent>("pay", PayEvent.class)
                        );
                    }

                    @Override
                    public void processElement1(OrderEvent orderEvent, Context context, Collector<String> collector) throws Exception {
                        PayEvent pay = payState.value();
                        if (pay != null) {
                            payState.clear();
                            collector.collect("order id " + orderEvent.orderId + " matched success");
                        } else {
                            orderState.update(orderEvent);
                            context.timerService().registerEventTimeTimer(orderEvent.eventTime + 5000L);
                        }
                    }

                    @Override
                    public void processElement2(PayEvent payEvent, Context context, Collector<String> collector) throws Exception {
                        OrderEvent order = orderState.value();
                        if (order != null) {
                            orderState.clear();
                            collector.collect("order id" + payEvent.orderId + " matched success");
                        } else {
                            payState.update(payEvent);
                            context.timerService().registerEventTimeTimer(payEvent.eventTime + 5000L);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        if (orderState.value() != null) {
                            ctx.output(unmatchedOrders, "order id: " + orderState.value().orderId + " not match");
                            orderState.clear();
                        }
                        if (payState.value() != null) {
                            ctx.output(unmatchedPays, "order id: " + payState.value().orderId + " not match");
                        }
                    }
                });

        result.print();

        result.getSideOutput(unmatchedOrders).print();

        result.getSideOutput(unmatchedPays).print();

        env.execute();
    }
}