apache flink规约聚合班级成绩的示例

2024-08-12 12:49

apache flink规约聚合班级成绩的示例

在Flink中,"规约聚合"(Aggregation with Reduce Function)是一种数据处理操作,主要用于简化或汇总数据流中的元素。规约聚合的核心概念来源于函数式编程中的“reduce”或“fold”操作。

规约聚合的基本概念

规约聚合通过应用一个用户定义的函数(被称为Reduce Function)来合并数据流中的元素。这个函数接收两个输入参数(通常是同一类型的数据),并输出一个结果,该结果与这两个输入参数具有某种聚合关系。Flink使用这个函数反复作用于数据流中的元素对,从而逐步将整个数据集规约为单个结果值或者更小的集合。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.guangjun.flink</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.19.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
           <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>
package cn.guangjun.flink.stream;

import cn.guangjun.flink.pojo.Event;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;

// 使用Slf4j进行日志记录
@Slf4j
// 实现自定义的并行数据源函数,用于生成事件数据
public class CustomSource implements ParallelSourceFunction<Event> {
    // 控制数据源函数是否继续运行的标志
    private boolean running = true;
    // 用于生成随机数的对象
    private Random random = new Random();

    /**
     * 实现ParallelSourceFunction接口的run方法
     * 在一个循环中生成事件数据,直到running标志为false
     * @param sourceContext 用于发送数据到流处理引擎的上下文
     * @throws Exception 当生成事件数据过程中发生错误时抛出异常
     */
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // 当running标志为true时,继续生成事件数据
        while (running) {
            // 生成一个随机数,用于决定事件的类型
            int rad = random.nextInt(10) % 2;
            Event event = null;
            // 根据随机数决定事件的类型和具体内容
            if (rad == 0) {
                event = new Event("user:" + random.nextInt(50), "高一", random.nextInt(100));
            } else if (rad == 1) {
                event = new Event("user:" + random.nextInt(50), "高二", random.nextInt(100));
            } else {
                event = new Event("user:" + random.nextInt(50), "高三", random.nextInt(100));
            }
            // 记录生成的事件数据
            log.info("event输入{}", event);
            // 将事件数据发送到流处理引擎
            sourceContext.collect(event);
            // 暂停1秒,控制数据生成频率
            Thread.sleep(1000);

        }
    }

    /**
     * 实现ParallelSourceFunction接口的cancel方法
     * 取消数据源函数的运行
     */
    @Override
    public void cancel() {
        // 将running标志设置为false,停止生成事件数据
        running = false;
    }
}
package cn.guangjun.flink.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Watcher;

@AllArgsConstructor
@Data
public class Event  {
    // 定义学生名称属性
    private String name;
    // 定义学生年级属性
    private String grade;
    // 定义学生成绩属性
    private int score;

    /**
     * 重写toString方法,用于返回学生对象的字符串表示
     * @return 返回包含学生名称、年级和成绩的字符串
     */
    public String toString(){
        // 拼接学生信息,格式为:名称:年级:成绩
        return name+":"+grade+":"+score;
    }
}

``package cn.guangjun.flink.stream;


import cn.guangjun.flink.pojo.Event;

import lombok.extern.slf4j.Slf4j;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.functions.ReduceFunction;


import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


import java.util.Random;

@Slf4j

public class ReduceDemo {

/**

* 主函数,用于演示如何处理学生信息流数据

* 该函数将执行环境设置为非并行模式,从自定义源获取数据,并对学生信息进行转换和聚合操作

* @param args 命令行参数

* @throws Exception 如果处理过程中发生错误,则抛出异常

*/

public static void main(String[] args) throws Exception {

// 创建流处理执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置并行度为1,简化调试和结果的一致性

env.setParallelism(1);

// 从自定义源添加数据流,并将Event数据转换为Tuple3类型

SingleOutputStreamOperator<Tuple3<String, String, Integer>> op1 = env.addSource(new CustomSource())

.map(new MapFunction<Event, Tuple3<String, String, Integer>>() {

@Override

public Tuple3<String, String, Integer> map(Event value) throws Exception {

// 将Event对象转换为包含姓名、年级和分数的Tuple3对象

return new Tuple3<>(value.getName(), value.getGrade(), value.getScore());

}

});

// 打印所有输入的Tuple3数据

op1.print("当前输入");


// 按年级聚合数据,计算每个年级的总分

SingleOutputStreamOperator<Tuple3<String, String, Integer>> op2 = op1.keyBy(r -> r.f1)

.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {

@Override

public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {

// 使用年级作为键,聚合计算总分

return Tuple3.of(value1.f1,value1.f1, value1.f2 + value2.f2);

}

});

// 打印每个年级的总分

op2.print("当前年级总分");

    // 按年级聚合数据,找出每个年级的最大分数
    SingleOutputStreamOperator<Tuple3<String, String, Integer>> op3 = op1.keyBy(r ->{
                log.info("r:{}",r.f1);
                return r.f1;
            })
            .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {
                    // 比较并记录每个年级的最大分数
                    log.info("[{}]> 当前年级最大分数:{},value2.f2:{}", value1.f1, value1.f2, value2.f2);
                    return Tuple3.of(value1.f1, value1.f1, value2.f2 > value1.f2 ? value2.f2 : value1.f2);
                }
            });
    // 打印每个年级的最大分数
    op3.print("当前年级最大分数");

    // 执行流处理任务
    env.execute();
}

}


相关新闻
热点
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号