实现自定义源函数
DataStream API提供了两个接口来实现source连接器:
- SourceFunction和RichSourceFunction可以用来定义非并行的source连接器,source跑在单任务上。
- ParallelSourceFunction和RichParallelSourceFunction可以用来定义跑在并行实例上的source连接器。
除了并行于非并行的区别,这两种接口完全一样。就像process function的rich版本一样,RichSourceFunction和RichParallelSourceFunction的子类可以override open()和close()方法,也可以访问RuntimeContext,RuntimeContext提供了并行任务实例的数量,当前任务实例的索引,以及一些其他信息。
SourceFunction和ParallelSourceFunction定义了两种方法:
- void run(SourceContext
ctx) - cancel()
run()方法用来读取或者接收数据然后将数据摄入到Flink应用中。根据接收数据的系统,数据可能是推送的也可能是拉取的。Flink仅仅在特定的线程调用run()方法一次,通常情况下会是一个无限循环来读取或者接收数据并发送数据。任务可以在某个时间点被显式的取消,或者由于流是有限流,当数据被消费完毕时,任务也会停止。
当应用被取消或者关闭时,cancel()方法会被Flink调用。为了优雅的关闭Flink应用,run()方法需要在cancel()被调用以后,立即终止执行。下面的例子显示了一个简单的源函数的例子:从0数到Long.MaxValue。
class CountSource extends SourceFunction[Long] {
var isRunning: Boolean = true
override def run(ctx: SourceFunction.SourceContext[Long]) = {
var cnt: Long = -1
while (isRunning && cnt < Long.MaxValue) {
cnt += 1
ctx.collect(cnt)
}
}
override def cancel() = isRunning = false
}