Flink分流
Flink直接利用ProcessFunction的侧面输出流功能进行分流。侧面输出流(Side Output)是一种高级特性,它为数据流处理任务提供了更灵活的数据路由方式,允许用户根据业务需求将数据流中的部分元素导向除默认输出之外的其他输出通道。
场景:
日志与监控:在处理数据流时,可以将错误或警告信息通过侧面输出发送到日志系统或监控平台,而主要的数据处理流程不受影响。
数据分流:根据数据的内容或属性,将数据分流到不同的处理路径,比如将满足特定条件的事件发送到一个用于实时分析的流,其余数据则继续主流程处理。
内容过滤与聚合:在进行复杂的数据转换和聚合操作时,可能需要将某些中间结果或异常数据分离出来,进行单独处理或存储
示例
import cn.guangjun.flink.pojo.Event;
import cn.guangjun.flink.stream.CustomSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* OutputTagDemo 类用于演示如何在Flink流处理中使用侧输出(Side Output)功能
* 侧输出允许在流处理过程中,根据条件将元素输出到不同的流中,这里主要展示了如何使用OutputTag来实现侧输出
*/
public class OutputTagDemo {
// 定义一个名为"张三"的OutputTag
private static OutputTag<Event> zhangsanTag = new OutputTag<Event>("张三"){};
// 定义一个名为"李四"的OutputTag
private static OutputTag<Event> lisiTag = new OutputTag<Event>("李四"){};
/**
* 主函数,用于设置Flink流处理环境,并实现根据名字分发事件到不同的流中
* @param args 命令行参数
* @throws Exception 可能抛出的异常
*/
public static void main(String[] args) throws Exception {
// 获取流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1,以便于调试和演示
env.setParallelism(1);
// 添加自定义数据源,返回一个包含Event的流
SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource());
// 使用process函数对流中的每个元素进行处理,并根据条件输出到不同的流中
SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
/**
* 对每个事件元素进行处理
* 根据事件中人的名字,决定输出到主流还是侧输出流中
* @param value 当前处理的事件
* @param ctx 上下文对象,用于访问上下文方法,如输出方法
* @param out 主流的输出收集器
* @throws Exception 处理过程中可能抛出的异常
*/
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
// 根据事件中的名字,决定输出到对应的侧输出流或主流
if (value.getName().equals("张三")){
// 利用上下文output方法定义分发规则,将名为"张三"的事件输出到对应的侧输出流
ctx.output(zhangsanTag, value);
} else if (value.getName().equals("李四")){
// 将名为"李四"的事件输出到对应的侧输出流
ctx.output(lisiTag, value);
} else {
// 其他情况分发至主流
out.collect(value);
}
}
});
// 通过getSideOutput方法获取对应标签的侧流进行后续处理
// 将标签为"张三"的侧输出流内容打印出来
processedStream.getSideOutput(zhangsanTag).print("张三:");
// 将标签为"李四"的侧输出流内容打印出来
processedStream.getSideOutput(lisiTag).print("李四:");
// 将主流内容打印出来
processedStream.print("main");
// 执行流处理作业
env.execute();
}
}