JDBC自定义sink
建表语句
-- Database that the sink examples connect to (jdbc:mysql://localhost:3306/sensor).
create database sensor;
-- Select the new database so the table is created inside it rather than in
-- whichever database happens to be current for the session.
use sensor;
-- Columns written by the sink: the sensor id and its temperature.
create table temps(id varchar(20), temp float);
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
示例代码:
Scala version
/** Example job that writes the readings produced by `SensorSource` to MySQL through a custom JDBC sink. */
object SinkToMySQL {

  /** Wires `SensorSource` into `MyJDBCSink` and runs the job with parallelism 1. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)
    stream.addSink(new MyJDBCSink)

    env.execute()
  }

  /**
    * Sink that upserts readings into the `temps` table.
    *
    * For every reading it first tries to update the row(s) whose `id` matches;
    * only when that update touches no row does it insert a new one.
    */
  class MyJDBCSink extends RichSinkFunction[SensorReading] {
    // These stay null until open() has run; close() must tolerate that.
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _

    /** Opens the JDBC connection and prepares the two statements reused by every `invoke` call. */
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/sensor",
        "zuoyuan",
        "zuoyuan"
      )
      insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)")
      updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?")
    }

    /**
      * Writes one reading: update first, insert when nothing was updated.
      *
      * @param value   the reading whose `id` and `temperature` are stored
      * @param context sink context supplied by the framework (unused)
      */
    override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
      updateStmt.setDouble(1, value.temperature)
      updateStmt.setString(2, value.id)

      // executeUpdate() returns the affected-row count directly.
      // NOTE(review): this relies on the driver counting matched (not only changed)
      // rows; otherwise an unchanged temperature would trigger a duplicate insert.
      val updatedRows = updateStmt.executeUpdate()
      if (updatedRows == 0) {
        insertStmt.setString(1, value.id)
        insertStmt.setDouble(2, value.temperature)
        insertStmt.executeUpdate()
      }
    }

    /**
      * Releases the statements and then the connection.
      *
      * Every resource is closed only if it was actually created, so a failure
      * inside `open` does not turn into a NullPointerException here, and the
      * nested `finally` blocks make sure a failing statement close cannot leak
      * the connection.
      */
    override def close(): Unit = {
      def closeIfOpened(resource: AutoCloseable): Unit =
        Option(resource).foreach(_.close())

      try closeIfOpened(insertStmt)
      finally {
        try closeIfOpened(updateStmt)
        finally closeIfOpened(conn)
      }
    }
  }
}
Java version
/** Example job that writes the readings produced by {@code SensorSource} to MySQL through a custom JDBC sink. */
public class SinkToMySQL {

    /** Wires {@code SensorSource} into {@code MyJDBCSink} and runs the job with parallelism 1. */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> stream = env.addSource(new SensorSource());
        stream.addSink(new MyJDBCSink());

        env.execute();
    }

    /**
     * Sink that upserts readings into the {@code temps} table.
     *
     * <p>For every reading it first tries to update the row(s) whose {@code id}
     * matches; only when that update touches no row does it insert a new one.
     */
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        // These stay null until open() has run; close() must tolerate that.
        private Connection conn;
        private PreparedStatement insertStmt;
        private PreparedStatement updateStmt;

        /** Opens the JDBC connection and prepares the two statements reused by every {@code invoke} call. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/sensor",
                "zuoyuan",
                "zuoyuan"
            );
            insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)");
            updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?");
        }

        /**
         * Writes one reading: update first, insert when nothing was updated.
         *
         * @param value   the reading whose {@code id} and {@code temperature} are stored
         * @param context sink context supplied by the framework (unused)
         */
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            updateStmt.setDouble(1, value.temperature);
            updateStmt.setString(2, value.id);

            // executeUpdate() returns the affected-row count directly.
            // NOTE(review): this relies on the driver counting matched (not only changed)
            // rows; otherwise an unchanged temperature would trigger a duplicate insert.
            int updatedRows = updateStmt.executeUpdate();
            if (updatedRows == 0) {
                insertStmt.setString(1, value.id);
                insertStmt.setDouble(2, value.temperature);
                insertStmt.executeUpdate();
            }
        }

        /**
         * Releases the statements and then the connection.
         *
         * <p>try-with-resources skips resources that are still {@code null} (for
         * example when {@code open} failed part-way) and closes the remaining ones
         * in reverse declaration order (insert statement, update statement,
         * connection) even if one of the closes throws, so the connection cannot leak.
         */
        @Override
        public void close() throws Exception {
            try (Connection connection = conn;
                 PreparedStatement update = updateStmt;
                 PreparedStatement insert = insertStmt) {
                super.close();
            }
        }
    }
}