Flink合流

2024-08-21 14:57

Flink合流

Apache Flink中的UNION操作为大数据处理提供了强大的灵活性,允许开发者轻松地将多个数据流合并为一个连续的、未经过滤的(即保留所有重复项)数据流

步骤:

首先定义好每个单独的数据流(利用Flink丰富的数据源API),然后通过简单的.union()方法调用即可完成合并。这个过程高度灵活,支持合并相同或不同类型的数据流(当然,类型需兼容),并且能够自然地融入到Flink的流式处理管道中,与其他转换(如map, filter, reduce等)和窗口操作结合使用

注意:

UNION操作本身不改变元素的顺序(保证每个源流内部的顺序),但在分布式环境中,考虑到数据的分区和并行处理,最终输出的全局顺序可能并非严格按照源流顺序

示例


import cn.guangjun.flink.pojo.User;
import cn.guangjun.flink.stream.CustomUserSecSource;
import cn.guangjun.flink.stream.CustomUserSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
/**
 * UnionStreamDemo 类演示了如何使用 Stream 的 union 操作
 * 该类提供方法来合并两个 Stream,并处理合并后的数据
 */
public class UnionStreamDemo {
    /**
     * 主方法,展示如何合并两个 Stream 并处理数据
     * @throws Exception 如果流式计算作业执行过程中出现错误,则抛出异常
     */
    public static void main(String[] args) throws Exception {
        // 获取流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);

        // 创建第一个流,每1秒生成一个User事件
        SingleOutputStreamOperator<User> stream1 = env.addSource(new CustomUserSource())
                // 为流分配时间戳和水印,确保事件时间顺序
                .assignTimestampsAndWatermarks(WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<User>() {
                            @Override
                            public long extractTimestamp(User element, long recordTimestamp) {
                                // 使用Event的时间戳
                                return element.getTimeStamp();
                            }
                        })
                );

        // 创建第二个流,每3秒生成一个User
        SingleOutputStreamOperator<User> stream2 = env.addSource(new CustomUserSecSource())
                // 为流分配时间戳和水印,确保事件时间顺序
                .assignTimestampsAndWatermarks(WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<User>() {
                            @Override
                            public long extractTimestamp(User element, long recordTimestamp) {
                                // 使用Event的时间戳
                                return element.getTimeStamp();
                            }
                        })
                );

        // 合并两条流
        DataStream<User> union = stream1.union(stream2);
        // 处理合并后的流,将User事件转换为String并打印
        union.map(new MapFunction<User, String>() {
            @Override
            public String map(User user) throws Exception {
                // 提取User事件的等级
                return user.getGrade();
            }
        }).print("union");

        // 执行流式计算作业
        env.execute();
    }
}

该示例要求参与合并的两个或多个数据流的数据类型必须严格一致。当数据类型相匹配时,合并过程无碍,数据可直接聚合为单一数据流。

相关新闻
热点
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号