查询流式数据
算场为多种内部和外部数据源提供了流式查询能力。
从流式处理系统查询数据
您可以从以下流式处理系统Kafka读取流式输出数据。以下示例演示从 Kafka 读取的交互式流式处理,并将读取的数据写入数据表:
# 增量数据加载成DataFrame
df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
# DataFrame添加到表中
df.write.insertInto(("<table identifier>"))
查询表的增量数据
如果以流式方式对算场中的表进行查询,仅需要在表名后面添加 (stream) 后缀,例如使用 Python 或 SQL 执行从表读取增量数据:
display(spark.read.table("table_name(stream)"))
SELECT * FROM table_name(stream);
您也可以使用 api 实现对同一表的多个流式查询,来支持多个数据场景使用。每一个流式查询可以通过标签进行区分。例如执行从表按照 "read_tag" 来读取增量数据以及当前数据的问题:
val (df, position) = spark.streams.loadIncremental("table_name", "read_tag")
如果需要重新获取之前的数据或者从指定位置读取增量数据,可以在 loadIncremental 中指定开始位置。例如
val (df, position) = spark.streams.loadIncremental("table_name", "read_tag", start_position)
您也可以使用 listReadPositions 或 dropReadPosition 来列举或删除云存储中的流式标签和位置信息。例如
// 列举所有流式标签所对应的当前位置信息
val readPositions: Map[String, Long] = spark.streams.listReadPositions("table_name")
// 删除指定的流式标签
spark.streams.dropReadPosition("table_name", "read_tag")
关于使用 spark.streams 的函数请参阅:
加载云对象存储中的增量数据
通过 COPY INTO 命令将数据从云对象存储的数据以全量或增量方式导入湖仓。请参阅 云对象存储增量数据导入。
指定物化视图或流处理表的增量表
对于创建的物化视图 (Materialized View) 或流处理表 (Streaming Table), 如果存在多个源数据表的join查询情况, 默认每次触发增量刷新的时候获取第一张表的增量数据以及其他所有表的全量数据,在进行join查询后将结果进行物化保存。您也可以在指定源表名上添加"(stream)"后缀来指定获取该表的增量数据,这时候没有添加后缀的表会获取其全量数据。请参阅 物化视图 和 流处理表。