转换算子的使用

一旦我们有一条DataStream,我们就可以在这条数据流上面使用转换算子了。转换算子有很多种。一些转换算子可以产生一条新的DataStream,当然这个DataStream的类型可能是新类型。还有一些转换算子不会改变原有DataStream的数据,但会将数据流分区或者分组。业务逻辑就是由转换算子串起来组合而成的。

在我们的例子中,我们首先使用map()转换算子将传感器的温度值转换成了摄氏温度单位。然后,我们使用keyBy()转换算子将传感器读数流按照传感器ID进行分区。接下来,我们定义了一个timeWindow()转换算子,这个算子将每个传感器ID所对应的分区的传感器读数分配到了5秒钟的滚动窗口中。

scala version

val avgTemp = sensorData
  .map(r => {
    val celsius = (r.temperature - 32) * (5.0 / 9.0)
    SensorReading(r.id, r.timestamp, celsius)
  })
  .keyBy(_.id)
  .timeWindow(Time.seconds(5))
  .apply(new TemperatureAverager)

java version

DataStream<T> avgTemp = sensorData
  .map(r -> {
    Double celsius = (r.temperature -32) * (5.0 / 9.0);
    return SensorReading(r.id, r.timestamp, celsius);
  })
  .keyBy(r -> r.id)
  .timeWindow(Time.seconds(5))
  .apply(new TemperatureAverager());

窗口转换算子将在“窗口操作符”一章中讲解。最后,我们使用了一个UDF函数来计算每个窗口的温度的平均值。我们稍后将会讨论UDF函数的实现。