Flink侧流处理延迟数据示例

2024-08-16 08:12

Flink侧流处理延迟数据示例

import cn.guangjun.flink.pojo.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
/**
 * DataDelayedDemo 类用于演示流处理中的数据延迟情况
 */
public class DataDelayedDemo {
    /**
     * 主函数,用于设置流处理环境和处理逻辑
     * @param args 命令行参数
     * @throws Exception 如果处理过程中发生错误,则抛出异常
     */
    public static void main(String[] args) throws Exception {
        // 获取流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);

        // 从socket接收数据,并转换为Event对象
        SingleOutputStreamOperator<Event> map = env.socketTextStream("127.0.0.1", 8888).map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String s) throws Exception {
                        // 解析接收到的字符串,转换为Event对象
                        String[] split = s.split(",");
                        return new Event(split[0], split[1], Integer.parseInt(split[2]),Integer.parseInt(split[3]) );
                    }
                })
                // 设置水位线,最大延迟时间为2秒
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                // 使用Event对象中的时间戳
                                return element.getTimeStamp();
                            }
                        }));

        // 设置侧输出流,用于处理延迟数据
        OutputTag<Event> outputTag = new OutputTag<Event>("delayed") {
        };
        // 按照键进行分组,并设置滚动窗口
        SingleOutputStreamOperator<String> aggregate = map.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 允许的最大延迟时间为1分钟
                .allowedLateness(Time.minutes(1))
                // 将延迟数据输出到侧输出流
                .sideOutputLateData(outputTag)
                // 自定义聚合函数和处理窗口函数
                .aggregate(new CustomAggregate(), new ProcessWindowFunction<String, String, Boolean, TimeWindow>() {
                    @Override
                    public void process(Boolean aBoolean, ProcessWindowFunction<String, String, Boolean, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        // 获取当前窗口的起始和结束时间
                        Long start = context.window().getStart();
                        Long end = context.window().getEnd();
                        // 输出聚合结果
                        out.collect(elements.iterator().next());
                        // 打印当前窗口信息
                        System.out.println("【当前窗口】[" + start + ":" + end + ")");
                    }
                });
        // 打印主输出流的结果
        aggregate.print("result");
        // 打印侧输出流的结果
        aggregate.getSideOutput(outputTag).print("side_out_put");
        // 执行流处理任务
        env.execute();
    }
}
相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号