Flink Table 与 SQL API

2024-08-21 22:10

Flink Table 与 SQL API

在Flink中,Table API是其核心API之一,它提供了一种以表格的形式处理数据的声明式编程接口,使得用户能够以SQL-like的方式编写数据处理作业,而无需直接操作低级别的数据流。Table API支持两种主要的编程风格:基于表的API(Table API)和基于SQL的API(SQL API)。这两种方式可以无缝集成,共同构成了Flink的Table & SQL API。

Flink基于Apache Calcite引擎实现对SQL的支持,允许直接在程序中编写SQL进行表转换。

示例

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.demo.flink</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.19.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- Flink 核心Java库依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink 流处理Java API依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink 客户端工具依赖,包含命令行界面和实用函数 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink 连接器基础模块,包含连接器的公共功能 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Kafka连接器,用于与Apache Kafka集成 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.2.0-1.19</version>
        </dependency>

        <!-- Lombok库,简化Java代码,自动插入getter/setter等 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.32</version>
        </dependency>

        <!-- Flink Table Planner模块,用于Table API和SQL的执行计划生成 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API的Java桥接器,连接Table API与DataStream API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink 文件系统连接器,支持与各种文件系统交互 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink CSV格式支持,用于读写CSV数据 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
         <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>
</project>

java代码:


import cn.guangjun.flink.pojo.Order;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Random;
/**
 * FlinkTableDemo 类用于演示Flink Table API的基本使用
 * Flink Table API是一种用于流处理和批处理的高级API,它允许用户以声明性的方式表达数据转换和操作
 * 通过这个演示,我们可以了解到Flink Table的基本语法和常用操作
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        // 创建并获取Flink流处理执行环境,设置并行度为1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Random random = new Random();

        // 1. 从Elements生成订单数据流,模拟订单事件的产生
        SingleOutputStreamOperator<Order> eventStream = env.fromElements(
                new Order("001", "Prod00A",10,random.nextInt(10000)),
                new Order("002", "Prod00B",20,random.nextInt(10000)),
                new Order("003", "Prod00C",30,random.nextInt(10000)),
                new Order("004", "Prod00B",40,random.nextInt(10000))
        );

        // 2. 创建TableEnvironment,用于处理Table API相关的操作
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 3. 将数据流转换成表结构以便进行SQL查询或Table API操作
        Table eventTable = tableEnv.fromDataStream(eventStream);


        // 4. 在TableEnvironment中注册一个临时视图
        // 为了能够在SQL查询中方便地引用这个表,我们需要将其注册为一个临时视图。
        tableEnv.createTemporaryView("event" , eventTable);

        // 5. 使用SQL查询产品的订单
        Table resultTable1 = tableEnv.sqlQuery("SELECT * FROM event ");

        // 6. 将处理后的结果表再次转换回数据流,并打印输出
        tableEnv.toDataStream(resultTable1).print("result");
        // 启动执行Flink程序
        env.execute();
    }
}
相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号