apache flink规约聚合班级成绩的示例
在Flink中,"规约聚合"(Aggregation with Reduce Function)是一种数据处理操作,主要用于简化或汇总数据流中的元素。规约聚合的核心概念来源于函数式编程中的“reduce”或“fold”操作。
规约聚合的基本概念
规约聚合通过应用一个用户定义的函数(被称为Reduce Function)来合并数据流中的元素。这个函数接收两个输入参数(通常是同一类型的数据),并输出一个结果,该结果与这两个输入参数具有某种聚合关系。Flink使用这个函数反复作用于数据流中的元素对,从而逐步将整个数据集规约为单个结果值或者更小的集合。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.guangjun.flink</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.19.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
package cn.guangjun.flink.stream;
import cn.guangjun.flink.pojo.Event;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
// 使用Slf4j进行日志记录
@Slf4j
// 实现自定义的并行数据源函数,用于生成事件数据
public class CustomSource implements ParallelSourceFunction<Event> {
// 控制数据源函数是否继续运行的标志
private boolean running = true;
// 用于生成随机数的对象
private Random random = new Random();
/**
* 实现ParallelSourceFunction接口的run方法
* 在一个循环中生成事件数据,直到running标志为false
* @param sourceContext 用于发送数据到流处理引擎的上下文
* @throws Exception 当生成事件数据过程中发生错误时抛出异常
*/
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
// 当running标志为true时,继续生成事件数据
while (running) {
// 生成一个随机数,用于决定事件的类型
int rad = random.nextInt(10) % 2;
Event event = null;
// 根据随机数决定事件的类型和具体内容
if (rad == 0) {
event = new Event("user:" + random.nextInt(50), "高一", random.nextInt(100));
} else if (rad == 1) {
event = new Event("user:" + random.nextInt(50), "高二", random.nextInt(100));
} else {
event = new Event("user:" + random.nextInt(50), "高三", random.nextInt(100));
}
// 记录生成的事件数据
log.info("event输入{}", event);
// 将事件数据发送到流处理引擎
sourceContext.collect(event);
// 暂停1秒,控制数据生成频率
Thread.sleep(1000);
}
}
/**
* 实现ParallelSourceFunction接口的cancel方法
* 取消数据源函数的运行
*/
@Override
public void cancel() {
// 将running标志设置为false,停止生成事件数据
running = false;
}
}
package cn.guangjun.flink.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Watcher;
@AllArgsConstructor
@Data
public class Event {
// 定义学生名称属性
private String name;
// 定义学生年级属性
private String grade;
// 定义学生成绩属性
private int score;
/**
* 重写toString方法,用于返回学生对象的字符串表示
* @return 返回包含学生名称、年级和成绩的字符串
*/
public String toString(){
// 拼接学生信息,格式为:名称:年级:成绩
return name+":"+grade+":"+score;
}
}
``package cn.guangjun.flink.stream;
import cn.guangjun.flink.pojo.Event;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Random;
@Slf4j
public class ReduceDemo {
/**
* 主函数,用于演示如何处理学生信息流数据
* 该函数将执行环境设置为非并行模式,从自定义源获取数据,并对学生信息进行转换和聚合操作
* @param args 命令行参数
* @throws Exception 如果处理过程中发生错误,则抛出异常
*/
public static void main(String[] args) throws Exception {
// 创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1,简化调试和结果的一致性
env.setParallelism(1);
// 从自定义源添加数据流,并将Event数据转换为Tuple3类型
SingleOutputStreamOperator<Tuple3<String, String, Integer>> op1 = env.addSource(new CustomSource())
.map(new MapFunction<Event, Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> map(Event value) throws Exception {
// 将Event对象转换为包含姓名、年级和分数的Tuple3对象
return new Tuple3<>(value.getName(), value.getGrade(), value.getScore());
}
});
// 打印所有输入的Tuple3数据
op1.print("当前输入");
// 按年级聚合数据,计算每个年级的总分
SingleOutputStreamOperator<Tuple3<String, String, Integer>> op2 = op1.keyBy(r -> r.f1)
.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {
// 使用年级作为键,聚合计算总分
return Tuple3.of(value1.f1,value1.f1, value1.f2 + value2.f2);
}
});
// 打印每个年级的总分
op2.print("当前年级总分");
// 按年级聚合数据,找出每个年级的最大分数
SingleOutputStreamOperator<Tuple3<String, String, Integer>> op3 = op1.keyBy(r ->{
log.info("r:{}",r.f1);
return r.f1;
})
.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {
// 比较并记录每个年级的最大分数
log.info("[{}]> 当前年级最大分数:{},value2.f2:{}", value1.f1, value1.f2, value2.f2);
return Tuple3.of(value1.f1, value1.f1, value2.f2 > value1.f2 ? value2.f2 : value1.f2);
}
});
// 打印每个年级的最大分数
op3.print("当前年级最大分数");
// 执行流处理任务
env.execute();
}
}