SQL

用户可以通过 TableEnvironment 类中的 sqlQuery() 方法执行SQL查询,查询结果会以 Table 形式返回。用户可将 Table 用于后续的 SQL 及 Table 查询,或将其转换成 DataSet 或 DataStream,亦可将它写入到某个 TableSink 中。无论是通过 SQL 还是 Table API 提交的查询都可以进行无缝衔接,系统内部会对它们进行整体优化,并最终转换成一个 Flink 程序执行。

为了在 SQL 查询中使用某个 Table,用户必须先在 TableEnvironment 中对其进行注册Table 的注册来源可以是某个 TableSource,某个现有的 Table,或某个DataStream 或 DataSet。此外,用户还可以通过在 TableEnvironment 中注册外部 Catalog 的方式来指定数据源位置。

为方便使用,在执行 Table.toString() 方法时,系统会自动以一个唯一名称在当前 TableEnvironment 中注册该 Table 并返回该唯一名称。因此,在以下示例中,Table 对象都可以直接以内联(字符串拼接)方式出现在 SQL 语句中。

注意: 现阶段Flink对于SQL的支持还并不完善。如果在查询中使用了系统尚不支持的功能,会引发 TableException 。以下章节将对批环境和流环境下 SQL 功能支持情况做出相应说明。

执行查询

以下示例展示了如何通过内联方式以及注册 table 的方式执行 SQL 查询。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// SQL query with an inlined (unregistered) table
Table table = tableEnv.toTable(ds, "user, product, amount");
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// SQL query with a registered table
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// SQL update with a registered table
// create and register a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"product", "amount"};
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// SQL update with a registered table
// create and register a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...)
val fieldNames: Array[String] = Array("product", "amount")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Back to top

语法支持

Flink 内部借助另一个开源项目 Apache Calcite 来解析 SQL 。Calcite 支持标准的 ANSI SQL,但在Flink内部暂时还不支持 DDL 语句。

以下是利用 BNF-范式给出的针对批和流查询的SQL语法支持情况。我们在操作支持章节会以示例形式展示现有功能,并详细说明哪些功能仅适用于流或批环境。

insert:
  INSERT INTO tableReference
  query
  
query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
  
selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

Flink SQL 中对待表名、属性名及函数名等标识符都采用类似Java的规则,具体而言:

  • 无论是否用引号引起来,标识符的大小写都会保留;
  • 标识符在进行匹配时是大小写敏感的;
  • 和Java不同的是,Flink SQL 可以利用反引号在标识符中加入非数字和字母的符号,例如“SELECT a AS my field FROM t”。

Back to top

操作支持

Scan, Projection, and Filter

Operation Description
Scan / Select / As
Batch Streaming
SELECT * FROM Orders

SELECT a, c AS d FROM Orders
Where / Filter
Batch Streaming
SELECT * FROM Orders WHERE b = 'red'

SELECT * FROM Orders WHERE a % 2 = 0
User-Defined Scalar Functions (Scalar UDF)
Batch Streaming

和 Table 类似,用户在使用某个 Scalar UDF 之前必须在 TableEnvironment 中对其进行注册。欲了解更多有关定义和注册 Scalar UDF 的详情,请参阅 UDF 文档

SELECT PRETTY_PRINT(user) FROM Orders

Back to top

Aggregations

Operation Description
GroupBy Aggregation
Batch Streaming
Result Updating
SELECT a, SUM(b) as d
FROM Orders
GROUP BY a

注意: 在流环境中使用 groupBy 将会产生一个持续更新的结果。详情请参阅 Streaming Concepts

GroupBy Window Aggregation
Batch Streaming

利用窗口对数据进行分组计算,每组产生一个结果。详情请参阅 Group Windows 章节。

SELECT user, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
Over Window Aggregation
Streaming
SELECT COUNT(amount) OVER (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders

SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders 
WINDOW w AS (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)  

注意: 所有聚合操作都必须基于相同的窗口(即相同的划分、排序及范围策略)进行。目前,Flink SQL 只支持 PRECEDING (UNBOUNDED and BOUNDED) to CURRENT ROW 的范围定义(暂不支持 FOLLOWING)。此外,ORDER BY 目前只支持定义于单个时间属性上。

Distinct
Batch Streaming
Result Updating
SELECT DISTINCT users FROM Orders

注意: 在需要状态存储的流查询中使用 DISTINCT 可能会导致状态数据随数据基数增加而无限增长 。针对该情况,用户可以通过设置“保留时间”参数来定期清理状态数据。详情请见 Streaming Concepts 页。

Grouping Sets, Rollup, Cube
Batch
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))
Having
Batch Streaming
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
User-Defined Aggregate Functions (UDAGG)
Batch Streaming

用户在使用某个 UDAGG 之前同样需要在 TableEnvironment 中对其进行注册。欲了解更多有关定义和注册 UDAGG 的详情,请参阅 UDF documentation 文档。

SELECT MyAggregate(amount)
FROM Orders
GROUP BY users

Back to top

Joins

Operation Description
Inner Equi-Join
Batch Streaming

目前 Flink SQL 只支持 equi-join,即用户至少需要在某个合取表达式中提供一个等值条件。Cross join 和 theta-join 暂不支持。

SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id

注意:Flink SQL 暂未对多表 join 进行优化,实际 join 的顺序等同于 FROM 子句中表出现的顺序。所以在指定表顺序的时候要避免出现 cross join(笛卡尔积),以防查询执行失败。此外,在双流 join 等需要存储状态的查询中,随着输入数据条数的不断增加,状态可能会无限增长。为避免出现该情况,请在查询配置中设定一个合适的状态“保留时间”。详情请见 Streaming Concepts 页。

Outer Equi-Join
Batch Streaming Result Updating

目前 Flink SQL 只支持 equi-join,即用户至少需要在某个合取表达式中提供一个等值条件。Cross join 和 theta-join 暂不支持。

SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

注意:Flink SQL 暂未对多表 join 进行优化,实际 join 的顺序等同于 FROM 子句中表出现的顺序。所以在指定表顺序的时候要避免出现 cross join(笛卡尔积),以防查询执行失败。此外,在双流 join 等需要存储状态的查询中,随着输入数据条数的不断增加,状态可能会无限增长。为避免出现该情况,请在查询配置中设定一个合适的状态“保留时间”。详情请见 Streaming Concepts 页。

Time-Windowed Join
Batch Streaming

Time-windowd join 的触发条件是用户至少提供一个等值条件和一个对双流时间属性的相互约束。其中后者可以通过在两侧时间属性(必须同为 row-time 或 processing-time)上应用两个范围约束 (<, <=, >=, >)、一个 BETWEEN 表达式、或是一个等值条件来实现。以下列出了几个常见的时间属性约束条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
上述例子展示了如何将订单(orders)和收到订单后4小时之内的运输记录(shipments)进行 join。
Expanding arrays into a relation
Batch Streaming

目前还不支持 WITH ORDINALITY 子句。

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Join with User Defined Table Functions (UDTF)
Batch Streaming

用户在使用某个 UDTF 之前同样需要在 TableEnvironment 中对其进行注册。欲了解更多有关定义和注册 UDTF 的详情,请参阅 UDF documentation 文档。

Inner Join

SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag

Left Outer Join

SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

注意:当前版本 Left outer lateral join 仅支持以常量 TRUE 为 join 条件。

Back to top

Set Operations

Operation Description
Union
Batch
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION
    (SELECT user FROM Orders WHERE b = 0)
)
UnionAll
Batch Streaming
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION ALL
    (SELECT user FROM Orders WHERE b = 0)
)
Intersect / Except
Batch
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  INTERSECT
    (SELECT user FROM Orders WHERE b = 0)
)
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  EXCEPT
    (SELECT user FROM Orders WHERE b = 0)
)
In
Batch Streaming

如果某个表达式的值出现在给定的子查询中则返回 true。目标子查询只允许包含一列,且该列的类型必须和表达式所求值的类型相同。

SELECT user, amount
FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)

注意:In 操作在流式查询中会被重写为 join + groupBy 的形式。在执行时所需存储的状态量可能会随着输入行数的增加而增长。为避免该情况,请在查询配置中为状态设置“保留时间”。详情请见 Streaming Concepts 页。

Exists
Batch Streaming

如果目标子查询中至少包含一行则返回 true。

SELECT user, amount
FROM Orders
WHERE product EXISTS (
    SELECT product FROM NewProducts
)

注意: 在流式查询中,系统需要将 Exists 操作重写为 join + groupBy 的形式,对于无法重写的 Exists 操作,Flink SQL 暂不支持。查询执行期间所需存储的状态量可能会随着输入行数的增加而无限增长。为避免该情况,请在查询配置中为状态设置“保留时间”。详情请见 Streaming Concepts 页。

Back to top

OrderBy & Limit

Operation Description
Order By
Batch Streaming
SELECT *
FROM Orders
ORDER BY orderTime

注意: 如果要对流式查询的结果进行排序,必须首先按照时间属性升序进行。除此之外用户还可以指定一些额外的排序属性。

Limit
Batch
SELECT *
FROM Orders
LIMIT 3

Back to top

Insert

Operation Description
Insert Into
Batch Streaming

结果输出表在使用之前需先在 TableEnvironment 中进行注册,详情请见注册 TableSink 章节。此外用户需要保证输出表的 schema 和查询结果的 schema 保持一致。

INSERT INTO OutputTable
SELECT users, tag
FROM Orders

Back to top

Group Windows

用户可以像定义普通分组一样,利用 GROUP BY 子句在查询中定义 Group Window。子句中的 group window function 会对每个分组中的数据进行计算并生成一个只包含一行记录的结果。下列 group window function 同时适用于批场景和流场景下的表查询。

Group Window Function Description
TUMBLE(time_attr, interval) 定义一个滚动时间窗口(tumbling time window)。滚动时间窗口会将记录分配到连续、不重叠且具有固定时间长度的窗口中。例如一个长度为5分钟滚动时间窗口会将记录按照每5分钟进行分组。滚动时间窗口允许定义在 row-time 和 processing-time 之上,但只有流场景下才支持 processing-time 。
HOP(time_attr, interval, interval) 定义一个跳跃时间窗口 (hopping time window,在Table API 中称为滑动时间窗口)。每个跳跃时间窗口都有一个固定大小的时间长度(通过第一个 interval 参数定义)和一个跳跃间隔(通过第二个 interval 参数定义)。如果跳跃间隔小于窗口长度,则不同窗口实例之间会出现重叠,即每条数据可能都会被分配到多个窗口实例中。例如对一个长度为15分钟,跳跃间隔为5分钟窗口,每行记录都会被分配到3个长度为15分钟的不同窗口实例中,它们之间的处理间隔是5分钟。跳跃时间窗口允许定义在 row-time 和 processing-time 之上,但只有流场景下才允许使用 processing-time
SESSION(time_attr, interval) 定义一个会话时间窗口 (session time window)。会话时间窗口没有固定长度,其边界是通过一个“非活动时间间隔”来指定,即如果超过一段时间没有满足现有窗口条件的数据到来,则判定窗口结束。例如给定一个时间间隔为30分钟的会话时间窗口,如果某条记录到来之前已经有超过30分钟没有记录,则会开启一个新的窗口实例(否则该记录会被加到已有窗口实例中)。同样,如果再出现连续30分钟的记录真空期,则当前窗口实例会被关闭。会话时间窗口允许定义在 row-time 和 processing-time 之上,但同样只有流场景下才允许使用 processing-time。

时间属性

对于流场景下的SQL,group window function 中的 time_attr 参数必须是某个有效的 processing-time 或 row-time 属性。有关如何定义时间属性,请参照时间属性说明文档

对于批场景下的SQL,group window function 中的 time_attr 参数必须是某个 TIMESTAMP 类型的属性。

访问 Group Window 的开始和结束时间

用户可以通过以下辅助函数来访问 Group Window 的开始、结束时间以及可以用于后续计算的时间属性。

Auxiliary Function Description
TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)

返回对应滚动、跳跃或会话时间窗口的时间下限(包含边界值)。

TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)

返回对应滚动、跳跃或会话时间窗口的时间上限(不含边界值)。

注意: 该窗口时间上限值的类型是 timestamp,因此不允许作为 rowtime 属性用于后续其他基于时间的计算中(如 time-windowed joinsgroup window 或 over window aggregations)。

TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)

返回对应滚动、跳跃或会话时间窗口的时间上限(不含边界值)。

注意: 该窗口时间上限值是一个 rowtime 属性,因此可以用于后续其他基于时间的计算中(如 time-windowed joinsgroup window 或 over window aggregations)。

TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)

返回一个 proctime 属性,可用于后续其他基于时间的计算中(如 time-windowed joinsgroup window 或 over window aggregations)。

注意: 在使用上述辅助函数时必须保证其参数与 GROUP BY 子句中 group window function 的参数完全相同。

以下是在流场景中使用 group windows 查询的示例。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");

// compute SUM(amount) per day (in row-time)
Table result1 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
  "  SUM(amount) FROM Orders " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

// compute SUM(amount) per day (in processing-time)
Table result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");

// compute every hour the SUM(amount) of the last 24 hours in row-time
Table result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");

// compute SUM(amount) per session with 12 hour inactivity gap (in row-time)
Table result4 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
  "  SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd, " +
  "  SUM(amount) " +
  "FROM Orders " +
  "GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)

// compute SUM(amount) per day (in row-time)
val result1 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
    """.stripMargin)

// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")

// compute every hour the SUM(amount) of the last 24 hours in row-time
val result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")

// compute SUM(amount) per session with 12 hour inactivity gap (in row-time)
val result4 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
      |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
    """.stripMargin)

Back to top

数据类型

Flink SQL 运行时逻辑是基于 DataSet 和 DataStream API 构建的,因此它内部也采用 TypeInformation 来指定数据类型。用户可以从源码中的 org.apache.flink.table.api.Types 类查看所有支持的数据类型。在此我们给出 SQL 类型、Table API 类型以及相应 Java 类型的对照关系表。

Table API SQL Java type
Types.STRING VARCHAR java.lang.String
Types.BOOLEAN BOOLEAN java.lang.Boolean
Types.BYTE TINYINT java.lang.Byte
Types.SHORT SMALLINT java.lang.Short
Types.INT INTEGER, INT java.lang.Integer
Types.LONG BIGINT java.lang.Long
Types.FLOAT REAL, FLOAT java.lang.Float
Types.DOUBLE DOUBLE java.lang.Double
Types.DECIMAL DECIMAL java.math.BigDecimal
Types.SQL_DATE DATE java.sql.Date
Types.SQL_TIME TIME java.sql.Time
Types.SQL_TIMESTAMP TIMESTAMP(3) java.sql.Timestamp
Types.INTERVAL_MONTHS INTERVAL YEAR TO MONTH java.lang.Integer
Types.INTERVAL_MILLIS INTERVAL DAY TO SECOND(3) java.lang.Long
Types.PRIMITIVE_ARRAY ARRAY e.g. int[]
Types.OBJECT_ARRAY ARRAY e.g. java.lang.Byte[]
Types.MAP MAP java.util.HashMap
Types.MULTISET MULTISET e.g. java.util.HashMap<String, Integer> for a multiset of String

除上述类型之外,Flink SQL 中的字段也允许是普通类型(generic type)及复合类型(例如 POJO 或 Tuple)。对普通类型,Flink SQL 会把它们看做“黑盒”一般,允许其作为 UDF 的参数或处理对象;而对复合类型,Flink SQL 允许用户通过内置函数对其进行访问(详见下文值访问函数章节)。

Back to top

内置函数

Flink SQL为用户内置了很多用于数据转换的常用函数,本章会对它们进行简要概述。如果您发现某个所需函数暂不支持,可以自己利用 UDF 机制 实现,亦或是如果您认为该函数比较通用,欢迎为其创建一个详细的 JIRA issue

比较函数

Comparison functions Description
value1 = value2

如果 value1 等于 value2,返回 TRUE;如果 value1value2 为 NULL 返回 UNKNOWN。

value1 <> value2

如果 value1 不等 value2,返回 TRUE;如果 value1value2 为 NULL 返回 UNKNOWN。

value1 > value2

如果 value1 大于 value2,返回 TRUE;如果value1value2 为 NULL 返回 UNKNOWN。

value1 >= value2

如果 value1 大于或等于 value2,返回 TRUE;如果 value1value2 为 NULL 返回 UNKNOWN。

value1 < value2

如果 value1 小于 value2,返回 TRUE;如果 value1value2 为 NULL 返回 UNKNOWN。

value1 <= value2

如果 value1 小于或等于 value2,返回 TRUE;如 value1value2 为 NULL 返回 UNKNOWN。

value IS NULL

如果 value 为 NULL,返回 TRUE。

value IS NOT NULL

如果 value 不为 NULL,返回 TRUE。

value1 IS DISTINCT FROM value2

value1value2 不同,返回 TRUE。此处所有 NULL 值都被看做相同。

例如:1 IS DISTINCT FROM NULL 返回 TRUE; NULL IS DISTINCT FROM NULL 返回 FALSE。

value1 IS NOT DISTINCT FROM value2

如果 value1value2 相同,返回 TRUE。此处所有 NULL 值都被看做相同。

例如:1 IS NOT DISTINCT FROM NULL 返回 FALSE; NULL IS NOT DISTINCT FROM NULL 返回 TRUE。

value1 BETWEEN [ ASYMMETRIC | SYMMETRIC ] value2 AND value3

在默认情况下 (或使用 ASYMMETRIC 关键字), 如果 value1 大于等于 value2 且小于等于 value3,则返回 TRUE。 在使用 SYMMETRIC 关键字的情况下, 如果 value1value2value3 之间(包含边界),则返回 TRUE。 如果 value2value3 为 NULL, 根据情况返回 FALSE 或 UNKNOWN。

例如: 12 BETWEEN 15 AND 12 返回 FALSE; 12 BETWEEN SYMMETRIC 15 AND 12 返回 TRUE; 12 BETWEEN 10 AND NULL 返回 UNKNOWN; 12 BETWEEN NULL AND 10 返回 FALSE; 12 BETWEEN SYMMETRIC NULL AND 12 返回 UNKNOWN。

value1 NOT BETWEEN [ ASYMMETRIC | SYMMETRIC ] value2 AND value3

在默认情况下 (或使用 ASYMMETRIC 关键字), 如果 value1 小于 value2 或大于 value3,则返回 TRUE。 在使用 SYMMETRIC 关键字的情况下, 如果 value1 不在 value2value3 之间(包含边界),则返回 TRUE。 如果 value2value3 为 NULL, 根据情况返回 TRUE 或 UNKNOWN。

例如: 12 NOT BETWEEN 15 AND 12 返回 TRUE; 12 NOT BETWEEN SYMMETRIC 15 AND 12 返回 FALSE; 12 NOT BETWEEN NULL AND 15 返回 UNKNOWN; 12 NOT BETWEEN 15 AND NULL 返回 TRUE; 12 NOT BETWEEN SYMMETRIC 12 AND NULL 返回 UNKNOWN。

string1 LIKE string2 [ ESCAPE char ]

如果 string1 满足模式串 string2,则返回 TRUE。当 string1string2 为 NULL,返回 UNKNOWN。如有需要可自定义一个转义字符 char

注意: 当前版本暂不支持自定义转义字符。

string1 NOT LIKE string2 [ ESCAPE char ]

如果 string1 不满足模式串 string2,则返回 TRUE。当 string1string2 为 NULL,返回 UNKNOWN。如有需要可自定义一个转义字符 char

注意: 当前版本暂不支持自定义转义字符。

string1 SIMILAR TO string2 [ ESCAPE char ]

如果 string1 满足SQL正则串 string2,则返回 TRUE。当 string1string2 为 NULL,返回 UNKNOWN。如有需要可自定义一个转义字符 char

注意: 当前版本暂不支持自定义转义字符。

string1 NOT SIMILAR TO string2 [ ESCAPE string3 ]

如果 string1 不满足SQL正则串 string2,则返回 TRUE。当 string1string2 为 NULL,返回 UNKNOWN。如有需要可自定义一个转义字符 char

注意: 当前版本暂不支持自定义转义字符。

value1 IN (value2 [, value3]* )

如果 value1 出现在给定列表 (value2, value3, ...) 里,则返回 TRUE。 当 (value2, value3, ...) 包含 NULL, 如果元素可以被找到则返回 TRUE,否则返回 UNKNOWN。如果 value1 为 NULL,总是返回 UNKNOWN。

例如: 4 IN (1, 2, 3) 返回 FALSE; 1 IN (1, 2, NULL) 返回 TRUE; 4 IN (1, 2, NULL) 返回 UNKNOWN。

value1 NOT IN (value2 [, value3]* )

如果 value1 未出现在给定列表 (value2, value3, ...) 里,则返回 TRUE。 当 (value2, value3, ...) 包含 NULL, 如果元素可以被找到则返回 FALSE,否则返回 UNKNOWN。如果 value1 为 NULL,总是返回 UNKNOWN。

例如: 4 NOT IN (1, 2, 3) 返回 TRUE; 1 NOT IN (1, 2, NULL) 返回 FALSE; 4 NOT IN (1, 2, NULL) 返回 UNKNOWN。

EXISTS (sub-query)

如果给定子查询 sub-query 至少包含一行结果,返回 TRUE。只支持可以被重写为 join + groupBy 形式的查询。

注意: 在流式查询中,系统需要将 Exists 操作重写为 join + groupBy 的形式,对于无法重写的 Exists 操作,Flink SQL 暂不支持。查询执行期间所需存储的状态量可能会随着输入行数的增加而无限增长。为避免该情况,请在查询配置中为状态设置“保留时间”。详情请见 Streaming Concepts 页。

value IN (sub-query)

如果 value 包含在 sub-query 的结果集中,则返回 TRUE。

注意:IN 在流式查询中会被重写为 join + groupBy 的形式。在执行时所需存储的状态量可能会随着输入行数的增加而增长。为避免该情况,请在查询配置中为状态设置“保留时间”。详情请见 Streaming Concepts 页。

value NOT IN (sub-query)

如果 value 没包含在 sub-query 的结果集中,则返回 TRUE。

注意:NOT IN 在流式查询中会被重写为 join + groupBy 的形式。在执行时所需存储的状态量可能会随着输入行数的增加而增长。为避免该情况,请在查询配置中为状态设置“保留时间”。详情请见 Streaming Concepts 页。

逻辑函数

Logical functions Description
boolean1 OR boolean2

如果 boolean1boolean2 为 TRUE,返回 TRUE。支持三值逻辑。

例如: TRUE OR UNKNOWN 返回 TRUE.

boolean1 AND boolean2

如果 boolean1boolean2 都为 TRUE,返回 TRUE。支持三值逻辑。

例如: TRUE AND UNKNOWN 返回 UNKNOWN.

NOT boolean

如果 boolean 为 FALSE,返回 TRUE;如果 boolean 为 TRUE,返回 FALSE;如果 boolean 为 UNKNOWN,返回 UNKNOWN。

boolean IS FALSE

如果 boolean 为 FALSE,返回 TRUE;如果 boolean 为 TRUE 或 UNKNOWN,返回 FALSE。

boolean IS NOT FALSE

如果 boolean 为 TRUE 或 UNKNOWN,返回 TRUE;如果 boolean 为 FALSE,返回 FALSE。

boolean IS TRUE

如果 boolean 为 TRUE,返回 TRUE;如果 boolean 为 FALSE 或 UNKNOWN,返回 FALSE。

boolean IS NOT TRUE

如果 boolean 为 FALSE 或 UNKNOWN,返回 TRUE;如果 boolean 为 TRUE,返回 FALSE。

boolean IS UNKNOWN

如果 boolean 为 UNKNOWN,返回 TRUE;如果 boolean 为 TRUE 或 FALSE,返回 FALSE。

boolean IS NOT UNKNOWN

如果 boolean 为 TRUE 或 FALSE,返回 TRUE;如果 boolean 为 UNKNOWN,返回 FALSE。

算数运算函数

Arithmetic functions Description
+ numeric

返回 numeric

- numeric

返回负 numeric

numeric1 + numeric2

返回 numeric1numeric2 的结果。

numeric1 - numeric2

返回 numeric1numeric2 的结果。

numeric1 * numeric2

返回 numeric1numeric2 的结果。

numeric1 / numeric2

返回 numeric1 除以 numeric2 的结果。

POWER(numeric1, numeric2)

返回 numeric1numeric2 次幂。

ABS(numeric)

返回 numeric 的绝对值。

MOD(numeric1, numeric2)

返回 numeric1 除以 numeric2 的余数。只有当 numeric1 为负,结果才为负。

SQRT(numeric)

返回 numeric 的平方根。

LN(numeric)

返回 numeric 的自然对数(以e为底)。

LOG10(numeric)

返回以10为底 numeric 的对数。

LOG(numeric2)
LOG(numeric1, numeric2)

以一个参数进行调用时返回 numeric2 的自然对数;以两个参数进行调用时返回以 numeric1 为底 numeric2 的对数。

注意: 目前版本 numeric2 必须大于0,且 numeric1 必须大于1。

EXP(numeric)

返回 e 的 numeric 次幂。

CEIL(numeric)
CEILING(numeric)

numeric 向上取整,返回一个大于或等于 numeric 的最小整数。

FLOOR(numeric)

numeric 向下取整,返回一个小于或等于 numeric 的最大整数。

SIN(numeric)

返回 numeric 的正弦。

COS(numeric)

返回 numeric 的余弦。

TAN(numeric)

返回 numeric 的正切。

COT(numeric)

返回 numeric 的余切。

ASIN(numeric)

返回 numeric 的反正弦。

ACOS(numeric)

返回 numeric 的反余弦。

ATAN(numeric)

返回 numeric 的反正切。

DEGREES(numeric)

返回某个弧度 numeric 的对应角度值。

RADIANS(numeric)

返回某个角度 numeric 的对应弧度值。

SIGN(numeric)

返回 numeric 的正负。

ROUND(numeric, integer)

返回 numeric 四舍五入后的值,保留 integer 位小数。

PI

返回一个非常接近圆周率 pi 的值。

E()

返回一个非常接近自然底数 e 的值。

RAND()

返回一个 0.0 (包含) 到 1.0 (不包含)之间的伪随机数。

RAND(integer)

利用种子值 integer 返回一个 0.0 (包含) 到 1.0 (不含)之间的伪随机数。如果种子值相同,两个 RAND 函数会返回完全相同的伪随机数序列。

RAND_INTEGER(integer)

返回一个 0 (包含) 到 integer(不含)之间的伪随机整数。

RAND_INTEGER(integer1, integer2)

利用种子值 integer1 返回一个 0 (包含) 到 integer2(不含)之间的伪随机整数。如果种子值和上限相同,两个 RAND_INTEGER 函数会返回完全相同的伪随机数序列。

BIN(integer)
      

返回 integer 的二进制字符串。

例如: BIN(4) 返回 '100'; BIN(12) 返回 '1100'。

字符串函数

String functions Description
string1 || string2

返回字符串 string1string2 连接后的值。

CHARACTER_LENGTH(string)
CHAR_LENGTH(string)

返回 string 中所含的字符数。

UPPER(string)

返回 string 的大写形式。

LOWER(string)

返回 string 的小写形式。

POSITION(string1 IN string2)

返回 string1string2 中首次出现的位置(从1开始计算);如果没有出现,返回0。

TRIM([ BOTH | LEADING | TRAILING ] string1 FROM string2)

返回一个将 string1string2 首部或(和)尾部移除的新字符串。默认情况下会移除两端空白字符。

OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])

返回一个利用 string2 替换 string1 中从 integer1 位置开始长度为 integer2 (默认为 string2 的长度)个字符的新字符串。

例如: OVERLAY('This is an old string' PLACING ' new' FROM 10 FOR 5) 返回 "This is a new string"

SUBSTRING(string FROM integer1 [ FOR integer2 ])

返回一个从 integer1 位置开始、长度为 integer2(默认到结尾)的 string 的子串。

INITCAP(string)

返回一个将 string 中每个单词首字母都转为大写,其余字母都转为小写的新字符串。这里的单词指的是一连串不间断的字母序列。

CONCAT(string1, string2, ...)

返回一个将 string1string2 等按顺序连接起来的新字符串。如果其中任一字符串为 NULL,返回 NULL。

例如:CONCAT("AA", "BB", "CC") 返回 "AABBCC"。

CONCAT_WS(string1, string2, string3,...)

返回一个利用分隔符 string1string2string3 等按顺序连接起来的新字符串。分隔符 string1 会被加到连接的字符串之间。如果 string1 为 NULL,返回 NULL。和 CONCAT() 相比,CONCAT_WS() 会自动跳过值为 NULL 的字符串。

例如:CONCAT_WS("~", "AA", "BB", "", "CC") 返回 "AA~BB~~CC"。

LPAD(string1, integer, string2)

返回一个利用 string2 从左侧对 string1 进行填充直到其长度达到 integer 的新字符串。如果 integer 小于 string1 的长度,则 string1 会被缩减至长度 integer

例如: LPAD('hi',4,'??') 返回 "??hi";LPAD('hi',1,'??') 返回 "h"。

RPAD(string1, integer, string2)

返回一个利用 string2 从右侧对 string1 进行填充直到其长度达到 integer 的新字符串。如果 integer 小于 string1 的长度,则 string1 会被缩减至长度 integer

例如: RPAD('hi',4,'??') 返回 "hi??";RPAD('hi',1,'??') 返回 "h"。

条件函数

Conditional functions Description
CASE value
WHEN value1_1 [, value1_2 ]* THEN result1
[ WHEN value2_1 [, value2_2 ]* THEN result2 ]*
[ ELSE resultZ ]
END

返回 value 第一次出现在集合 (valueX_1, valueX_2, ...) 中时对应 resultX 的值。如果 value 没有出现在任何给定集合中则根据 resultZ 的给定情况返回 resultZ 或 NULL。

CASE
WHEN condition1 THEN result1
[ WHEN conditionN THEN resultN ]*
[ ELSE resultZ ]
END

返回条件 conditionX 首次满足时其对应的 resultX 值。如果所有条件均不满足则根据 resultZ 的给定情况返回 resultZ 或 NULL。

NULLIF(value, value)

value1 等于 value2 时返回 NULL,否则返回 value1

例如: NULLIF(5, 5) 返回 NULL;NULLIF(5, 0) 返回5。

COALESCE(value1, value2 [, value3 ]* )

返回 value1, value2, ... 中首个不为 NULL 的值。

例如:COALESCE(NULL, 5) 返回5。

类型转换函数

Type conversion functions Description
CAST(value AS type)

返回一个将 value 转换为 type 类型后的值。有关支持的类型请参考数据类型章节

时间函数

Temporal functions Description
DATE string

返回一个由 "yyyy-MM-dd" 格式的字符串 string 解析得到的 SQL 日期。

TIME string

返回一个由 "HH:mm:ss" 格式的字符串 string 解析得到的 SQL 时间。

TIMESTAMP string

返回一个由 "yyy-MM-dd HH:mm:ss[.SSS]" 格式的字符串 string 解析得到的 SQL 时间戳。

INTERVAL string range

返回一个由 string 解析出的 SQL 时间区间,如果 string 的格式为 "dd hh:mm:ss.fff" 则以毫秒为单位,如果格式为 "yyyy-mm" 则以月为单位。支持的 range 有: DAYMINUTEDAY TO HOURDAY TO SECOND(以毫秒为单位)、 YEAR 以及 YEAR TO MONTH (以月为单位)。

例如:INTERVAL '10 00:00:00.004' DAY TO SECONDINTERVAL '10' DAYINTERVAL '2-10' YEAR TO MONTH

CURRENT_DATE

返回当前UTC日期。

CURRENT_TIME

以 SQL 时间形式返回UTC的当前时间。

CURRENT_TIMESTAMP

以 SQL 时间戳形式返回UTC的当前时间。

LOCALTIME

以 SQL 时间形式返回本地区的当前时间。

LOCALTIMESTAMP

以 SQL 时间戳格式返回本地区的当前时间。

EXTRACT(timeintervalunit FROM temporal)

从某个时间点或时间区间 temporal 内以 long 类型返回指定时间字段 timeintervalunit的对应值。

例如:EXTRACT(DAY FROM DATE '2006-06-05') 返回5。

YEAR(date)

从给定日期 date 内提取年份,等价于 EXTRACT(YEAR FROM date)

例如:YEAR(DATE '1994-09-27') 返回1994。

FLOOR(timepoint TO timeintervalunit)

返回一个将时间 timepoint 向下取整到指定时间单位 timeintervalunit 的值。

例如:FLOOR(TIME '12:44:31' TO MINUTE) 返回 12:44:00。

CEIL(timepoint TO timeintervalunit)

返回一个将时间 timepoint 向上取整到指定时间单位 timeintervalunit 的值。

例如:CEIL(TIME '12:44:31' TO MINUTE) 返回 12:45:00。

QUARTER(date)

从给定日期 date 内提取季度(取值从1到4),等价于 EXTRACT(QUARTER FROM date)

例如:QUARTER(DATE '1994-09-27') 返回3。

MONTH(date)

从给定日期 date 内提取月份(取值从1到12),等价于 EXTRACT(MONTH FROM date)

例如:MONTH(DATE '1994-09-27') 返回9。

WEEK(date)

从给定日期 date 内提取周数(取值从1到53),等价于 EXTRACT(WEEK FROM date)

例如:WEEK(DATE '1994-09-27') 返回39。

DAYOFYEAR(date)

从给定日期 date 内提取当年已过天数(取值从1到366),等价于 EXTRACT(DOY FROM date)

例如:DAYOFYEAR(DATE '1994-09-27') 返回270。

DAYOFMONTH(date)

从给定日期 date 内提取当月已过天数(取值从1到31),等价于 EXTRACT(DAY FROM date)

例如:DAYOFMONTH(DATE '1994-09-27') 返回27。

DAYOFWEEK(date)

从给定日期 date 内提取当周已过天数(取值从1到7,周日 = 1),等价于 EXTRACT(DOW FROM date)

例如:DAYOFWEEK(DATE '1994-09-27') 返回3。

HOUR(timestamp)

从给定时间戳 timestamp 内提取当日已过小时数(取值从0到23),等价于 EXTRACT(HOUR FROM timestamp)

例如:HOUR(TIMESTAMP '1994-09-27 13:14:15') 返回13。

MINUTE(timestamp)

从给定时间戳 timestamp 内提取当小时已过分钟数(取值从0到59),等价于 EXTRACT(MINUTE FROM timestamp)

例如:MINUTE(TIMESTAMP '1994-09-27 13:14:15') 返回13。

SECOND(timestamp)

从给定时间戳 timestamp 内提取当分钟已过秒数(取值从0到59),等价于 EXTRACT(SECOND FROM timestamp)

例如:MINUTE(TIMESTAMP '1994-09-27 13:14:15') 返回15。

(timepoint1, temporal1) OVERLAPS (timepoint2, temporal2)

如果两个时间区间 (timepoint1, temporal1) 和 (timepoint2, temporal2) 有重叠,则返回 TRUE。其中 temporal1temporal2 可以是一个时间点或时间区间。

例如:(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) 返回 TRUE; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) 返回 FALSE。

DATE_FORMAT(timestamp, string)

返回一个将时间 timestamp 按照 string 进行格式化的字符串。具体格式说明请参照日期格式说明表

例如:DATE_FORMAT(ts, '%Y, %d %M') 返回 "2017, 05 May".

TIMESTAMPADD(unit, integer, timestamp)

返回一个将 integer(带符号)个时间单元 unit 添加到时间 timestamp 后的时间值。其中 unit 的取值可以是:SECONDMINUTEHOURDAYWEEKMONTHQUARTERYEAR

例如:TIMESTAMPADD(WEEK, 1, '2003-01-02') 返回2003-01-09。

聚合函数

Aggregate functions Description
COUNT([ ALL ] expression | DISTINCT expression1 [, expression2]*)

默认或使用 ALL 关键字的情况下,返回所有不为 NULL 的 expression 的数目。使用 DISTINCT 关键字的情况下相同值只会统计一次。

COUNT(*)
COUNT(1)

返回数据行数。

AVG([ ALL | DISTINCT ] expression)

默认或使用 ALL 关键字的情况下,对所有输入行计算 expression 的算数平均值。使用 DISTINCT 关键字的情况下相同值只会计算一次。

SUM([ ALL | DISTINCT ] expression)

默认或使用 ALL 关键字的情况下,对所有输入行的 expression 求和。使用 DISTINCT 关键字的情况下相同值只会计算一次。

MAX([ ALL | DISTINCT ] expression)

默认或使用 ALL 关键字的情况下,对所有输入行统计 expression 的最大值。使用 DISTINCT 关键字的情况下相同值只会计算一次。

MIN([ ALL | DISTINCT ] expression)

默认或使用 ALL 关键字的情况下,对所有输入行统计 expression 的最小值。使用 DISTINCT 关键字的情况下相同值只会计算一次。

STDDEV_POP([ ALL | DISTINCT ] expression)

默认或使用 ALL 关键字的情况下,对所有输入行的 expression 计算总体标准差。使用 DISTINCT 关键字的情况下相同值只会计算一次。

STDDEV_SAMP([ ALL | DISTINCT ] expression)

默认或使用 ALL 关键字的情况下,对所有输入行的 expression 计算样本标准差。使用 DISTINCT 关键字的情况下相同值只会计算一次。

VAR_POP(value)

默认或使用 ALL 关键字的情况下,对所有输入行的 expression 计算总体方差(总体标准差的平方根)。使用 DISTINCT 关键字的情况下相同值只会计算一次。

VAR_SAMP(value)

默认或使用 ALL 关键字的情况下,对所有输入行的 expression 计算样本方差(样本标准差的平方根)。使用 DISTINCT 关键字的情况下相同值只会计算一次。

COLLECT(value)

默认或使用 ALL 关键字的情况下,将所有输入行的 expression 生成一个多重集。使用 DISTINCT 关键字的情况下相同值只会计算一次。

分组函数

Grouping functions Description
GROUP_ID()

返回一个在分组过程中可以用以标识 grouping key 组合的整数。

GROUPING(expression1 [, expression2]* )
GROUPING_ID(expression1 [, expression2]* )

返回一个对应所选 grouping expressions 的位向量。

值访问函数

Value access functions Description
tableName.compositeType.field

根据名称返回 Flink 所支持的某一复合类型(例如 Tuple、POJO 等)指定字段的值。

tableName.compositeType.*

扁平式地对 Flink 所支持的某一复合类型(例如 Tuple、POJO 等)内的全部子类型进行拆解,每个子类型都作为一个单独的字段。多数情况下,所生成的字段名称和原始子类型的字段名称相近,层级之间会以 $ 号进行连接(例如:mypojo$mytuple$f0)。

值构建函数

Value constructor functions Description
ROW(value1, [, value2]*)
(value1, [, value2]*)

根据所提供的列表 (value1, value2, ...) 生成一个 row。

ARRAY ‘[’ value1 [, value2 ]* ‘]’

根据所提供的列表 (value1, value2, ...) 生成一个 array。

MAP ‘[’ value1, value2 [, value3, value4 ]* ‘]’

根据所提供的键值对列表 ((value1, value2), (value3, value4), ...)生成一个 map。

数组函数

Array functions Description
CARDINALITY(array)

返回数组 array 中的元素个数。

array ‘[’ integer ‘]’

返回数组 arrayinteger 位置的值。下标从1开始。

ELEMENT(array)

返回数组 array(其长度必须为1)中的唯一元素;如果 array 为空则返回 NULL;如果 array 的长度大于1,则会执行失败。

字典函数

Map functions Description
CARDINALITY(map)

返回字典 map 中的元素个数。

map ‘[’ value ‘]’

根据指定键 value 访问字典 map 中的值。

哈希函数

Hash functions Description
MD5(string)

返回输入串 string 的 MD5 编码值,结果以32位16进制数表示;如果 string 为 NULL,返回 NULL。

SHA1(string)

返回输入串 string 的 SHA-1 编码值,结果以40位16进制数表示;如果 string 为 NULL,返回 NULL。

SHA224(string)

返回输入串 string 的 SHA-224 编码值,结果以56位16进制数表示;如果 string 为 NULL,返回 NULL。

SHA256(string)

返回输入串 string 的 SHA-256 编码值,结果以64位16进制数表示;如果 string 为 NULL,返回 NULL。

SHA384(string)

返回输入串 string 的 SHA-384 编码值,结果以96位16进制数表示;如果 string 为 NULL,返回 NULL。

SHA512(string)

返回输入串 string 的 SHA-512 编码值,结果以128位16进制数表示;如果 string 为 NULL,返回 NULL。

SHA2(string, hashLength)

返回输入串 string 的某 SHA-2 类别(SHA-224、SHA-256、SHA-384或SHA-512)的编码值。其编码长度由 hashLength 控制,可选范围为224、256、384或512。如果 stringhashLength 为 NULL,返回 NULL。

暂不支持的函数

Flink SQL 暂不支持以下函数:

  • 二进制字符串相关的算子及函数
  • 系统函数

Back to top

保留字

虽然当前版本还无法支持 SQL 的全部功能,Flink SQL 为将来可能需要实现的功能保留了部分关键字。如果您希望用它们进行命名,请注意使用反引号进行标识。例如: `value``count`。全部保留字如下:

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE

日期格式说明表

Specifier Description
%a
星期名称的缩写 (Sun .. Sat)
%b
月份名称的缩写 (Jan .. Dec)
%c
数值类型的月份 (1 .. 12)
%D
带有英文尾缀的月份天数 (0th, 1st, 2nd, 3rd, ...)
%d
一月中的第几天 (01 .. 31)
%e
数值类型的月份天数 (1 .. 31)
%f
秒数的若干分之几 (打印时显示6位:000000 .. 999000;解析时可识别1-9位:0 .. 999999999) (时间戳以毫秒数截断)
%H
24时制的小时 (00 .. 23)
%h
12时制的小时 (01 .. 12)
%I
12时制的小时 (01 .. 12)
%i
数值类型的分钟 (00 .. 59)
%j
一年中的第几天 (001 .. 366)
%k
小时 (0 .. 23)
%l
小时 (1 .. 12)
%M
月份全称 (January .. December)
%m
数值类型的月份 (01 .. 12)
%p
AMPM
%r
带有 AMPM 的12小时时间 (hh:mm:ss AM/PM)
%S
秒数 (00 .. 59)
%s
秒数 (00 .. 59)
%T
24时制的时间 (hh:mm:ss)
%U
一年中的第几周 (00 .. 53),以周日为一周的第一天计算
%u
一年中的第几周 (00 .. 53),以周一为一周的第一天计算
%V
一年中的第几周 (00 .. 53),以周日为一周的第一天计算,配合 %X 使用
%v
一年中的第几周 (00 .. 53),以周一为一周的第一天计算,配合 %x 使用
%W
星期名称 (Sunday .. Saturday)
%w
一周之中的第几天 (0 .. 6), 以周天位一周的第一天计算
%X
星期对应的年份(4位数字),以周天为一周的第一天计算,配合 %V 使用
%x
星期对应的年份(4位数字),以周一为一周的第一天计算,配合 %v 使用
%Y
4位年份数字
%y
2位年份数字
%%
一个 % 字面量

Back to top