工作流
工作流
欢迎使用算场数据 AI 平台,本教程旨在帮您了解和熟悉平台的工作流功能,跟随本教程,您将了解到如何通过算场工作流进行数据抽取(extract),转换(transform),和加载(load)等工作。我们将向您展示数据从原始数据(bronze),到清洗过后的中间数据(silver),并最终聚合为可以用于报表展示或者AI训练的最终数据(gold)的全流程。
数据集
我们将上传订单表orders和顾客表customers作为原始数据进行清洗转换并做一些数据分析的工作。
上传数据
参考上传数据模块,创建数据库workflow_demo
以及两张数据表orders_bronze
, customers_bronze
。

创建算力
工作流的运行同样离不开算力的支持,参考新建集群模块创建如下集群:
- 名称:
workflow_demo
- 集群类型:
湖仓
- 规格:
X-Small

工作表/笔记本/看板
工作表
, 笔记本
和 看板
是工作流中的最小可用节点。工作流的整体运行情况依赖于每个节点的运行结果。在本教程中,我们将创建三个工作表以及一个看板以构建并运行我们的工作流。
数据清洗
我们将首先对导入进来的orders
表以及customers
表做清洗工作,包括:
- 数据去重:去掉所有重复列。
- 数据过滤:过滤掉关键列值为空或者异常的数据,比如顾客ID,订单ID等。
- 列值转换:对于一些缩写或者用枚举值表示的数据,转换为实际意义,更加清晰明了的呈现出结果。
orders表清洗
- 进入工作模块,并创建新的工作表,修改名称为
bronze_orders
- 我们首先对于orders表做简单的去重以及过滤掉
o_orderkey
为空的值并存储为一个新的viewdistinct_orders
:create view if not exists workflow_demo.distinct_orders as select distinct * from workflow_demo.orders_bronze where o_orderkey is not null;
- 然后我们对于
distinct_orders
做列值转换处理,把没有意义的orderstatus
值进行转换:create materialized view if not exists workflow_demo.orders_silver as select o_orderkey, o_custkey, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, case o_orderstatus when 'O' then 'OPEN' when 'F' then 'FINISHED' when 'P' then 'PENDING' end as o_orderstatus from workflow_demo.distinct_orders;
- 为了确保多次运行时都会重新创建
view
,我们需要在创建之前检查并drop已经存在的view:drop view if exists workflow_demo.orders_silver; drop view if exists workflow_demo.distinct_orders;
- 汇总后的以上脚本为:可以尝试运行上述脚本并得到结果:
drop view if exists workflow_demo.orders_silver; drop view if exists workflow_demo.distinct_orders; create view if not exists workflow_demo.distinct_orders as select distinct * from workflow_demo.orders_bronze where o_orderkey is not null; create materialized view if not exists workflow_demo.orders_silver as select o_orderkey, o_custkey, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, case o_orderstatus when 'O' then 'OPEN' when 'F' then 'FINISHED' when 'P' then 'PENDING' end as o_orderstatus from workflow_demo.distinct_orders;
customer表清洗
- 进入工作模块,并创建新的工作表,修改名称为
bronze_customers
- 我们对于customers表做简单的去重以及过滤掉
c_custkey
为空的值并存储为一个新的vieworders_silver
:create materialized view if not exists workflow_demo.customers_silver as select distinct * from workflow_demo.customers_bronze where c_custkey is not null;
- 为了确保多次运行时都会重新创建
view
,我们需要在创建之前检查并drop已经存在的view:drop view if exists workflow_demo.customers_silver;
- 汇总后的以上脚本为:可以尝试运行上述脚本并得到结果:
drop view if exists workflow_demo.customers_silver; create materialized view if not exists workflow_demo.customers_silver as select distinct * from workflow_demo.customers_bronze where c_custkey is not null;
数据汇总
接下来我们对上面清洗过的数据汇总成一张表,以便后续更为直观的查询以及分析。
- 进入工作模块,并创建新的工作表,修改名称为
silver_customer_order
- 本例中,因为我们重点关注一定用户范围内用户的购买行为分析,所以我们会过滤掉范围之外的用户的订单,具体的join脚本如下:
create table if not exists workflow_demo.customer_join_order as select * from workflow_demo.customers_silver left join workflow_demo.orders_silver on customers_silver.c_custkey = orders_silver.o_custkey;
- 为了确保多次运行时都会重新创建
table
,我们需要在创建之前检查并drop已经存在的table
:drop table if exists workflow_demo.customer_join_order;
- 汇总后的以上脚本为:可以尝试运行上述脚本并得到结果:
drop table if exists workflow_demo.customer_join_order; create table if not exists workflow_demo.customer_join_order as select * from workflow_demo.customers_silver left join workflow_demo.orders_silver on customers_silver.c_custkey = orders_silver.o_custkey;
产出报表汇总看板
经过清洗、提炼、聚合后的数据可以用来生成出不同维度的报表并直观得进行展示。
创建一个新的看板,命名为
customer_order
进入工作模块,并创建新的工作表,修改名称为
gold_analysis
对于聚合好的数据,我们查询出可以用于生成报表得列:
select c_custkey, c_name, c_nationkey, c_mktsegment, o_orderkey, o_totalprice, o_orderdate, o_orderstatus from workflow_demo.customer_join_order where o_orderkey is not null;
运行上述脚本并得到结果:
展示订单状态的图表,转到
图表
栏,选择饼图
,轴和值分别选为o_orderstatus
以及Count
o_orderstatus
,把这个图表发送到看板
customer_order
重复上述4-5步骤,并添加如下几个报表
- 图表类型:
柱状图
,轴:c_custkey
,值:count
o_orderkey
- 图表类型:
柱状图
,轴:c_custkey
,值:sum
o_totalprice
- 图表类型:
饼图
,轴:c_mktsegment
,值:sum
o_totalprice
- 图表类型:
折线图
,轴:o_orderdate
,值:sum
o_totalprice
最后看板如下图所示:
- 图表类型:
工作流
最后,我们将用上面创建的这些工作表以及看板作为节点来构建我们的工作流,并查看工作流运行状态和结果,以及配置周期性运行的工作流。
创建工作流
- 点击导航栏工作-工作流,并创建一个新的工作流,命名为
customer_order_analysis
- 展开左侧工作表以及看板,并将刚才创建的
bronze_orders
, 将其拖动到右侧任务配置区域,修改节点名称为bronze_orders
,选择执行集群workflow_demo
,并点击右下角保存任务
。 - 依次将
bronze_customers
,silver_customer_order
,gold_analysis
工作表 以及customer_order
看板加入到任务配置区域。 - 在节点上拖动箭头,并按照如下顺序添加依赖关系:
silver_customer_order
依赖于bronze_customers
和bronze_orders
gold_analysis
依赖于silver_customer_order
customer_order
依赖于gold_analysis
如图所示:
运行工作流
- 点击右上角
立即运行
运行一下我们刚刚创建好的工作流。 - 点击页面上方
任务配置
的左侧运行结果
查看刚刚工作流的运行结果。 - 运行结果以及运行时间:页面上绿色的方块表示运行成功的节点/工作流,蓝色的表示正在运行或等待运行的节点/工作流。红色的表示运行失败的节点/工作流。将鼠标移动到对应的节点/工作流上可以看这个节点/工作流运行的具体情况,包括运行时间、排队时间等。
配置周期运行、自动触发
调度与触发器可以配置自动周期性触发我们的工作流,是基于cron
的任务调度器,支持简约和高级配置模式,既可以间隔运行,也可以配置按照周期性时间运行。
- 回到
任务配置
界面,在右侧调度与触发器
点击添加触发器
。 - 我们想要每隔10分钟就刷新一下相关的报表,以便于我们更快的实时调整对应的客户营销策略,在调度和触发器界面调度选择每隔10分钟触发,点击保存。
- 在新添加的触发器下面,点击启动以启动配置好的触发器。
- 启动后,可以看到自动触发的工作流每10分钟会自动运行。