Redis

<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>

定义一个redis的mapper类,用于定义保存到redis时调用的命令:

scala version

object SinkToRedisExample { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream = env.addSource(new SensorSource) val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build() stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink)) env.execute() } class MyRedisSink extends RedisMapper[SensorReading] { override def getKeyFromData(t: SensorReading): String = t.id override def getValueFromData(t: SensorReading): String = t.temperature.toString override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "sensor") } }

java version

public class WriteToRedisExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<SensorReading> stream = env.addSource(new SensorSource()); FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build(); stream.addSink(new RedisSink<SensorReading>(conf, new MyRedisSink())); env.execute(); } public static class MyRedisSink implements RedisMapper<SensorReading> { @Override public String getKeyFromData(SensorReading sensorReading) { return sensorReading.id; } @Override public String getValueFromData(SensorReading sensorReading) { return sensorReading.temperature + ""; } @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "sensor"); } } }