Flink Table 与 SQL API
在Flink中,Table API是其核心API之一,它提供了一种以表格的形式处理数据的声明式编程接口,使得用户能够以SQL-like的方式编写数据处理作业,而无需直接操作低级别的数据流。Table API支持两种主要的编程风格:基于表的API(Table API)和基于SQL的API(SQL API)。这两种方式可以无缝集成,共同构成了Flink的Table & SQL API。
Flink基于Apache Calcite引擎实现对SQL的支持,允许直接在程序中编写SQL进行表转换。
示例
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.demo.flink</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.19.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Flink 核心Java库依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink 流处理Java API依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink 客户端工具依赖,包含命令行界面和实用函数 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink 连接器基础模块,包含连接器的公共功能 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka连接器,用于与Apache Kafka集成 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<!-- Lombok库,简化Java代码,自动插入getter/setter等 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
</dependency>
<!-- Flink Table Planner模块,用于Table API和SQL的执行计划生成 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API的Java桥接器,连接Table API与DataStream API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink 文件系统连接器,支持与各种文件系统交互 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CSV格式支持,用于读写CSV数据 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
</project>
java代码:
import cn.guangjun.flink.pojo.Order;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Random;
/**
* FlinkTableDemo 类用于演示Flink Table API的基本使用
* Flink Table API是一种用于流处理和批处理的高级API,它允许用户以声明性的方式表达数据转换和操作
* 通过这个演示,我们可以了解到Flink Table的基本语法和常用操作
*/
public class FlinkTableDemo {
public static void main(String[] args) throws Exception {
// 创建并获取Flink流处理执行环境,设置并行度为1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Random random = new Random();
// 1. 从Elements生成订单数据流,模拟订单事件的产生
SingleOutputStreamOperator<Order> eventStream = env.fromElements(
new Order("001", "Prod00A",10,random.nextInt(10000)),
new Order("002", "Prod00B",20,random.nextInt(10000)),
new Order("003", "Prod00C",30,random.nextInt(10000)),
new Order("004", "Prod00B",40,random.nextInt(10000))
);
// 2. 创建TableEnvironment,用于处理Table API相关的操作
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 3. 将数据流转换成表结构以便进行SQL查询或Table API操作
Table eventTable = tableEnv.fromDataStream(eventStream);
// 4. 在TableEnvironment中注册一个临时视图
// 为了能够在SQL查询中方便地引用这个表,我们需要将其注册为一个临时视图。
tableEnv.createTemporaryView("event" , eventTable);
// 5. 使用SQL查询产品的订单
Table resultTable1 = tableEnv.sqlQuery("SELECT * FROM event ");
// 6. 将处理后的结果表再次转换回数据流,并打印输出
tableEnv.toDataStream(resultTable1).print("result");
// 启动执行Flink程序
env.execute();
}
}