flink函数ProcessFunction

2024-08-14 09:58

flink函数ProcessFunction

示例:

package cn.guangjun.flink.demo;

import cn.guangjun.flink.pojo.Event;
import cn.guangjun.flink.stream.CustomSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.LinkedHashMap;

/**
 * 通用处理函数ProcessFunction的示例
 */
public class ProcessFunctionDemo {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);

        // 添加自定义数据源
        env.addSource(new CustomSource())
                // 使用ProcessFunction对每个事件进行处理
                .process(new ProcessFunction<Event, Tuple2<String,Integer>>() {
                    /**
                     * 处理单个Event事件
                     * @param event 当前处理的事件
                     * @param ctx 上下文,提供计时器、输出等操作
                     * @param out 收集器,用于输出结果
                     * @throws Exception 抛出异常
                     */
                    @Override
                    public void processElement(Event event
                            , ProcessFunction<Event, Tuple2<String, Integer>>.Context ctx
                            , Collector<Tuple2<String, Integer>> out) throws Exception {
                        // 对每个事件,输出一个Tuple,包含用户ID和固定值1
                        out.collect(new Tuple2<>(event.getName(),1));
                    }
                })
                // 按用户ID分组
                .keyBy(t->t.f0)
                // 使用自定义的KeyedProcessFunction进行进一步处理
                .process(new CustomKeyedProcessFunction())
                // 打印结果
                .print();
        // 执行环境执行
        env.execute();
    }

    /**
     * 自定义的KeyedProcessFunction类,用于按用户分组计数
     */
    public static class  CustomKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        // 使用HashMap存储每个用户的累计计数值
        private HashMap<String,Integer> accumulator;

        /**
         * 构造函数,初始化accumulator
         */
        private CustomKeyedProcessFunction(){
            accumulator = new LinkedHashMap<>();
        }

        /**
         * 处理元素,对用户进行计数
         * @param value 当前处理的元素
         * @param ctx 上下文对象
         * @param out 结果收集器
         * @throws Exception 抛出异常
         */
        @Override
        public void processElement(Tuple2<String, Integer> value
                , KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx
                , Collector<Tuple2<String, Integer>> out) throws Exception {
            // 打印当前Key和处理时间
            System.out.println("当前Key是:" + ctx.getCurrentKey() + ",处理时间:" + ctx.timerService().currentProcessingTime());
            // 如果用户不存在,则初始化计数,并输出
            if(!accumulator.containsKey(value.f0)){
                accumulator.put(value.f0, 1);
                out.collect(new Tuple2<>(value.f0,1));
            } else { // 如果用户已存在,则增加计数,并输出新的计数值
                Integer sum = accumulator.get(value.f0) + 1;
                accumulator.put(value.f0, sum);
                out.collect(new Tuple2<>(value.f0,sum));
            }
        }
    }
}
相关新闻
热点
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号