Flink Table SQL处理流程示例
/**
* FlinkQuerySqlDemo 类用于演示如何使用 Flink 执行 SQL 查询
*/
public class FlinkQuerySqlDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置执行环境的并行度为1,便于调试和理解程序
env.setParallelism(1);
// 创建TableEnvironment以支持SQL操作
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 定义一个DDL语句来创建一个表user,该表的数据来源于文件系统中的input.csv文件,格式为CSV
String createDDL = "CREATE TABLE user_event (" +
" name STRING, " +
" age INT, " +
" idn STRING, " +
" crtTime STRING" +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'input.csv', " +
" 'format' = 'csv' " +
")";
// 执行DDL语句,实际创建表user
tableEnv.executeSql(createDDL);
// 执行一个SQL查询,对user按idn分组计数
Table result = tableEnv.sqlQuery("select idn ,count(1) as cnt from user_event group by idn");
// 定义另一个DDL语句,创建一个输出表console,用于将结果数据打印到控制台
String createPrintOutDDL = "CREATE TABLE user_console (" +
" idn STRING, " +
" cnt BIGINT" +
") WITH (" +
" 'connector' = 'print' " +
")";
// 执行创建输出表的DDL
tableEnv.executeSql(createPrintOutDDL);
// 将之前查询的结果插入到输出表console中,实现打印输出
result.executeInsert("user_console");
// 注释掉了env.execute(),因为在当前上下文中,tableEnv已经执行了executeSql和executeInsert操作,
// 这些操作内部会触发流处理任务的执行。因此,通常不需要再单独调用env.execute()。
env.execute();
}
}
注意:
Apache Flink 支持多种类型的连接器(connectors),用于从不同的数据源读取数据或将数据写入不同的目的地。以下是一些常见的 Flink 连接器类型:
文件系统连接器(File System):
filesystem
: 用于从本地文件系统、HDFS 或其他兼容文件系统中读取或写入文件。
数据库连接器:
jdbc
: 用于连接关系型数据库,如 MySQL、PostgreSQL、H2 等。datagen
: 用于生成合成数据,通常用于测试和演示目的。
消息队列连接器:
kafka
: 用于从 Apache Kafka 主题中读取或写入数据。rabbitmq
: 用于与 RabbitMQ 集成。pulsar
: 用于与 Apache Pulsar 集成。
日志和数据流连接器:
socket
: 用于通过 TCP 套接字读取或写入数据。filelog
: 用于读取日志文件。
分布式文件系统连接器:
hdfs
: 用于与 Hadoop Distributed File System (HDFS) 集成。s3
: 用于与 Amazon S3 集成。
其他存储和搜索系统连接器:
elasticsearch
: 用于与 Elasticsearch 集成。hbase
: 用于与 Apache HBase 集成。redis
: 用于与 Redis 集成。
实时通信连接器:
nats
: 用于与 NATS 集成。kinesis
: 用于与 Amazon Kinesis 集成。
Web API 连接器:
http
: 用于通过 HTTP 请求发送或接收数据。
自定义连接器:
自定义连接器: Flink 允许开发自定义连接器,以支持特定的数据源或接收器。
这些连接器可以用于 Flink 的不同部署模式,包括流处理和批处理。在选择连接器时,你需要考虑你的数据源、数据格式、性能要求以及特定的业务需求。每种连接器都有其自己的配置选项,可以在创建表时在 WITH
子句中指定。