CREATE STREAMING TABLE
描述
创建一个流表,流表支持流式处理或增量数据处理。
有关如何使用流表的详细信息,请参阅流表。
语法
CREATE [ OR REPLACE ] STREAMING TABLE [ IF NOT EXISTS ] streaming_table_identifier
create_streaming_clauses AS ( query | code )
参数
OR REPLACE
如果指定,流表的资源将被重新加载。这主要用于获取对流表实现所做的任何更改。
该参数与IF NOT EXISTS
相互排斥,不能同时指定。IF NOT EXISTS
如果指定,则仅在流表不存在时创建该流表。如果系统中已存在指定流表,则流表创建成功(不出错)。
此参数与OR REPLACE
和TEMPORARY
相互排斥,不能同时指定。streaming_table_identifier
指定要创建的流表名称。流表名称可选择使用数据库名称限定。
语法:
[ database_name. ] streaming_table_name
create_view_clauses
这些分句是可选的,对顺序不敏感。格式如下:
[ ( column_name [ COMMENT column_comment ], ... ) ]
用来指定列级的注释。[ COMMENT streaming_table_comment ]
用来指定流表的注释。[ TBLPROPERTIES ( property_name = property_value [ , ... ] ) ]
用来添加元数据的键值对.[ LANGUAGE JAVA | SCALA ]
用来指定流表创建的语言,如果指定了语言则后面需要填写code
而不是query
。
query
一个 SELECT 语句。code
指定要创建的流表的代码。该代码应是一个有效的Java
或Scala
流表内部。最后应当返回一个DataFrame
。
示例
-- 创建或替换带有注释的流表 `experienced_employee`。
CREATE OR REPLACE STREAMING TABLE experienced_employee
(ID COMMENT 'Unique identification number', Name)
COMMENT 'View for experienced employees'
AS SELECT id, name FROM all_employee
WHERE working_years > 5;
-- 在流表不存在时创建一个流表 `experienced_employee_global`。
CREATE STREAMING TABLE experienced_employee_global
IF NOT EXISTS
AS SELECT id, name FROM all_employee WHERE working_years > 5;
-- 使用Java创建流表 `experienced_employee_scala` 。
CREATE STREAMING TABLE experienced_employee_java
LANGUAGE JAVA
AS $$ return spark.sql("SELECT id, name FROM all_employee WHERE working_years > 2"); $$;
-- 使用Scala创建流表 `experienced_employee_scala`。
-- `your_specific_func` 是一个自主编写的函数,其中的逻辑可以自主定义。
CREATE STREAMING TABLE experienced_employee_scala
LANGUAGE SCALA
AS $$
def your_specific_func(): Int = {
/*
your code here
*/
}
val x = your_specific_func()
spark.sql(s"SELECT id, name FROM all_employee WHERE working_years > $x")
$$;
注意
- TODO 待补充