重定向迟到元素
迟到的元素也可以通过侧输出（side output）特性重定向到另一条流中。由迟到元素组成的侧输出流既可以继续处理，也可以 sink 到外部存储系统中持久化。
例子
scala version
// One tag instance shared by sideOutputLateData and getSideOutput, so the
// side-output id and element type cannot drift apart between the two call sites.
val lateReadingsTag = new OutputTag[(String, Long)]("late-readings")

// Each socket line is parsed as "<key> <timestamp>"; the timestamp is
// multiplied by 1000 (seconds -> milliseconds) before being used as event time.
val readings = env
  .socketTextStream("localhost", 9999, '\n')
  .map(line => {
    val arr = line.split(" ")
    (arr(0), arr(1).toLong * 1000)
  })
  // Event time = field _2; timestamps are expected to arrive in ascending order.
  .assignAscendingTimestamps(_._2)

// Count elements per key in 10-second windows. No allowed lateness is set, so an
// element whose window has already been passed by the watermark is not counted;
// it is emitted to the late-readings side output instead.
// NOTE(review): this relies on env running in event time (the Java version calls
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); Flink versions
// before 1.12 default to processing time) — confirm env setup.
val countPer10Secs = readings
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .sideOutputLateData(lateReadingsTag)
  .process(new CountFunction())

// The late elements, read back with the same tag that was registered above.
val lateStream = countPer10Secs.getSideOutput(lateReadingsTag)
lateStream.print()
CountFunction 的实现：
/** Window function that emits one message per window evaluation, stating how
  * many elements the window contains. The key and window context are not used.
  */
class CountFunction
  extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Long)],
                       out: Collector[String]): Unit = {
    val windowSize = elements.size
    out.collect(s"窗口共有${windowSize}条数据")
  }
}
java version
/**
 * Counts elements per key in 5-second event-time windows and redirects
 * elements that arrive after their window has been closed by the watermark
 * to a side output with id "late-readings".
 *
 * <p>Input lines are read from localhost:9999 and parsed as
 * {@code "<key> <timestamp>"}; the timestamp is multiplied by 1000
 * (seconds to milliseconds) and used as the event time.
 */
public class RedirectLateEvent {
    // The anonymous subclass ({}) lets Flink capture the generic element type.
    private static final OutputTag<Tuple2<String, Long>> output =
            new OutputTag<Tuple2<String, Long>>("late-readings") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> stream = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] arr = s.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        // Java counterpart of Scala's assignAscendingTimestamps(_._2).
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> value, long l) {
                                        return value.f1;
                                    }
                                })
                );

        // Main output: one count message per key and window. Late elements are not
        // counted; they go to the side output registered with sideOutputLateData.
        SingleOutputStreamOperator<String> windowCounts = stream
                .keyBy(r -> r.f0)
                .timeWindow(Time.seconds(5))
                .sideOutputLateData(output) // use after keyBy and timeWindow
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
                        // Count by iterating: Iterable#spliterator() is not guaranteed to be
                        // SIZED, in which case getExactSizeIfKnown() would return -1.
                        long count = 0L;
                        for (Tuple2<String, Long> ignored : iterable) {
                            count++;
                        }
                        collector.collect(count + " of elements");
                    }
                });

        windowCounts.print();
        // The late elements, read back with the same tag.
        windowCounts.getSideOutput(output).print();
        env.execute();
    }
}
下面这个例子展示了如何使用 ProcessFunction 从主流中过滤掉迟到的元素，并将这些迟到元素发送到侧输出流中。判断依据是：元素的时间戳小于当前水位线（watermark），即视为迟到。
scala version
val readings: DataStream[SensorReading] = ...

// Main stream without late readings; LateReadingsFilter sends those to its side output.
val filteredReadings: DataStream[SensorReading] =
  readings.process(new LateReadingsFilter)

// Read the late readings back. The tag id must be the one LateReadingsFilter emits to.
val lateReadingsTag = new OutputTag[SensorReading]("late-readings")
val lateReadings: DataStream[SensorReading] =
  filteredReadings.getSideOutput(lateReadingsTag)
/** A ProcessFunction that filters out late sensor readings and
  * re-directs them to a side output.
  *
  * A reading is treated as late when its `timestamp` field is smaller than the
  * operator's current watermark. Late readings are emitted to the side output
  * tagged `lateReadingsOut` ("late-readings"); all other readings are forwarded
  * unchanged to the main output.
  *
  * NOTE(review): assumes `SensorReading.timestamp` is the same value the upstream
  * timestamp assigner used as event time — confirm against the pipeline setup.
  */
class LateReadingsFilter
  extends ProcessFunction[SensorReading, SensorReading] {

  /** Side-output tag for late readings; read it back with the same id via getSideOutput. */
  val lateReadingsOut = new OutputTag[SensorReading]("late-readings")

  override def processElement(
      r: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    // compare record timestamp with current watermark
    if (r.timestamp < ctx.timerService().currentWatermark()) {
      // this is a late reading => redirect it to the side output
      ctx.output(lateReadingsOut, r)
    } else {
      out.collect(r)
    }
  }
}
java version
/**
 * Uses a ProcessFunction to drop late elements from the main stream and
 * redirect them, unchanged, to a side output with id "late-readings".
 *
 * <p>An element is late when its timestamp (field {@code f1}) is smaller than
 * the current watermark. Input lines are read from localhost:9999 and parsed as
 * {@code "<key> <timestamp>"}; the timestamp is multiplied by 1000 (seconds to
 * milliseconds).
 */
public class RedirectLateEvent {
    // Carries the late elements themselves (not a placeholder message), so the
    // side output can be processed further or written to a sink.
    // The anonymous subclass ({}) lets Flink capture the generic element type.
    private static final OutputTag<Tuple2<String, Long>> output =
            new OutputTag<Tuple2<String, Long>>("late-readings") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<Tuple2<String, Long>> stream = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] arr = s.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> value, long l) {
                                        return value.f1;
                                    }
                                })
                )
                .process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
                        // f1 is the event time assigned above; behind the watermark means late.
                        if (value.f1 < context.timerService().currentWatermark()) {
                            context.output(output, value);
                        } else {
                            collector.collect(value);
                        }
                    }
                });

        stream.print();
        stream.getSideOutput(output).print();
        env.execute();
    }
}