将事件发送到侧输出
大部分DataStream API算子的输出都是单一输出,即某种数据类型的一条流。split算子是个例外,它可以将一条流分成多条流,但这些流的数据类型都相同(注意:split算子在较新的Flink版本中已被废弃并移除,官方推荐使用侧输出来代替)。process function的side outputs(侧输出)功能可以产生多条流,并且这些流的数据类型可以各不相同。一个side output可以定义为OutputTag[X]对象,其中X是该输出流的数据类型。process function可以通过Context对象将一个事件发射到一个或者多个side outputs。
例子
scala version
/**
 * Side-output demo: every sensor reading is forwarded unchanged on the main
 * stream, and readings with a temperature below 32.0 additionally emit a
 * String alert to the side output identified by [[output]].
 */
object SideOutputExample {

  /** Tag naming the side output; its type parameter (String) is the element type of that stream. */
  val output: OutputTag[String] = new OutputTag[String]("side-output")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // The operator's main output carries *all* readings; the alert strings are
    // only reachable through getSideOutput(output).
    val readingsWithAlerts = env
      .addSource(new SensorSource)
      .process(new FreezingAlarm)

    readingsWithAlerts.print()                       // main stream
    readingsWithAlerts.getSideOutput(output).print() // side-output stream

    env.execute()
  }

  /**
   * Emits an alert string to [[output]] for each reading below 32.0, then
   * forwards the reading itself to the main output.
   * NOTE(review): 32.0 presumably means Fahrenheit (freezing point) — confirm against SensorSource.
   */
  class FreezingAlarm extends ProcessFunction[SensorReading, SensorReading] {
    override def processElement(
        value: SensorReading,
        ctx: ProcessFunction[SensorReading, SensorReading]#Context,
        out: Collector[SensorReading]
    ): Unit = {
      val belowFreezing = value.temperature < 32.0
      // Side output first, then the main output — same order as before.
      if (belowFreezing) ctx.output(output, s"传感器ID为:${value.id}的传感器温度小于32度!")
      out.collect(value)
    }
  }
}
java version
/**
 * Java side-output demo: every SensorReading is forwarded unchanged on the main
 * stream, and readings whose temperature is below 32 additionally emit a String
 * alert to the side output identified by {@link #output}.
 */
public class SideOutputExample {
    /**
     * Tag naming the side output. It is final because the very same tag instance
     * is used to emit (ctx.output) and to retrieve (getSideOutput) the stream and
     * must never be reassigned. The trailing {} creates an anonymous subclass so
     * that, per the Flink documentation, the generic element type (String) can be
     * extracted despite Java type erasure.
     */
    private static final OutputTag<String> output = new OutputTag<String>("side-output") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> stream = env.addSource(new SensorSource());

        // The operator's main output carries every reading unchanged; the alert
        // strings are only reachable through getSideOutput(output).
        SingleOutputStreamOperator<SensorReading> warnings = stream
                .process(new ProcessFunction<SensorReading, SensorReading>() {
                    @Override
                    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
                        // NOTE(review): threshold presumably in Fahrenheit (freezing point) — confirm against SensorSource.
                        if (value.temperature < 32) {
                            ctx.output(output, "温度小于32度!");
                        }
                        out.collect(value);
                    }
                });

        warnings.print();                       // main stream
        warnings.getSideOutput(output).print(); // side-output stream

        env.execute();
    }
}