Flink定时器Timer

2024-08-16 08:20

Flink Timer定时器

Timer是Apache Flink ProcessFunction的核心功能之一,它在预设的事件时间或处理时间到达时被自动触发,允许用户自定义处理逻辑,如执行定时窗口聚合、管理状态超时或处理延迟数据。该功能应用前提是数据流经keyBy分组,确保每个键相关的处理和定时器管理在单一任务中集中执行,以此维护状态一致性并优化资源利用。

示例

/**
 * 自定义的KeyedProcessFunction,用于处理字符串键值对数据,并根据键进行分组处理
 * 该类扩展了KeyedProcessFunction,定义了如何处理元素以及如何响应定时器触发
 */
public class CustomKeyedProcessFunction  extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
    // 用于累积相同键的元素数量的HashMap
    private HashMap<String,Integer> accumulator;

    /**
     * 构造函数,初始化accumulator
     */
    public CustomKeyedProcessFunction(){
        accumulator = new LinkedHashMap<>();
    }

    /**
     * 处理元素的方法
     * 当数据到达时,此方法被调用,用于处理当前的元素,并决定是否输出结果或设置定时器
     *
     * @param value 当前处理的元素,包含字符串和整型的键值对
     * @param ctx 上下文对象,用于访问定时器服务和其他功能
     * @param out 结果收集器,用于输出处理结果
     * @throws Exception 处理过程中可能抛出的异常
     */
    @Override
    public void processElement(Tuple2<String, Integer> value,
                               KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx,
                               Collector<Tuple2<String, Integer>> out) throws Exception {
        // 获取当前处理时间
        Long currTs = ctx.timerService().currentProcessingTime();
        System.out.println("数据到达,到达时间:" + currTs);
        // 注册一个基于处理时间的定时器,延迟10秒
        ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
        // 检查当前元素的键是否已存在于accumulator中
        if(!accumulator.containsKey(value.f0)){
            // 如果不存在,则添加到accumulator,并输出新的计数值(1)
            accumulator.put(value.f0, 1);
            out.collect(new Tuple2<>(value.f0,1));
        } else {
            // 如果用户已存在,则增加计数,并输出新的计数值
            Integer sum = accumulator.get(value.f0) + 1;
            accumulator.put(value.f0, sum);
            out.collect(new Tuple2<>(value.f0,sum));
        }
    }

    /**
     * 定时器触发时调用的方法
     * 用于处理定时任务
     *
     * @param timestamp 定时器触发的时间戳
     * @param ctx 定时器上下文对象,用于访问定时器服务和其他功能
     * @param out 结果收集器,用于输出处理结果
     * @throws Exception 定时器触发过程中可能抛出的异常
     */
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.OnTimerContext ctx,
                        Collector<Tuple2<String, Integer>> out) throws Exception {
        // 当定时器触发时,打印触发时间
        System.out.println("定时器触发,触发时间:" + timestamp);
    }
}

public class FlinkTimerDemo {
    /**
     * 主函数,演示如何使用Flink的定时器功能处理事件
     * @throws Exception 如果处理过程中发生错误,则抛出异常
     */
    public static void main(String[] args) throws Exception {
        // 创建流执行环境,这是Flink程序的入口点
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1,简化示例的并行处理逻辑
        env.setParallelism(1);
        // 添加自定义数据源,用于产生事件数据
        env.addSource(new CustomSource())
                // 使用process函数处理每个事件,此处使用匿名类实现ProcessFunction
                .process(new ProcessFunction<Event, Tuple2<String,Integer>>() {
                    @Override
                    public void processElement(Event event,
                                               ProcessFunction<Event, Tuple2<String, Integer>>.Context ctx,
                                               Collector<Tuple2<String, Integer>> out) throws Exception {
                        // 处理单个Event事件,此处简单地将事件名和数字1组成Tuple2输出
                        out.collect(new Tuple2<>(event.getName(), 1));
                    }
                })
                // 按事件名分组
                .keyBy(t->t.f0)
                // 应用自定义的MyKeyedProcessFunction处理分组后的数据
                .process(new CustomKeyedProcessFunction())
                // 将处理结果打印到控制台
                .print();
        // 执行流处理作业
        env.execute();
    }
}```
相关新闻
热点
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号