转换算子
基本转换算子
Map 算子
Map 算子是一种一对一的转换操作,对输入数据流中的每个元素应用一个用户自定义的函数,并将这个函数的输出作为新数据流的元素。这意味着,对于输入流中的每一个元素,map 算子都会产生一个且仅一个新的元素。它常用于简单的数据转换任务,比如数据类型转换、数值运算或属性映射等。
package cn.guangjun.flink.stream;
import cn.guangjun.flink.pojo.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* MapDemo类用于演示DataStream的映射操作
*/
public class MapDemo {
/**
* 程序的入口点
* @param args 命令行参数
* @throws Exception 如果发生错误,抛出异常
*/
public static void main(String[] args) throws Exception{
// 获取流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1,以便于调试和演示
env.setParallelism(1);
// 从元素创建DataStream,包含两个Event对象
DataStreamSource<Event> stream = env.fromElements(
new Event("张三", "高一", 98),
new Event("李四", "高二", 77)
);
// 打印原始DataStream中的元素
stream.print();
// 使用map操作提取Event中的姓名,并打印
stream.map(event -> event.getName()).print();
// 执行流处理环境的作业
env.execute();
}
}
Filter 算子
Filter 算子用于从输入数据流中过滤出满足特定条件的元素,而那些不满足条件的元素则会被丢弃。用户需要提供一个实现 FilterFunction
接口的函数,该函数会为每个元素返回一个布尔值,如果为 true
,则元素被保留;如果为 false
,则元素被过滤掉。
/**
* FilterDemo 类用于演示过滤操作
*/
public class FilterDemo {
/**
* 程序的入口点
* @param args 命令行参数
* @throws Exception 如果发生异常则抛出
*/
public static void main(String[] args) throws Exception{
// 获取流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1,以便于调试和演示
env.setParallelism(1);
// 从元素创建数据流,包含两个Event对象
DataStreamSource<Event> stream = env.fromElements(
new Event("张三", "高一", 98),
new Event("李四", "高二", 77)
);
// 打印原始数据流中的所有元素
stream.print();
// 过滤出名称为"张三"的事件,然后提取出姓名,并打印
stream.filter(event -> event.getName().equals("张三")).map(event -> event.getName()).print();
// 执行流处理作业
env.execute();
}
}
FlatMap 算子
FlatMap 算子比 Map 算子更灵活,它可以将输入流中的单个元素转换为零个、一个或多个输出元素。这对于模式匹配、数据拆分或复杂的转换非常有用。FlatMap 需要一个实现了 FlatMapFunction
接口的函数,该函数可以生成任意数量的输出记录。
package cn.guangjun.flink.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
// 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为2
env.setParallelism(2);
// 从元素创建数据流
DataStreamSource<String> stream = env.fromElements(
"Furthermore, life is a continuous learning process",
"Knowledge and education are not confined",
"This quest for understanding not only enriches our minds but also fosters a sense of wonder and appreciation for the world",
"Lastly, the essence of life lies in the present moment"
);
// 对每个字符串应用flatMap操作,将每个字符的hashCode作为新的元素输出
// 然后过滤出所有偶数的hashCode值,并打印
stream.flatMap(new FlatMapFunction<String, Integer>() {
@Override
public void flatMap(String text, Collector<Integer> collector) throws Exception {
for (int i = 0; i < text.length(); i++) {
collector.collect(text.substring(i, i + 1).hashCode());
}
}
}).filter(hc -> hc % 2 == 0).print();
// 执行流处理作业
env.execute();
}
}