Flink之分组CoGroup

2024-08-21 17:17

Flink 分组CoGroup

CoGroup是Apache Flink中一种高级的数据流关联操作,它允许在两个或多个数据流之间根据指定的键进行分组和合并处理。该操作特别适合处理多对一、一对多或多对多的复杂关联场景,通过在相同键下聚合不同流的数据,提供了高度灵活的数据整合能力.

CoGroup

功能: CoGroup允许在两个或多个数据流之间进行分组关联。它要求每个输入流都定义一个键选择器函数(KeySelector),并在相同键值下将来自不同流的数据分组合并到一个组内。这意味着,CoGroup可以处理两个流中的数据存在一对多、多对一或多对多关系的情况。

窗口: CoGroup通常与窗口操作结合使用(如在上述代码中配合滑动或滚动窗口),这使得它非常适合处理基于时间窗口的关联需求。

输出: CoGroupFunction接收两个或多个迭代器作为输入,分别代表每个输入流中当前窗口和键下所有的元素。开发者需要手动处理这些迭代器,可能涉及到循环遍历、匹配逻辑等,灵活性高但实现复杂度相对较大。

Join

功能: Join操作更倾向于处理一对一的关联场景,它基于相同的键将两个数据流的数据匹配在一起。简单来说,如果一个键在两个流中都存在,则这两个流中的元素会被匹配并作为一个输出结果。

即时性: 相较于CoGroupJoin不直接涉及窗口操作,虽然它也可以与时间窗口结合使用(例如通过intervalJoin),但基础的Join操作是基于数据流中直接的、即时的匹配。. 输出: 在实现上,JoinFunction处理的是两个具体元素,而不是迭代器,当两个流中找到匹配的键时,会调用JoinFunction处理这对匹配的元素。这种方式更加直观,适用于一对一匹配的简单场景。

示例

/**
 * FlinkGroupDemo 类用于演示Flink中的分组操作
 * 该类主要功能是通过处理订单流和产品流,演示如何使用Flink进行流数据的分组处理
 */
public class FlinkGroupDemo {
    /**
     * 程序的主入口点
     * @param args 命令行参数
     * @throws Exception 如果在处理过程中发生错误,则抛出异常
     */
    public static void main(String[] args) throws Exception {
        // 初始化流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 创建随机数生成器
        Random random = new Random();
        // 创建订单流
        SingleOutputStreamOperator<Order> orderStream = env.fromElements(
                // 添加订单数据
                new Order("001", "Prod00A",10,random.nextInt(10000)),
                new Order("002", "Prod00B",20,random.nextInt(10000)),
                new Order("003", "Prod00C",30,random.nextInt(10000)),
                new Order("004", "Prod00B",40,random.nextInt(10000))
        ).assignTimestampsAndWatermarks(
                // 设置Watermark策略,处理乱序数据
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(
                                // 指定时间戳赋值器
                                new SerializableTimestampAssigner<Order>() {
                                    @Override
                                    public long extractTimestamp(Order element, long recordTimestamp) {
                                        // 提取事件时间
                                        return element.getCrtTime();
                                    }
                                }
                        )
        );
        // 创建产品流
        SingleOutputStreamOperator<Product> prodStream = env.fromElements(
                // 添加产品数据
                new Product("创维空调", "Prod00A",10,random.nextInt(10000)),
                new Product("格力空调", "Prod00B",20,random.nextInt(10000)),
                new Product("美的冰箱", "Prod00B",30,random.nextInt(10000))
        ).assignTimestampsAndWatermarks(
                // 设置Watermark策略,处理乱序数据
                WatermarkStrategy.<Product>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(
                                // 指定时间戳赋值器
                                new SerializableTimestampAssigner<Product>() {
                                    @Override
                                    public long extractTimestamp(Product element, long recordTimestamp) {
                                        // 提取事件时间
                                        return element.getCrtTime();
                                    }
                                }
                        )
        );

        // 对订单流和产品流进行连接,并基于产品代码进行分组,每5秒一个窗口
        orderStream.coGroup(prodStream)
                .where(order -> order.getProdCode())
                .equalTo(product -> Product.getProdCode())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Order, Product, String>() {
                    @Override
                    public void coGroup(Iterable<Order> first, Iterable<Product> second, Collector<String> out) throws Exception {
                        // 输出分组结果
                        out.collect("Orders: " + first + ", Product: " + second);
                    }
                }).print(); // 打印输出结果

        env.execute();
    }
}
相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号