flink之自定义slink

2024-08-14 09:22

flink自定义slink(JdbcSlink)

示例

package cn.guangjun.flink.sink;

import cn.guangjun.flink.pojo.Event;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.List;

@Slf4j
/**
 * JdbcSlink是一个富封装的Sink函数,用于将Flink流中的Event数据批量插入到MySQL数据库中。
 */
public class JdbcSlink extends RichSinkFunction<Event> {

    // 数据库连接
    private Connection conn;
    // 预编译的SQL语句
    private PreparedStatement preparedStatement;
    // 插入操作的SQL语句
    private final String INSERT_SQL = "INSERT INTO event (grade, name, score) VALUES (?, ?, ?)";
    // 用于批量处理的事件列表
    private List<Event> batchEvents = new ArrayList<>();

    /**
     * invoke方法用于处理流中的每个Event元素。
     * 当batchEvents列表中的元素数量达到100时,调用executeBatchInsert进行批量插入。
     *
     * @param event 要处理的Event对象
     * @param context 上下文对象,可用于访问定时器等
     * @throws Exception 如果处理过程中发生错误,可以抛出异常
     */
    @Override
    public void invoke(Event event, Context context) throws Exception {
        System.out.println(">>>" + event);
        batchEvents.add(event);
        if (batchEvents.size() >= 100) { // 假设每100条记录执行一次批量插入
            executeBatchInsert();
        }
    }

    /**
     * 执行批量插入操作。
     *
     * @throws SQLException 如果数据库访问出现问题,将抛出SQLException
     */
    private void executeBatchInsert() throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            for (Event e : batchEvents) {
                ps.setString(1, e.getGrade());
                ps.setString(2, e.getName());
                ps.setInt(3, e.getScore());
                ps.addBatch();
            }
            ps.executeBatch();
        } finally {
            batchEvents.clear();
        }
    }

    /**
     * open方法用于初始化数据库连接。
     *
     * @param parameters Flink配置对象,可用于配置连接参数
     * @throws Exception 如果数据库连接失败,可以抛出异常
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver"); // 显式加载驱动
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
            preparedStatement = conn.prepareStatement(INSERT_SQL);
        } catch (Exception ex) {
            log.error("数据库连接异常", ex);
            throw ex; // 抛出异常以便外部处理
        }
    }

    /**
     * close方法用于关闭数据库连接和PreparedStatement。
     *
     * @throws Exception 如果关闭过程中出现错误,可以抛出异常
     */
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException ex) {
            log.error("数据库关闭异常", ex);
            throw ex; // 抛出异常以便外部处理
        }
    }

}

package cn.guangjun.flink.sink;

import cn.guangjun.flink.pojo.Event;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Random;

/**
 * JdbcSlinkDemo 类用于演示如何在Flink环境中使用JDBC连接器将数据写入数据库
 */
public class JdbcSlinkDemo {
    // 静态随机数生成器,用于在示例事件中生成随机数
    private static Random random = new Random();

    public static void main(String[] args) throws Exception{
        // 获取Flink的流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1,以便在本示例中简化调试过程
        env.setParallelism(1);

        // 从Elements生成一个流,并映射为Event对象
        SingleOutputStreamOperator<Event> map = env.fromElements(
                // 生成多个模拟事件,每个事件的用户ID、年级和随机数各不相同
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高二", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高三", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高二", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高三", random.nextInt(100))
        ).map(event -> event);
        // 将数据写入到使用JDBC连接器的外部数据库
        map.addSink(new JdbcSlink());
        // 执行流处理环境
        env.execute();
    }
}


相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号