Flink之窗口Join
窗口连接(Window Join)是指将两个共享相同键并且位于同一窗口中的流元素进行配对,这些窗口可以通过使用窗口分配器(WindowAssigner)来定义,来自双方的元素被传递给用户自定义的 JoinFunction 或 FlatJoinFunction,在这里用户可以发出满足连接条件的结果
stream.join(otherStream)
.where(<键选择器>) // 指定第一个流的键选择逻辑
.equalTo(<键选择器>) // 指定第二个流的键选择逻辑,应与第一个流的选择逻辑匹配
.window(<窗口分配器>) // 定义用于窗口化的具体规则
.apply(<JoinFunction>); // 应用用户自定义的连接函数处理匹配的元素
两个流中元素的成对组合创建行为类似于内连接(inner-join),意味着如果一个流中的元素在另一个流中没有对应元素可以连接,则该元素不会被发出
示例如下:
/**
* 这个类用于演示连接操作的使用
* 连接操作可以将两个数据流中的元素基于某个键进行关联
* 在这个示例中,我们将展示如何使用连接操作来关联两个元素
*/
public class ConnectionJoinDemo {
public static void main(String[] args) throws Exception {
// 初始化流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
// 创建随机数生成器
Random random = new Random();
// 创建订单流
SingleOutputStreamOperator<Order> orderStream = env.fromElements(
// 添加订单数据
new Order("001", "Prod00A",10,random.nextInt(10000)),
new Order("002", "Prod00B",20,random.nextInt(10000)),
new Order("003", "Prod00C",30,random.nextInt(10000)),
new Order("004", "Prod00B",40,random.nextInt(10000))
).assignTimestampsAndWatermarks(
// 设置Watermark策略,处理乱序数据
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(
// 指定时间戳赋值器
new SerializableTimestampAssigner<Order>() {
@Override
public long extractTimestamp(Order element, long recordTimestamp) {
// 提取事件时间
return element.getCrtTime();
}
}
)
);
// 创建产品流
SingleOutputStreamOperator<Product> prodStream = env.fromElements(
// 添加产品数据
new Product("创维空调", "Prod00A",10,random.nextInt(10000)),
new Product("格力空调", "Prod00B",20,random.nextInt(10000)),
new Product("美的冰箱", "Prod00B",30,random.nextInt(10000))
).assignTimestampsAndWatermarks(
// 设置Watermark策略,处理乱序数据
WatermarkStrategy.<Product>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(
// 指定时间戳赋值器
new SerializableTimestampAssigner<Product>() {
@Override
public long extractTimestamp(Product element, long recordTimestamp) {
// 提取事件时间
return element.getCrtTime();
}
}
)
);
// 订单流和产品流进行连接
orderStream.join(prodStream)
// 连接条件:订单流中的产品代码等于产品流中的产品代码
.where(order -> order.getProdCode())
.equalTo(payment -> payment.getProdCode())
// 设置时间窗口:每5秒一个窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 定义连接函数
.apply(new JoinFunction<Order, Product, String>() {
@Override
public String join(Order left, Product right) throws Exception {
// 定义连接结果的格式
return left.getProdCode() + "=>" + right.getProdCode();
}
})
// 打印结果到控制台
.print();
// 执行流处理任务
env.execute();
}
}