flink自定义slink(JdbcSlink)
示例
package cn.guangjun.flink.sink;
import cn.guangjun.flink.pojo.Event;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.List;
@Slf4j
/**
* JdbcSlink是一个富封装的Sink函数,用于将Flink流中的Event数据批量插入到MySQL数据库中。
*/
public class JdbcSlink extends RichSinkFunction<Event> {
// 数据库连接
private Connection conn;
// 预编译的SQL语句
private PreparedStatement preparedStatement;
// 插入操作的SQL语句
private final String INSERT_SQL = "INSERT INTO event (grade, name, score) VALUES (?, ?, ?)";
// 用于批量处理的事件列表
private List<Event> batchEvents = new ArrayList<>();
/**
* invoke方法用于处理流中的每个Event元素。
* 当batchEvents列表中的元素数量达到100时,调用executeBatchInsert进行批量插入。
*
* @param event 要处理的Event对象
* @param context 上下文对象,可用于访问定时器等
* @throws Exception 如果处理过程中发生错误,可以抛出异常
*/
@Override
public void invoke(Event event, Context context) throws Exception {
System.out.println(">>>" + event);
batchEvents.add(event);
if (batchEvents.size() >= 100) { // 假设每100条记录执行一次批量插入
executeBatchInsert();
}
}
/**
* 执行批量插入操作。
*
* @throws SQLException 如果数据库访问出现问题,将抛出SQLException
*/
private void executeBatchInsert() throws SQLException {
try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
for (Event e : batchEvents) {
ps.setString(1, e.getGrade());
ps.setString(2, e.getName());
ps.setInt(3, e.getScore());
ps.addBatch();
}
ps.executeBatch();
} finally {
batchEvents.clear();
}
}
/**
* open方法用于初始化数据库连接。
*
* @param parameters Flink配置对象,可用于配置连接参数
* @throws Exception 如果数据库连接失败,可以抛出异常
*/
@Override
public void open(Configuration parameters) throws Exception {
try {
Class.forName("com.mysql.cj.jdbc.Driver"); // 显式加载驱动
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
preparedStatement = conn.prepareStatement(INSERT_SQL);
} catch (Exception ex) {
log.error("数据库连接异常", ex);
throw ex; // 抛出异常以便外部处理
}
}
/**
* close方法用于关闭数据库连接和PreparedStatement。
*
* @throws Exception 如果关闭过程中出现错误,可以抛出异常
*/
@Override
public void close() throws Exception {
try {
if (preparedStatement != null) {
preparedStatement.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException ex) {
log.error("数据库关闭异常", ex);
throw ex; // 抛出异常以便外部处理
}
}
}
package cn.guangjun.flink.sink;
import cn.guangjun.flink.pojo.Event;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Random;
/**
* JdbcSlinkDemo 类用于演示如何在Flink环境中使用JDBC连接器将数据写入数据库
*/
public class JdbcSlinkDemo {
// 静态随机数生成器,用于在示例事件中生成随机数
private static Random random = new Random();
public static void main(String[] args) throws Exception{
// 获取Flink的流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1,以便在本示例中简化调试过程
env.setParallelism(1);
// 从Elements生成一个流,并映射为Event对象
SingleOutputStreamOperator<Event> map = env.fromElements(
// 生成多个模拟事件,每个事件的用户ID、年级和随机数各不相同
new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高二", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高三", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高二", random.nextInt(100)),
new Event("user:" + random.nextInt(50), "高三", random.nextInt(100))
).map(event -> event);
// 将数据写入到使用JDBC连接器的外部数据库
map.addSink(new JdbcSlink());
// 执行流处理环境
env.execute();
}
}