flink函数ProcessFunction
示例:
package cn.guangjun.flink.demo;
import cn.guangjun.flink.pojo.Event;
import cn.guangjun.flink.stream.CustomSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.LinkedHashMap;
/**
* 通用处理函数ProcessFunction的示例
*/
public class ProcessFunctionDemo {
public static void main(String[] args) throws Exception {
// 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
// 添加自定义数据源
env.addSource(new CustomSource())
// 使用ProcessFunction对每个事件进行处理
.process(new ProcessFunction<Event, Tuple2<String,Integer>>() {
/**
* 处理单个Event事件
* @param event 当前处理的事件
* @param ctx 上下文,提供计时器、输出等操作
* @param out 收集器,用于输出结果
* @throws Exception 抛出异常
*/
@Override
public void processElement(Event event
, ProcessFunction<Event, Tuple2<String, Integer>>.Context ctx
, Collector<Tuple2<String, Integer>> out) throws Exception {
// 对每个事件,输出一个Tuple,包含用户ID和固定值1
out.collect(new Tuple2<>(event.getName(),1));
}
})
// 按用户ID分组
.keyBy(t->t.f0)
// 使用自定义的KeyedProcessFunction进行进一步处理
.process(new CustomKeyedProcessFunction())
// 打印结果
.print();
// 执行环境执行
env.execute();
}
/**
* 自定义的KeyedProcessFunction类,用于按用户分组计数
*/
public static class CustomKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
// 使用HashMap存储每个用户的累计计数值
private HashMap<String,Integer> accumulator;
/**
* 构造函数,初始化accumulator
*/
private CustomKeyedProcessFunction(){
accumulator = new LinkedHashMap<>();
}
/**
* 处理元素,对用户进行计数
* @param value 当前处理的元素
* @param ctx 上下文对象
* @param out 结果收集器
* @throws Exception 抛出异常
*/
@Override
public void processElement(Tuple2<String, Integer> value
, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx
, Collector<Tuple2<String, Integer>> out) throws Exception {
// 打印当前Key和处理时间
System.out.println("当前Key是:" + ctx.getCurrentKey() + ",处理时间:" + ctx.timerService().currentProcessingTime());
// 如果用户不存在,则初始化计数,并输出
if(!accumulator.containsKey(value.f0)){
accumulator.put(value.f0, 1);
out.collect(new Tuple2<>(value.f0,1));
} else { // 如果用户已存在,则增加计数,并输出新的计数值
Integer sum = accumulator.get(value.f0) + 1;
accumulator.put(value.f0, sum);
out.collect(new Tuple2<>(value.f0,sum));
}
}
}
}