Flink合流
Apache Flink中的UNION操作为大数据处理提供了强大的灵活性,允许开发者轻松地将多个数据流合并为一个连续的、未经过滤的(即保留所有重复项)数据流
步骤:
首先定义好每个单独的数据流(利用Flink丰富的数据源API),然后通过简单的.union()方法调用即可完成合并。这个过程高度灵活,支持合并相同或不同类型的数据流(当然,类型需兼容),并且能够自然地融入到Flink的流式处理管道中,与其他转换(如map, filter, reduce等)和窗口操作结合使用
注意:
UNION操作本身不改变元素的顺序(保证每个源流内部的顺序),但在分布式环境中,考虑到数据的分区和并行处理,最终输出的全局顺序可能并非严格按照源流顺序
示例
import cn.guangjun.flink.pojo.User;
import cn.guangjun.flink.stream.CustomUserSecSource;
import cn.guangjun.flink.stream.CustomUserSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
/**
* UnionStreamDemo 类演示了如何使用 Stream 的 union 操作
* 该类提供方法来合并两个 Stream,并处理合并后的数据
*/
public class UnionStreamDemo {
/**
* 主方法,展示如何合并两个 Stream 并处理数据
* @throws Exception 如果流式计算作业执行过程中出现错误,则抛出异常
*/
public static void main(String[] args) throws Exception {
// 获取流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
// 创建第一个流,每1秒生成一个User事件
SingleOutputStreamOperator<User> stream1 = env.addSource(new CustomUserSource())
// 为流分配时间戳和水印,确保事件时间顺序
.assignTimestampsAndWatermarks(WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<User>() {
@Override
public long extractTimestamp(User element, long recordTimestamp) {
// 使用Event的时间戳
return element.getTimeStamp();
}
})
);
// 创建第二个流,每3秒生成一个User
SingleOutputStreamOperator<User> stream2 = env.addSource(new CustomUserSecSource())
// 为流分配时间戳和水印,确保事件时间顺序
.assignTimestampsAndWatermarks(WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<User>() {
@Override
public long extractTimestamp(User element, long recordTimestamp) {
// 使用Event的时间戳
return element.getTimeStamp();
}
})
);
// 合并两条流
DataStream<User> union = stream1.union(stream2);
// 处理合并后的流,将User事件转换为String并打印
union.map(new MapFunction<User, String>() {
@Override
public String map(User user) throws Exception {
// 提取User事件的等级
return user.getGrade();
}
}).print("union");
// 执行流式计算作业
env.execute();
}
}
该示例要求参与合并的两个或多个数据流的数据类型必须严格一致。当数据类型相匹配时,合并过程无碍,数据可直接聚合为单一数据流。