在RuntimeContext中定义键控状态
用户自定义函数可以使用keyed state来存储和访问key对应的状态。对于每一个key,Flink将会维护一个状态实例。一个操作符的状态实例将会被分发到操作符的所有并行任务中去。这表明函数的每一个并行任务只为所有key的某一部分key保存key对应的状态实例。所以keyed state和分布式key-value map数据结构非常类似。
keyed state仅可用于KeyedStream。Flink支持以下数据类型的状态变量:
- ValueState[T]保存单个的值,值的类型为T。
- get操作: ValueState.value()
- set操作: ValueState.update(value: T)
- ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:
- ListState.add(value: T)
- ListState.addAll(values: java.util.List[T])
- ListState.get()返回Iterable[T]
- ListState.update(values: java.util.List[T])
- MapState[K, V]保存Key-Value对。
- MapState.get(key: K)
- MapState.put(key: K, value: V)
- MapState.contains(key: K)
- MapState.remove(key: K)
- ReducingState[T]
- AggregatingState[I, O]
State.clear()是清空操作。
scala version
val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)
val alerts: DataStream[(String, Double, Double)] = keyedData
.flatMap(new TemperatureAlertFunction(1.7))
class TemperatureAlertFunction(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
val lastTempDescriptor = new ValueStateDescriptor[Double](
"lastTemp", classOf[Double])
lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
}
override def flatMap(
reading: SensorReading,
out: Collector[(String, Double, Double)]
): Unit = {
val lastTemp = lastTempState.value()
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > threshold) {
out.collect((reading.id, reading.temperature, tempDiff))
}
this.lastTempState.update(reading.temperature)
}
}
上面例子中的FlatMapFunction只能访问当前处理的元素所包含的key所对应的状态变量。
不同key对应的keyed state是相互隔离的。
- 通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。数据类型必须指定,因为Flink需要选择合适的序列化器。
- 在open()方法中创建state变量。注意复习之前的RichFunction相关知识。
当一个函数注册了StateDescriptor描述符,Flink会检查状态后端是否已经存在这个状态。这种情况通常出现在应用挂掉要从检查点或者保存点恢复的时候。在这两种情况下,Flink会将注册的状态连接到已经存在的状态。如果不存在状态,则初始化一个空的状态。
使用FlatMap with keyed ValueState的快捷方式flatMapWithState也可以实现以上需求。
scala version
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.flatMapWithState[(String, Double, Double), Double] {
case (in: SensorReading, None) =>
// no previous temperature defined.
// Just update the last temperature
(List.empty, Some(in.temperature))
case (SensorReading r, lastTemp: Some[Double]) =>
// compare temperature difference with threshold
val tempDiff = (r.temperature - lastTemp.get).abs
if (tempDiff > 1.7) {
// threshold exceeded.
// Emit an alert and update the last temperature
(List((r.id, r.temperature, tempDiff)), Some(r.temperature))
} else {
// threshold not exceeded. Just update the last temperature
(List.empty, Some(r.temperature))
}
}