Apache Flink Aggregation Operators

2024-08-12 12:57

keyBy

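keyBy is one of the most important operations in Flink. It partitions data by key: whether you are processing streaming or batch data, keyBy groups records by one field or a combination of fields, so that records sharing a key can be aggregated together or processed in the same window.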

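Records are routed to partitions according to their key. All records with the same key are sent to the same partition, so the next operator processes them in the same parallel subtask, and therefore in the same slot.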

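Internally, the target partition is derived from the hash code of the key: conceptually, the hash value is taken modulo the number of partitions. For this reason, a POJO used as a key must override hashCode(); the default Object.hashCode() differs between instances with identical content, so equal keys would not reliably land in the same partition.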
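The hash-then-modulo idea can be sketched in plain Java. This is a simplified model for illustration, not Flink's actual assignment code; `UserKey`, `partitionFor`, and the partition count of 4 are hypothetical names and values introduced here.

```java
import java.util.Objects;

public class KeyPartitionSketch {

    /** Hypothetical POJO key that overrides both equals() and hashCode(). */
    static final class UserKey {
        final String region;
        final int userId;

        UserKey(String region, int userId) {
            this.region = region;
            this.userId = userId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof UserKey)) return false;
            UserKey other = (UserKey) o;
            return userId == other.userId && region.equals(other.region);
        }

        @Override
        public int hashCode() {
            // Content-based hash: equal keys always produce the same value.
            return Objects.hash(region, userId);
        }
    }

    /** Simplified model (an assumption): hash code modulo the partition count. */
    static int partitionFor(Object key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        UserKey a = new UserKey("eu", 42);
        UserKey b = new UserKey("eu", 42); // different object, same content

        // Both keys map to the same partition because hashCode() is content-based.
        System.out.println(partitionFor(a, numPartitions) == partitionFor(b, numPartitions));
    }
}
```

If `UserKey` did not override hashCode(), `a` and `b` would typically get different identity hashes and could be routed to different partitions even though they represent the same key.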

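Note that the result of keyBy is no longer a DataStream: keyBy converts the DataStream into a KeyedStream. A KeyedStream can be thought of as a "partitioned stream" or "keyed stream". It is a logical partitioning of the DataStream by key, so it takes two type parameters: the element type of the stream and the type of the key.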

Once we have a KeyedStream, we can run aggregations on it. Flink ships with a few basic, simple aggregation APIs, mainly the following:

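  • sum(): computes a running sum of the specified field over the input stream.

  • min(): computes the minimum of the specified field over the input stream.

  • max(): computes the maximum of the specified field over the input stream.

  • minBy(): like min(), finds the minimum of the specified field over the input stream. The difference is that min() tracks only the minimum of the specified field, while the other fields keep the values from the first record received; minBy() returns the entire record that contains the minimum value.

  • maxBy(): like max(), finds the maximum of the specified field over the input stream. The difference between the two is exactly the same as between min() and minBy().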
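The difference between min() and minBy() is easiest to see on a concrete sequence. The following plain-Java sketch models the two behaviors for a single key as described in the list above; it does not call Flink, and `Event`, `rollingMin`, and `rollingMinBy` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MinVsMinBySketch {

    /** Hypothetical record type: (word, count, score). */
    static final class Event {
        final String word;
        final int count;
        final int score;

        Event(String word, int count, int score) {
            this.word = word;
            this.count = count;
            this.score = score;
        }

        @Override
        public String toString() {
            return "(" + word + "," + count + "," + score + ")";
        }
    }

    /** Models min() on score: only score is updated; other fields stay from the first record. */
    static List<Event> rollingMin(List<Event> input) {
        List<Event> out = new ArrayList<>();
        Event acc = null;
        for (Event e : input) {
            acc = (acc == null) ? e : new Event(acc.word, acc.count, Math.min(acc.score, e.score));
            out.add(acc); // one result is emitted per input record
        }
        return out;
    }

    /** Models minBy() on score: the whole record holding the minimum is kept. */
    static List<Event> rollingMinBy(List<Event> input) {
        List<Event> out = new ArrayList<>();
        Event acc = null;
        for (Event e : input) {
            if (acc == null || e.score < acc.score) {
                acc = e;
            }
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // All records share the key "a", as they would after keyBy.
        List<Event> input = Arrays.asList(
                new Event("a", 1, 7),
                new Event("a", 2, 3),
                new Event("a", 3, 5));

        System.out.println(rollingMin(input));   // prints [(a,1,7), (a,1,3), (a,1,3)]
        System.out.println(rollingMinBy(input)); // prints [(a,1,7), (a,2,3), (a,2,3)]
    }
}
```

In the min() model the count field stays at 1, the value from the first record, while the minBy() model carries over count 2 from the record that actually holds the minimum score.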

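The simple aggregation operators are convenient to use and have clear semantics. They do take an argument, but unlike the basic transformation operators they do not require you to implement a custom function: you only state which field to aggregate. There are two ways to specify the field: by position or by name.

For tuple-typed data, either way can be used. Note that the fields of a tuple are named f0, f1, f2, and so on.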

#### Example

package cn.guangjun.flink.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

import java.util.Random;

public class WordKeyByDemo {
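    /**
     * Entry point: sets up the streaming environment and its parallelism,
     * then runs keyed aggregations over the input stream.
     * @param args command-line arguments
     * @throws Exception if an error occurs while the job is running
     */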
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 2, i.e. the number of parallel subtasks per operator
        env.setParallelism(2);

        // Create a data stream from a single element: a multi-paragraph text about life
        DataStreamSource<String> stream = env.fromElements(
                "Life is a journey filled with various experiences, emotions, and lessons. It is a delicate balance of ups and downs, joys and sorrows, and successes and failures. The essence of life lies in how we perceive and respond to the myriad of situations that come our way.\n" +
                        "One of the most important aspects of life is relationships. Human beings are social creatures, and our connections with family, friends, and colleagues play a significant role in our happiness and well-being. These relationships teach us how to love, care, and support one another. They also challenge us to become better individuals, as we learn to compromise, forgive, and grow together.\n" +
                        "Another crucial element of life is personal growth. Every experience, whether positive or negative, contributes to our development. Life’s challenges force us to step out of our comfort zones, adapt, and overcome obstacles. It is through these trials that we gain wisdom, resilience, and a deeper understanding of ourselves and the world around us.\n" +
                        "Nature also plays a vital role in the essence of life. The beauty and tranquility of the natural world can inspire us, ground us, and remind us of the wonders of existence. Whether it’s the gentle rustling of leaves, the soothing sound of a river, or the breathtaking view of a mountain range, nature has a unique way of nurturing our souls and reminding us of life’s impermanence.\n" +
                        "Furthermore, life is a continuous learning process. Knowledge and education are not confined to classrooms; they are present in every aspect of our lives. Curiosity and the pursuit of knowledge drive us to explore, question, and discover new things. This quest for understanding not only enriches our minds but also fosters a sense of wonder and appreciation for the world.\n" +
                        "Lastly, the essence of life lies in the present moment. Too often, we get caught up in the past or worry about the future, forgetting to cherish the here and now. Mindfulness and gratitude can help us appreciate the simple pleasures and blessings in our lives. By living in the present, we can fully experience the joy, love, and beauty that surround us.\n" +
                        "In conclusion, the essence of life is multifaceted, encompassing relationships, personal growth, nature, learning, and the present moment. It is a delicate tapestry woven from the threads of our experiences, emotions, and choices. By embracing these elements, we can lead a fulfilling and meaningful life, one that is rich in love, wisdom, and joy."
        );

        // Convert the input stream into a stream keyed by word
        KeyedStream<Tuple3<String, Integer,Integer>, String> keyedStream = stream.flatMap(new FlatMapFunction<String, Tuple3<String, Integer,Integer>>() {
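            /**
             * Splits the input text into words and emits one tuple per word.
             * Each tuple holds the word, a counter initialized to 1,
             * and a random number (used here as the field for maxBy).
             */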
            @Override
            public void flatMap(String text, Collector<Tuple3<String, Integer,Integer>> collector) throws Exception {
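                // Split on any whitespace; the text also contains "\n" between paragraphs
                String[] words = text.split("\\s+");
                // Emit one tuple per word
                for(String word:words){
                    // The tuple holds the word, the counter 1, and a random number
                    collector.collect(Tuple3.of(word, 1,new Random().nextInt(10000)));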
                }
            }
        }).keyBy(tuple -> tuple.f0); // Group by the first tuple field (the word)

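        // Sum the counter field per key and print the results
        keyedStream.sum("f1").print("sum");
        // Keep the record with the largest random number per key and print the results
        keyedStream.maxBy("f2").print("maxBy");
        // Run the streaming job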
        env.execute();
    }
}
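
To get a feel for what the sum branch of the example emits without starting a Flink job, the per-key running total can be modeled in plain Java. This is a single-threaded sketch under the assumption that each input record produces one updated result for its key; `rollingWordCount` is a hypothetical helper, and the real job interleaves output from parallel subtasks in a different order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingSumSketch {

    /** Models keyBy(word) followed by a running sum of 1 per word. */
    static List<String> rollingWordCount(String text) {
        Map<String, Integer> totals = new HashMap<>(); // per-key running total
        List<String> emitted = new ArrayList<>();
        for (String word : text.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // blank input yields a single empty token
            }
            // merge() adds 1 to the current total for this key and returns the new total
            int total = totals.merge(word, 1, Integer::sum);
            emitted.add(word + "=" + total); // one output per input record
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingWordCount("to be or not to be"));
        // prints [to=1, be=1, or=1, not=1, to=2, be=2]
    }
}
```

The point of the model is that the output is a sequence of intermediate totals rather than one final count per word.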

