Flink之窗口Join

2024-08-21 16:45

Flink之窗口Join

窗口连接(Window Join)是指将两个共享相同键并且位于同一窗口中的流元素进行配对,这些窗口可以通过使用窗口分配器(WindowAssigner)来定义,来自双方的元素被传递给用户自定义的 JoinFunction 或 FlatJoinFunction,在这里用户可以发出满足连接条件的结果

stream.join(otherStream)
    .where(<键选择器>) // 指定第一个流的键选择逻辑
    .equalTo(<键选择器>) // 指定第二个流的键选择逻辑,应与第一个流的选择逻辑匹配
    .window(<窗口分配器>) // 定义用于窗口化的具体规则
    .apply(<JoinFunction>); // 应用用户自定义的连接函数处理匹配的元素

两个流中元素的成对组合创建行为类似于内连接(inner-join),意味着如果一个流中的元素在另一个流中没有对应元素可以连接,则该元素不会被发出

示例如下:

/**
 * 这个类用于演示连接操作的使用
 * 连接操作可以将两个数据流中的元素基于某个键进行关联
 * 在这个示例中,我们将展示如何使用连接操作来关联两个元素
 */
public class ConnectionJoinDemo {
    public static void main(String[] args) throws Exception {
        // 初始化流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 创建随机数生成器
        Random random = new Random();

        // 创建订单流
        SingleOutputStreamOperator<Order> orderStream = env.fromElements(
                // 添加订单数据
                new Order("001", "Prod00A",10,random.nextInt(10000)),
                new Order("002", "Prod00B",20,random.nextInt(10000)),
                new Order("003", "Prod00C",30,random.nextInt(10000)),
                new Order("004", "Prod00B",40,random.nextInt(10000))
        ).assignTimestampsAndWatermarks(
                // 设置Watermark策略,处理乱序数据
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(
                                // 指定时间戳赋值器
                                new SerializableTimestampAssigner<Order>() {
                                    @Override
                                    public long extractTimestamp(Order element, long recordTimestamp) {
                                        // 提取事件时间
                                        return element.getCrtTime();
                                    }
                                }
                        )
        );

        // 创建产品流
        SingleOutputStreamOperator<Product> prodStream = env.fromElements(
                // 添加产品数据
                new Product("创维空调", "Prod00A",10,random.nextInt(10000)),
                new Product("格力空调", "Prod00B",20,random.nextInt(10000)),
                new Product("美的冰箱", "Prod00B",30,random.nextInt(10000))
        ).assignTimestampsAndWatermarks(
                // 设置Watermark策略,处理乱序数据
                WatermarkStrategy.<Product>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(
                                // 指定时间戳赋值器
                                new SerializableTimestampAssigner<Product>() {
                                    @Override
                                    public long extractTimestamp(Product element, long recordTimestamp) {
                                        // 提取事件时间
                                        return element.getCrtTime();
                                    }
                                }
                        )
        );

        // 订单流和产品流进行连接
        orderStream.join(prodStream)
                // 连接条件:订单流中的产品代码等于产品流中的产品代码
                .where(order -> order.getProdCode())
                .equalTo(payment -> payment.getProdCode())
                // 设置时间窗口:每5秒一个窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 定义连接函数
                .apply(new JoinFunction<Order, Product, String>() {
                    @Override
                    public String join(Order left, Product right) throws Exception {
                        // 定义连接结果的格式
                        return left.getProdCode() + "=>" + right.getProdCode();
                    }
                })
                // 打印结果到控制台
                .print();
        // 执行流处理任务
        env.execute();
    }
}
相关新闻
热点
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号