Using a Kafka message queue as the data source
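The examples below assume the Flink Kafka connector is already on the classpath. A minimal sbt sketch (the artifact targets Kafka 0.11; the Flink version and Scala suffix here are assumptions, so match them to your own build):

// build.sbt -- assumed Flink version (1.10.0) and Scala suffix (2.11); adjust to your project
libraryDependencies += "org.apache.flink" % "flink-connector-kafka-0.11_2.11" % "1.10.0"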
Scala version
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Kafka consumer configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty(
  "key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty(
  "value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty("auto.offset.reset", "latest")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val stream = env
  // The source is the data from Kafka; here we instantiate a consumer for the topic "hotitems"
  .addSource(
    new FlinkKafkaConsumer011[String](
      "hotitems",
      new SimpleStringSchema(),
      properties
    )
  )
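To try the source end to end, the stream still needs a sink and the job has to be launched. A minimal continuation of the Scala snippet above (the print sink and the job name are illustrative, not part of the original):

// Print every raw record to stdout and start the job; the job name is arbitrary
stream.print()
env.execute("kafka source job")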
Java version
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// Kafka consumer configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty(
    "key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"
);
properties.setProperty(
    "value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"
);
properties.setProperty("auto.offset.reset", "latest");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

// The source is the data from Kafka; here we instantiate a consumer for the topic "hotitems"
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer011<String>(
        "hotitems",
        new SimpleStringSchema(),
        properties
    )
);
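Note that FlinkKafkaConsumer011 is the connector for Kafka 0.11.x brokers; more recent Flink releases replace the version-specific consumers with a universal FlinkKafkaConsumer. The actual record deserialization is performed by the SimpleStringSchema passed to the consumer, and with auto.offset.reset set to latest, a consumer group with no committed offset starts reading from the end of the topic, skipping records produced before the job starts.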