Flink 分组CoGroup
CoGroup
是Apache Flink中一种高级的数据流关联操作,它允许在两个或多个数据流之间根据指定的键进行分组和合并处理。该操作特别适合处理多对一、一对多或多对多的复杂关联场景,通过在相同键下聚合不同流的数据,提供了高度灵活的数据整合能力.
CoGroup
功能: CoGroup
允许在两个或多个数据流之间进行分组关联。它要求每个输入流都定义一个键选择器函数(KeySelector),并在相同键值下将来自不同流的数据分组合并到一个组内。这意味着,CoGroup
可以处理两个流中的数据存在一对多、多对一或多对多关系的情况。
窗口: CoGroup
通常与窗口操作结合使用(如在上述代码中配合滑动或滚动窗口),这使得它非常适合处理基于时间窗口的关联需求。
输出: CoGroupFunction
接收两个或多个迭代器作为输入,分别代表每个输入流中当前窗口和键下所有的元素。开发者需要手动处理这些迭代器,可能涉及到循环遍历、匹配逻辑等,灵活性高但实现复杂度相对较大。
Join
功能: Join
操作更倾向于处理一对一的关联场景,它基于相同的键将两个数据流的数据匹配在一起。简单来说,如果一个键在两个流中都存在,则这两个流中的元素会被匹配并作为一个输出结果。
即时性: 相较于CoGroup
,Join
不直接涉及窗口操作,虽然它也可以与时间窗口结合使用(例如通过intervalJoin
),但基础的Join
操作是基于数据流中直接的、即时的匹配。. 输出: 在实现上,JoinFunction
处理的是两个具体元素,而不是迭代器,当两个流中找到匹配的键时,会调用JoinFunction
处理这对匹配的元素。这种方式更加直观,适用于一对一匹配的简单场景。
示例
/**
* FlinkGroupDemo 类用于演示Flink中的分组操作
* 该类主要功能是通过处理订单流和产品流,演示如何使用Flink进行流数据的分组处理
*/
public class FlinkGroupDemo {
/**
* 程序的主入口点
* @param args 命令行参数
* @throws Exception 如果在处理过程中发生错误,则抛出异常
*/
public static void main(String[] args) throws Exception {
// 初始化流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
// 创建随机数生成器
Random random = new Random();
// 创建订单流
SingleOutputStreamOperator<Order> orderStream = env.fromElements(
// 添加订单数据
new Order("001", "Prod00A",10,random.nextInt(10000)),
new Order("002", "Prod00B",20,random.nextInt(10000)),
new Order("003", "Prod00C",30,random.nextInt(10000)),
new Order("004", "Prod00B",40,random.nextInt(10000))
).assignTimestampsAndWatermarks(
// 设置Watermark策略,处理乱序数据
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(
// 指定时间戳赋值器
new SerializableTimestampAssigner<Order>() {
@Override
public long extractTimestamp(Order element, long recordTimestamp) {
// 提取事件时间
return element.getCrtTime();
}
}
)
);
// 创建产品流
SingleOutputStreamOperator<Product> prodStream = env.fromElements(
// 添加产品数据
new Product("创维空调", "Prod00A",10,random.nextInt(10000)),
new Product("格力空调", "Prod00B",20,random.nextInt(10000)),
new Product("美的冰箱", "Prod00B",30,random.nextInt(10000))
).assignTimestampsAndWatermarks(
// 设置Watermark策略,处理乱序数据
WatermarkStrategy.<Product>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(
// 指定时间戳赋值器
new SerializableTimestampAssigner<Product>() {
@Override
public long extractTimestamp(Product element, long recordTimestamp) {
// 提取事件时间
return element.getCrtTime();
}
}
)
);
// 对订单流和产品流进行连接,并基于产品代码进行分组,每5秒一个窗口
orderStream.coGroup(prodStream)
.where(order -> order.getProdCode())
.equalTo(product -> Product.getProdCode())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction<Order, Product, String>() {
@Override
public void coGroup(Iterable<Order> first, Iterable<Product> second, Collector<String> out) throws Exception {
// 输出分组结果
out.collect("Orders: " + first + ", Product: " + second);
}
}).print(); // 打印输出结果
env.execute();
}
}