flink之转换算子

2024-08-12 13:10

转换算子

基本转换算子

Map 算子

Map 算子是一种一对一的转换操作,对输入数据流中的每个元素应用一个用户自定义的函数,并将这个函数的输出作为新数据流的元素。这意味着,对于输入流中的每一个元素,map 算子都会产生一个且仅一个新的元素。它常用于简单的数据转换任务,比如数据类型转换、数值运算或属性映射等。

package cn.guangjun.flink.stream;

import cn.guangjun.flink.pojo.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * MapDemo类用于演示DataStream的映射操作
 */
public class MapDemo {
    /**
     * 程序的入口点
     * @param args 命令行参数
     * @throws Exception 如果发生错误,抛出异常
     */
    public static void main(String[] args) throws Exception{
        // 获取流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1,以便于调试和演示
        env.setParallelism(1);

        // 从元素创建DataStream,包含两个Event对象
        DataStreamSource<Event> stream = env.fromElements(
                new Event("张三", "高一", 98),
                new Event("李四", "高二", 77)
        );
        // 打印原始DataStream中的元素
        stream.print();
        // 使用map操作提取Event中的姓名,并打印
        stream.map(event -> event.getName()).print();

        // 执行流处理环境的作业
        env.execute();
    }
}

Filter 算子

Filter 算子用于从输入数据流中过滤出满足特定条件的元素,而那些不满足条件的元素则会被丢弃。用户需要提供一个实现 FilterFunction 接口的函数,该函数会为每个元素返回一个布尔值,如果为 true,则元素被保留;如果为 false,则元素被过滤掉。

/**
 * FilterDemo 类用于演示过滤操作
 */
public class FilterDemo {
    /**
     * 程序的入口点
     * @param args 命令行参数
     * @throws Exception 如果发生异常则抛出
     */
    public static void main(String[] args) throws Exception{
        // 获取流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1,以便于调试和演示
        env.setParallelism(1);

        // 从元素创建数据流,包含两个Event对象
        DataStreamSource<Event> stream = env.fromElements(
                new Event("张三", "高一", 98),
                new Event("李四", "高二", 77)
        );
        // 打印原始数据流中的所有元素
        stream.print();
        // 过滤出名称为"张三"的事件,然后提取出姓名,并打印
        stream.filter(event -> event.getName().equals("张三")).map(event -> event.getName()).print();
        // 执行流处理作业
        env.execute();
    }
}

FlatMap 算子

FlatMap 算子比 Map 算子更灵活,它可以将输入流中的单个元素转换为零个、一个或多个输出元素。这对于模式匹配、数据拆分或复杂的转换非常有用。FlatMap 需要一个实现了 FlatMapFunction 接口的函数,该函数可以生成任意数量的输出记录。

package cn.guangjun.flink.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为2
        env.setParallelism(2);
        // 从元素创建数据流
        DataStreamSource<String> stream = env.fromElements(
                "Furthermore, life is a continuous learning process",
                "Knowledge and education are not confined",
                "This quest for understanding not only enriches our minds but also fosters a sense of wonder and appreciation for the world",
                "Lastly, the essence of life lies in the present moment"
        );
        // 对每个字符串应用flatMap操作,将每个字符的hashCode作为新的元素输出
        // 然后过滤出所有偶数的hashCode值,并打印
        stream.flatMap(new FlatMapFunction<String, Integer>() {
            @Override
            public void flatMap(String text, Collector<Integer> collector) throws Exception {
                for (int i = 0; i < text.length(); i++) {
                    collector.collect(text.substring(i, i + 1).hashCode());
                }
            }
        }).filter(hc -> hc % 2 == 0).print();
        // 执行流处理作业
        env.execute();
    }
}
相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号