Flink Timer定时器
Timer是Apache Flink ProcessFunction的核心功能之一,它在预设的事件时间或处理时间到达时被自动触发,允许用户自定义处理逻辑,如执行定时窗口聚合、管理状态超时或处理延迟数据。该功能应用前提是数据流经keyBy分组,确保每个键相关的处理和定时器管理在单一任务中集中执行,以此维护状态一致性并优化资源利用。
示例
/**
* 自定义的KeyedProcessFunction,用于处理字符串键值对数据,并根据键进行分组处理
* 该类扩展了KeyedProcessFunction,定义了如何处理元素以及如何响应定时器触发
*/
public class CustomKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
// 用于累积相同键的元素数量的HashMap
private HashMap<String,Integer> accumulator;
/**
* 构造函数,初始化accumulator
*/
public CustomKeyedProcessFunction(){
accumulator = new LinkedHashMap<>();
}
/**
* 处理元素的方法
* 当数据到达时,此方法被调用,用于处理当前的元素,并决定是否输出结果或设置定时器
*
* @param value 当前处理的元素,包含字符串和整型的键值对
* @param ctx 上下文对象,用于访问定时器服务和其他功能
* @param out 结果收集器,用于输出处理结果
* @throws Exception 处理过程中可能抛出的异常
*/
@Override
public void processElement(Tuple2<String, Integer> value,
KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
// 获取当前处理时间
Long currTs = ctx.timerService().currentProcessingTime();
System.out.println("数据到达,到达时间:" + currTs);
// 注册一个基于处理时间的定时器,延迟10秒
ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
// 检查当前元素的键是否已存在于accumulator中
if(!accumulator.containsKey(value.f0)){
// 如果不存在,则添加到accumulator,并输出新的计数值(1)
accumulator.put(value.f0, 1);
out.collect(new Tuple2<>(value.f0,1));
} else {
// 如果用户已存在,则增加计数,并输出新的计数值
Integer sum = accumulator.get(value.f0) + 1;
accumulator.put(value.f0, sum);
out.collect(new Tuple2<>(value.f0,sum));
}
}
/**
* 定时器触发时调用的方法
* 用于处理定时任务
*
* @param timestamp 定时器触发的时间戳
* @param ctx 定时器上下文对象,用于访问定时器服务和其他功能
* @param out 结果收集器,用于输出处理结果
* @throws Exception 定时器触发过程中可能抛出的异常
*/
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.OnTimerContext ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
// 当定时器触发时,打印触发时间
System.out.println("定时器触发,触发时间:" + timestamp);
}
}
public class FlinkTimerDemo {
/**
* 主函数,演示如何使用Flink的定时器功能处理事件
* @throws Exception 如果处理过程中发生错误,则抛出异常
*/
public static void main(String[] args) throws Exception {
// 创建流执行环境,这是Flink程序的入口点
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1,简化示例的并行处理逻辑
env.setParallelism(1);
// 添加自定义数据源,用于产生事件数据
env.addSource(new CustomSource())
// 使用process函数处理每个事件,此处使用匿名类实现ProcessFunction
.process(new ProcessFunction<Event, Tuple2<String,Integer>>() {
@Override
public void processElement(Event event,
ProcessFunction<Event, Tuple2<String, Integer>>.Context ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
// 处理单个Event事件,此处简单地将事件名和数字1组成Tuple2输出
out.collect(new Tuple2<>(event.getName(), 1));
}
})
// 按事件名分组
.keyBy(t->t.f0)
// 应用自定义的MyKeyedProcessFunction处理分组后的数据
.process(new CustomKeyedProcessFunction())
// 将处理结果打印到控制台
.print();
// 执行流处理作业
env.execute();
}
}```