flink之输出算子(Sink)

2024-08-12 13:53

输出算子(Sink)

在Flink的数据处理流程中,Sink算子扮演着数据输出的角色,负责将处理后的数据写入到外部系统中,如文件系统、数据库、消息队列等。Flink的Sink设计高度可扩展,支持多种数据输出方式,满足不同的应用场景需求。

Sink的基本概念

  • 数据输出:Sink是Flink作业数据流的终点,用于将数据写出到外部存储或系统中。

  • 灵活性:Flink允许用户自定义Sink实现,以适应特定的输出需求。

  • 容错保证:Sink在设计上需要考虑容错性,确保数据的准确处理和输出,即使在故障发生时也不例外。

  • 连接器:为了简化与外部系统的集成,Flink提供了许多预定义的Sink连接器(如KafkaSink、FileSink、JDBCsink等),同时也鼓励用户开发自定义Sink。

    预定义Sink

  1. FileSink:用于将数据写入到文件系统中,支持滚动策略(如基于时间、大小)来创建新文件。这对于日志记录、数据备份等场景非常有用。

  2. KafkaSink:将数据流输出到Apache Kafka主题中,适用于构建实时数据管道,实现数据流的高效传输和进一步处理。

  3. JDBC sink:允许将数据直接写入支持JDBC的数据库中,如MySQL、PostgreSQL等,适合于实时数据分析后存储结果。

  4. HDFS Sink:用于将数据写入Hadoop分布式文件系统(HDFS),常用于大数据分析的预处理阶段。

  5. Elasticsearch Sink:将数据索引到Elasticsearch中,便于进行全文搜索、分析和可视化展示。

image.png


import cn.guangjun.flink.pojo.Event;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.Random;
import java.util.concurrent.TimeUnit;
public class FileSinkDemo {
    public static void main(String[] args) throws Exception{
        // 初始化随机数生成器,用于后续生成随机的用户ID和分数
        Random random = new Random();
        // 获取流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为6
        env.setParallelism(6);

        // 从元素生成DataStream,模拟事件流数据
        DataStream<Event> stream = env.fromElements(
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高二", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高三", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高一", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高二", random.nextInt(100)),
                new Event("user:" + random.nextInt(50), "高三", random.nextInt(100)));

        // 构建文件输出的StreamingFileSink
        StreamingFileSink<String> fileSink = StreamingFileSink
                // 指定输出文件的路径
                .<String>forRowFormat(new Path("d://student"),
                        // 指定使用SimpleStringEncoder编码器和字符集
                        new SimpleStringEncoder<>("UTF-8"))
                // 配置滚动策略,决定何时创建新文件
                .withRollingPolicy(
                        // 使用默认的滚动策略构建器进行配置
                        DefaultRollingPolicy.builder()
                                // 设置滚动间隔时间为15分钟
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                // 设置不活跃间隔时间
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                // 设置单个文件的最大大小
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                // 完成StreamingFileSink的配置并创建实例
                .build();

        // 将Event转换成String并写入文件
        stream.map(Event::toString).addSink(fileSink).setParallelism(1);
        // 执行流处理任务
        env.execute();
    }
}

image.png

相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号