在RuntimeContext中定义键控状态

用户自定义函数可以使用keyed state来存储和访问key对应的状态。对于每一个key,Flink将会维护一个状态实例。一个操作符的状态实例将会被分发到操作符的所有并行任务中去。这表明函数的每一个并行任务只为所有key的某一部分key保存key对应的状态实例。所以keyed state和分布式key-value map数据结构非常类似。

keyed state仅可用于KeyedStream。Flink支持以下数据类型的状态变量:

  • ValueState[T]保存单个的值,值的类型为T。
    • get操作: ValueState.value()
    • set操作: ValueState.update(value: T)
  • ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:
    • ListState.add(value: T)
    • ListState.addAll(values: java.util.List[T])
    • ListState.get()返回Iterable[T]
    • ListState.update(values: java.util.List[T])
  • MapState[K, V]保存Key-Value对。
    • MapState.get(key: K)
    • MapState.put(key: K, value: V)
    • MapState.contains(key: K)
    • MapState.remove(key: K)
  • ReducingState[T]
  • AggregatingState[I, O]

State.clear()是清空操作。

scala version

val sensorData: DataStream[SensorReading] = ... val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id) val alerts: DataStream[(String, Double, Double)] = keyedData .flatMap(new TemperatureAlertFunction(1.7)) class TemperatureAlertFunction(val threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] { private var lastTempState: ValueState[Double] = _ override def open(parameters: Configuration): Unit = { val lastTempDescriptor = new ValueStateDescriptor[Double]( "lastTemp", classOf[Double]) lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) } override def flatMap( reading: SensorReading, out: Collector[(String, Double, Double)] ): Unit = { val lastTemp = lastTempState.value() val tempDiff = (reading.temperature - lastTemp).abs if (tempDiff > threshold) { out.collect((reading.id, reading.temperature, tempDiff)) } this.lastTempState.update(reading.temperature) } }

上面例子中的FlatMapFunction只能访问当前处理的元素所包含的key所对应的状态变量。

不同key对应的keyed state是相互隔离的。

  • 通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。数据类型必须指定,因为Flink需要选择合适的序列化器。
  • 在open()方法中创建state变量。注意复习之前的RichFunction相关知识。

当一个函数注册了StateDescriptor描述符,Flink会检查状态后端是否已经存在这个状态。这种情况通常出现在应用挂掉要从检查点或者保存点恢复的时候。在这两种情况下,Flink会将注册的状态连接到已经存在的状态。如果不存在状态,则初始化一个空的状态。

使用FlatMap with keyed ValueState的快捷方式flatMapWithState也可以实现以上需求。

scala version

val alerts: DataStream[(String, Double, Double)] = keyedSensorData .flatMapWithState[(String, Double, Double), Double] { case (in: SensorReading, None) => // no previous temperature defined. // Just update the last temperature (List.empty, Some(in.temperature)) case (SensorReading r, lastTemp: Some[Double]) => // compare temperature difference with threshold val tempDiff = (r.temperature - lastTemp.get).abs if (tempDiff > 1.7) { // threshold exceeded. // Emit an alert and update the last temperature (List((r.id, r.temperature, tempDiff)), Some(r.temperature)) } else { // threshold not exceeded. Just update the last temperature (List.empty, Some(r.temperature)) } }