跳到主要内容

5、LLM生成数仓数据血缘地图

引言

本文旨在利用ChatGPT自动解读数据库DDL元数据信息以及ETL(Extract, Transform, Load)任务代码,以实现在图数据库Neo4j中准确生成数据血缘地图。主要目的在于评估ChatGPT在数据仓库领域元数据、ETL任务理解和推理方面的能力。

这一创新将显著提升数据仓库数据治理和数据质量管理的运营和维护效率,实现全面数据来源与去向的追溯,确保数据质量和合规性。血缘地图的自动生成不仅提供深度的数据影响分析,有助于性能优化,同时巩固了元数据管理,提高了问题排查效率。

什么是数据血缘地图

数据血缘分析是数据管理和数据治理过程中的关键组成部分,它有助于理解和跟踪数据在整个数据仓库中的流动和变化。数据血缘地图是通过可视化关联图谱的方式,展示数据在整个数据仓库中的流动和关联关系。它通常以图形的方式展示数据表、数据列、ETL(Extract, Transform, Load)过程、数据源之间的关系。数据血缘地图可以具体描述数据从源系统到数据仓库的完整过程,以及数据在仓库内部的转换和传输路径。

数据血缘地图有以下几个重要作用:

  1. 可视化数据流程: 数据血缘地图提供了对数据流程的清晰可见性,帮助数据团队和业务用户理解数据如何在仓库中流动和转换。
  2. 问题诊断和故障排除: 当出现数据质量问题或错误时,数据血缘地图可以帮助定位问题的源头,加速问题的解决过程。
  3. 数据文档和元数据管理: 数据血缘地图也可以用作数据文档和元数据管理的工具,帮助维护数据仓库的元数据信息,以及跟踪数据定义和数据资产的变化。
  4. 合规性和监管报告: 数据血缘地图提供了合规性和监管报告所需的可追溯性和证据,有助于企业满足监管要求。

示例说明

我们在TPC-DS系统的数据模型基础上进行了扩展,构建了涵盖数据明细层、数据服务层和数据应用层的三层模型,并编写了相应的ETL加工任务代码。这一工作旨在更好地验证ChatGPT在实际生产环境中生成数据血缘地图的效果。

数据模型

我们搭建了包括以下三个层次的数据仓库模型:

  1. DWD(Data Warehouse Detail)数据明细层: 接收数据仓库ODS层的原始数据,并进行标准化处理。我们把目录销售事实表(catalog_sales)、门店销售事实表(store_sales)和网店销售事实表(web_sales)合并成一张销售事实表(dwd_sales)。
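  2. DWS(Data Warehouse Service)数据汇总层: 其主要作用是通过聚合和汇总,将DWD层中的数据按照主题进行汇总,进而提升数据分析的性能和效率。我们以客户为主题,构建了多个数据聚合模型,包括客户每日交易行为(dws_cust_sales_daily)、客户最活跃的交易时段(dws_cust_top_time_shift)、客户最活跃的星期几(dws_cust_top_weekday)、客户购买最多的品牌(dws_cust_top_brand)和品类(dws_cust_top_category),以及客户累计购买排名(dws_cust_sales_rank)等。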
  3. ADS(Application Data Store)数据应用层: 其主要存储企业上层应用直接使用的数据,这些应用包括企业决策、报表、分析等。在ADS层我们构建了客户画像表(ads_cust_portrait)。

ETL作业

以下是相关联的ETL任务清单,这些任务专用于执行与上述数据模型相关的数据处理操作。这些ETL任务的代码将与数据库信息一同提交,以供ChatGPT在构建数据血缘地图时使用。

-- Load dwd_sales: union of the three TPC-DS channel fact tables (catalog, store, web)
-- into one sales-line fact table.
-- The INSERT has no column list, so the column ORDER below is what lands in dwd_sales.
-- Aliases follow the names downstream jobs read from dwd_sales (order_number, channel,
-- sold_date_sk, sold_time_sk, customer_sk, item_sk, quantity, net_paid); "sales_key" for
-- the first column is a guess - TODO confirm against the dwd_sales DDL.
-- Surrogate key: MD5 over channel + order/ticket number + item, '|'-delimited.
--   * Order/ticket numbers come from three separate source tables, so the same
--     (number, item) pair can occur in two channels; including the channel keeps
--     those lines from hashing to the same key.
--   * The delimiter keeps e.g. (12, 34) and (123, 4) from concatenating to the same text.
INSERT INTO dwd_sales
WITH ch_catalog AS (
    SELECT
        MD5(CONCAT_WS('|', 'catalog', cs.cs_order_number, cs.cs_item_sk)) AS sales_key,
        cs.cs_order_number          AS order_number,
        'catalog'                   AS channel,
        cs.cs_sold_date_sk          AS sold_date_sk,
        cs.cs_sold_time_sk          AS sold_time_sk,
        cs.cs_bill_customer_sk      AS customer_sk,
        cs.cs_item_sk               AS item_sk,
        cs.cs_quantity              AS quantity,
        cs.cs_net_paid_inc_ship_tax AS net_paid
    FROM catalog_sales AS cs
),
ch_store AS (
    SELECT
        MD5(CONCAT_WS('|', 'store', ss.ss_ticket_number, ss.ss_item_sk)) AS sales_key,
        ss.ss_ticket_number    AS order_number,
        'store'                AS channel,
        ss.ss_sold_date_sk     AS sold_date_sk,
        ss.ss_sold_time_sk     AS sold_time_sk,
        ss.ss_customer_sk      AS customer_sk,
        ss.ss_item_sk          AS item_sk,
        ss.ss_quantity         AS quantity,
        -- store rows use ss_net_paid_inc_tax; the other channels use *_inc_ship_tax
        ss.ss_net_paid_inc_tax AS net_paid
    FROM store_sales AS ss
),
ch_web AS (
    SELECT
        MD5(CONCAT_WS('|', 'web', ws.ws_order_number, ws.ws_item_sk)) AS sales_key,
        ws.ws_order_number          AS order_number,
        'web'                       AS channel,
        ws.ws_sold_date_sk          AS sold_date_sk,
        ws.ws_sold_time_sk          AS sold_time_sk,
        ws.ws_bill_customer_sk      AS customer_sk,
        ws.ws_item_sk               AS item_sk,
        ws.ws_quantity              AS quantity,
        ws.ws_net_paid_inc_ship_tax AS net_paid
    FROM web_sales AS ws
)
-- Every CTE projects the same nine columns in the same order, so UNION ALL lines them up.
SELECT * FROM ch_catalog
UNION ALL
SELECT * FROM ch_store
UNION ALL
SELECT * FROM ch_web;



-- Per customer: the (up to) 5 brands bought in the most distinct orders.
-- Output columns (positional - the INSERT has no column list):
--   customer_sk, comma-separated brand list, summed per-brand order count.
INSERT INTO dws_cust_top_brand
WITH TMP0 AS (
    -- One row per (customer, brand) with its order count and per-customer rank.
    SELECT
        ds.customer_sk,
        i.i_brand,
        -- order numbers are only unique within a channel, so count (channel, order) pairs
        COUNT(DISTINCT ds.channel, ds.order_number) AS order_cnt,
        -- brand name breaks ties so the top-5 selection is deterministic
        ROW_NUMBER() OVER (
            PARTITION BY ds.customer_sk
            ORDER BY COUNT(DISTINCT ds.channel, ds.order_number) DESC, i.i_brand
        ) AS order_cnt_rank
    FROM dwd_sales AS ds
    -- INNER JOIN: the i_brand IS NOT NULL filter already discards unmatched rows
    INNER JOIN item AS i
        ON ds.item_sk = i.i_item_sk
    WHERE i.i_brand IS NOT NULL
      -- rows with a NULL customer_sk would otherwise form one pseudo-customer group
      AND ds.customer_sk IS NOT NULL
    GROUP BY ds.customer_sk, i.i_brand
)
SELECT
    customer_sk,
    GROUP_CONCAT(i_brand ORDER BY i_brand SEPARATOR ',') AS top_brand,
    -- sum of per-brand counts: an order containing several top brands counts once per brand
    SUM(order_cnt) AS order_cnt
FROM TMP0
WHERE order_cnt_rank <= 5
GROUP BY customer_sk;




-- Per customer: the day of week (date_dim.d_day_name) with the most distinct orders.
-- Ties are broken by day name so the result is deterministic.
-- Output columns (positional): customer_sk, weekday, order_cnt.
INSERT INTO dws_cust_top_weekday
WITH TMP0 AS (
    SELECT
        ds.customer_sk,
        dd.d_day_name,
        -- order numbers are only unique within a channel, so count (channel, order) pairs
        COUNT(DISTINCT ds.channel, ds.order_number) AS order_cnt,
        ROW_NUMBER() OVER (
            PARTITION BY ds.customer_sk
            ORDER BY COUNT(DISTINCT ds.channel, ds.order_number) DESC, dd.d_day_name
        ) AS order_cnt_rank
    FROM dwd_sales AS ds
    -- INNER JOIN: the d_day_name IS NOT NULL filter already discards unmatched rows
    INNER JOIN date_dim AS dd
        ON ds.sold_date_sk = dd.d_date_sk
    WHERE dd.d_day_name IS NOT NULL
      -- rows with a NULL customer_sk would otherwise form one pseudo-customer group
      AND ds.customer_sk IS NOT NULL
    GROUP BY ds.customer_sk, dd.d_day_name
)
SELECT
    customer_sk,
    d_day_name AS weekday,
    order_cnt
FROM TMP0
WHERE order_cnt_rank = 1;





-- Per customer per calendar date: distinct order count, quantity, net paid, and net paid
-- split by channel, by weekend / working day, and for holidays.
-- Output columns are positional (the INSERT has no column list).
INSERT INTO dws_cust_sales_daily
SELECT
    ds.customer_sk,
    dd.d_date,
    -- order numbers are only unique within a channel, so count (channel, order) pairs
    COUNT(DISTINCT ds.channel, ds.order_number) AS order_count,
    SUM(ds.quantity) AS quantity,
    SUM(ds.net_paid) AS net_paid,
    SUM(CASE WHEN ds.channel = 'catalog' THEN ds.net_paid ELSE 0 END) AS catalog_net_paid,
    SUM(CASE WHEN ds.channel = 'store'   THEN ds.net_paid ELSE 0 END) AS store_net_paid,
    SUM(CASE WHEN ds.channel = 'web'     THEN ds.net_paid ELSE 0 END) AS web_net_paid,
    SUM(CASE WHEN dd.d_weekend = 'Y' THEN ds.net_paid ELSE 0 END) AS weekend_net_paid,
    SUM(CASE WHEN dd.d_weekend = 'N' THEN ds.net_paid ELSE 0 END) AS working_day_net_paid,
    SUM(CASE WHEN dd.d_holiday = 'Y' THEN ds.net_paid ELSE 0 END) AS holiday_net_paid
FROM dwd_sales AS ds
-- INNER JOIN: the d_date IS NOT NULL filter already discards unmatched rows
INNER JOIN date_dim AS dd
    ON ds.sold_date_sk = dd.d_date_sk
WHERE dd.d_date IS NOT NULL
  -- rows with a NULL customer_sk would otherwise form one pseudo-customer group per day
  AND ds.customer_sk IS NOT NULL
GROUP BY ds.customer_sk, dd.d_date;



-- Per customer: the (up to) 5 item categories bought in the most distinct orders.
-- Output columns (positional - the INSERT has no column list):
--   customer_sk, comma-separated category list, summed per-category order count.
INSERT INTO dws_cust_top_category
WITH TMP0 AS (
    -- One row per (customer, category) with its order count and per-customer rank.
    SELECT
        ds.customer_sk,
        i.i_category,
        -- order numbers are only unique within a channel, so count (channel, order) pairs
        COUNT(DISTINCT ds.channel, ds.order_number) AS order_cnt,
        -- category name breaks ties so the top-5 selection is deterministic
        ROW_NUMBER() OVER (
            PARTITION BY ds.customer_sk
            ORDER BY COUNT(DISTINCT ds.channel, ds.order_number) DESC, i.i_category
        ) AS order_cnt_rank
    FROM dwd_sales AS ds
    -- INNER JOIN: the i_category IS NOT NULL filter already discards unmatched rows
    INNER JOIN item AS i
        ON ds.item_sk = i.i_item_sk
    WHERE i.i_category IS NOT NULL
      -- rows with a NULL customer_sk would otherwise form one pseudo-customer group
      AND ds.customer_sk IS NOT NULL
    GROUP BY ds.customer_sk, i.i_category
)
SELECT
    customer_sk,
    GROUP_CONCAT(i_category ORDER BY i_category SEPARATOR ',') AS top_category,
    -- sum of per-category counts: an order spanning several top categories counts once per category
    SUM(order_cnt) AS order_cnt
FROM TMP0
WHERE order_cnt_rank <= 5
GROUP BY customer_sk;




-- Per customer: the time sub-shift (time_dim.t_sub_shift) with the most distinct orders.
-- Ties are broken by shift name so the result is deterministic.
-- Output columns (positional): customer_sk, top_shift, order_cnt.
INSERT INTO dws_cust_top_time_shift
WITH TMP0 AS (
    SELECT
        ds.customer_sk,
        td.t_sub_shift,
        -- order numbers are only unique within a channel, so count (channel, order) pairs
        COUNT(DISTINCT ds.channel, ds.order_number) AS order_cnt,
        ROW_NUMBER() OVER (
            PARTITION BY ds.customer_sk
            ORDER BY COUNT(DISTINCT ds.channel, ds.order_number) DESC, td.t_sub_shift
        ) AS order_cnt_rank
    FROM dwd_sales AS ds
    INNER JOIN time_dim AS td
        ON ds.sold_time_sk = td.t_time_sk
    -- Same NULL handling as the weekday/brand/category jobs: without this filter, sales
    -- with no matching time form a NULL shift group that can win rank 1.
    WHERE td.t_sub_shift IS NOT NULL
      -- rows with a NULL customer_sk would otherwise form one pseudo-customer group
      AND ds.customer_sk IS NOT NULL
    GROUP BY ds.customer_sk, td.t_sub_shift
)
SELECT
    customer_sk,
    t_sub_shift AS top_shift,
    order_cnt
FROM TMP0
WHERE order_cnt_rank = 1;



-- Global customer ranking by distinct order count and by total net paid, plus each rank
-- expressed as a percentage of the number of ranked customers.
-- Output columns (positional): customer_sk, order_cnt, order_cnt_rank,
--   order_cnt_rank_ratio, order_amt_rank, order_amt_rank_ratio.
INSERT INTO dws_cust_sales_rank
WITH TMP0 AS (
    SELECT
        ds.customer_sk,
        -- order numbers are only unique within a channel, so count (channel, order) pairs
        COUNT(DISTINCT ds.channel, ds.order_number) AS order_cnt,
        SUM(ds.net_paid) AS order_amt,
        -- customer_sk breaks ties so ROW_NUMBER assigns ranks deterministically
        ROW_NUMBER() OVER (
            ORDER BY COUNT(DISTINCT ds.channel, ds.order_number) DESC, ds.customer_sk
        ) AS order_cnt_rank,
        ROW_NUMBER() OVER (
            ORDER BY SUM(ds.net_paid) DESC, ds.customer_sk
        ) AS order_amt_rank
    FROM dwd_sales AS ds
    -- INNER JOIN restricts ranking to sales whose customer exists in `customer`. In
    -- particular, rows with a NULL customer_sk are dropped instead of being aggregated into
    -- one NULL pseudo-customer that could take a top rank and shift every real customer.
    INNER JOIN customer AS c
        ON ds.customer_sk = c.c_customer_sk
    GROUP BY ds.customer_sk
)
SELECT
    customer_sk,
    order_cnt,
    order_cnt_rank,
    order_cnt_rank * 100 / COUNT(1) OVER () AS order_cnt_rank_ratio,
    order_amt_rank,
    order_amt_rank * 100 / COUNT(1) OVER () AS order_amt_rank_ratio
FROM TMP0;



-- Customer portrait: lifetime totals rolled up from dws_cust_sales_daily, enriched with
-- the per-customer rank / top-weekday / top-shift / top-brand / top-category attributes
-- from the other DWS tables.
-- The INSERT has no column list, so the SELECT order below is the load order; it is
-- spelled out explicitly (instead of TMP0.*) and the two order_cnt columns get distinct
-- aliases so the output has no duplicate names.
INSERT INTO ads_cust_portrait
WITH TMP0 AS (
    -- Lifetime totals per customer from the daily summary.
    SELECT
        dcsd.customer_sk,
        SUM(dcsd.order_count)          AS tot_order_cnt,
        SUM(dcsd.net_paid)             AS tot_net_paid,
        MIN(dcsd.biz_date)             AS first_order_date,
        MAX(dcsd.biz_date)             AS last_order_date,
        SUM(dcsd.store_net_paid)       AS store_net_paid,
        SUM(dcsd.catalog_net_paid)     AS catalog_net_paid,
        SUM(dcsd.web_net_paid)         AS web_net_paid,
        SUM(dcsd.weekend_net_paid)     AS weekend_net_paid,
        SUM(dcsd.working_day_net_paid) AS working_day_net_paid,
        SUM(dcsd.holiday_net_paid)     AS holiday_net_paid
    FROM dws_cust_sales_daily AS dcsd
    GROUP BY dcsd.customer_sk
)
SELECT
    TMP0.customer_sk,
    TMP0.tot_order_cnt,
    TMP0.tot_net_paid,
    TMP0.first_order_date,
    TMP0.last_order_date,
    TMP0.store_net_paid,
    TMP0.catalog_net_paid,
    TMP0.web_net_paid,
    TMP0.weekend_net_paid,
    TMP0.working_day_net_paid,
    TMP0.holiday_net_paid,
    dcsr.order_cnt_rank,
    dcsr.order_cnt_rank_ratio,
    dcsr.order_amt_rank,
    dcsr.order_amt_rank_ratio,
    dctw.weekday,
    dctw.order_cnt  AS weekday_order_cnt,
    dctts.top_shift,
    dctts.order_cnt AS shift_order_cnt,
    dctb.top_brand,
    dctc.top_category
FROM TMP0
-- NOTE(review): each joined DWS table is expected to hold at most one row per
-- customer_sk; if one ever holds more, these LEFT JOINs multiply portrait rows.
LEFT JOIN dws_cust_sales_rank     AS dcsr  ON TMP0.customer_sk = dcsr.customer_sk
LEFT JOIN dws_cust_top_weekday    AS dctw  ON TMP0.customer_sk = dctw.customer_sk
LEFT JOIN dws_cust_top_time_shift AS dctts ON TMP0.customer_sk = dctts.customer_sk
LEFT JOIN dws_cust_top_brand      AS dctb  ON TMP0.customer_sk = dctb.customer_sk
LEFT JOIN dws_cust_top_category   AS dctc  ON TMP0.customer_sk = dctc.customer_sk;

数据血缘图谱模型

Neo4j是一款图数据库,在Neo4j中存储数据血缘可轻松表示数据之间的关系,支持复杂的查询和深度分析。这为数据仓库专家提供了更直观、高效的方式来理解和管理数据流程,促进了数据血缘的追溯、问题排查和数据治理。

数据血缘地图以实体(Entities)和边关系(Edges)为基础,用于准确描述整个数据仓库中数据的流动、变换和关联关系。实体对象包括数据表、数据列、ETL作业、数据源等,边关系则包括数据提取、数据加载、映射等。

在本文中,我们专注于验证ChatGPT在基于复杂ETL作业和数据仓库层次结构构建数据血缘的能力。为简化关系模型,仅保留两个主要实体:

  1. 数据表(TABLE): 数据库表实体代表数据库中的对象。
  2. ETL作业(ETLJOB): ETL作业实体代表ETL的加工任务,在Neo4j中使用标签ETLJOB。

此外,我们考虑了以下两种相关关系:

  1. 数据提取(EXTRACT): 数据提取关系指的是连接数据库源表和ETL作业的关系。
  2. 数据加载(LOAD): 数据加载关系表示连接ETL作业和数据库目标表的关系。

最终形成的血缘关系如下:

(源表:TABLE)-[抽取:EXTRACT]->(ETL作业:ETLJOB)-[装载:LOAD]->(目标表:TABLE)

代码实现

生成实体(Entity)

以下是通过数据库元数据信息以及ETL作业生成的实体的代码,这段代码包含三部分内容:

  1. 创建SQLDatabase对象
    此步骤获取数据库的元数据信息(各表的数据定义语言DDL),以便大模型能够理解数据库结构。示例代码中设置了 sample_rows_in_table_info=0,因此不会附带表的示例数据行;如需让大模型参考示例数据,可将该参数设为大于0的值。
  2. 读取ETL作业脚本:
    遍历etl目录下的所有ETL作业的SQL脚本,用于LLM的输入信息。
  3. 实体生成提示词
    把以上获取的信息组装成提示词,为大模型提供必要的信息,以便大模型更好的执行任务。这些任务包括:
    角色设定: 定义大模型的角色为Neo4j数据库技术专家,以便专门处理与Neo4j相关的任务。
    任务指令: 提出基于已有的数据库DDL和ETL作业信息,生成用于在Neo4j数据库中创建实体的语句的具体任务。
    少样本示例: 提供具体的少量样本示例,帮助理解如何从给定的输入信息中提取必要的数据,并转换成Neo4j的实体创建语句。
    输入信息: 明确指出输入信息包括数据库的所有表的DDL信息以及ETL作业的详细情况。
# Step 1: open a SQLDatabase handle; its table_info supplies the schema metadata (DDL)
# embedded in the prompt below.
# Replace {user}, {password}, {host} and {database} with the real account name, password,
# host and database name. The URI is a plain string (not an f-string), so these braces
# are literal placeholders that must be edited by hand.
# sample_rows_in_table_info=0: no sample rows are requested alongside each table's DDL.
db = SQLDatabase.from_uri("mysql+pymysql://{user}:{password}@{host}/{database}", sample_rows_in_table_info=0)

# Step 2: read the ETL job SQL from etl/etl_all.sql.
# The read has to be inside the `with` body; the explicit encoding makes decoding
# independent of the platform's default locale.
with open(os.path.join('etl', 'etl_all.sql'), 'r', encoding='utf-8') as file:
    etl = file.read()

# Step 3: assemble the entity-generation prompt: role and instructions, few-shot examples
# (CREATE TABLE -> :TABLE node; INSERT INTO destTable -> :ETLJOB node named
# "etl_destTable"), then the schema DDL and the ETL SQL as input.
# Doubled braces {{ }} are f-string escapes and reach the LLM as single braces.
entity_prompt = f"""
###指令###
你是一个Neo4j专家,请基于数据仓库信息以及ETL作业生成数Neo4j数据库的Cypher创建实体(ENTITY)语句,
输出的Cypher语句必须语法正确能够正常运行。
严格按照给出的示例格式生成Cypher,不允许创造实体和关系。

###示例###
###表实体生成示例###
<输入>CREATE TABLE tableName (col1,col2,col3,col4..)
<输出>CREATE (:TABLE {{name:"tableName"}});

###ETLJOB实体生成示例###
<输入>INSERT INTO destTable SELECT col1, col2 .. FROM ... LEFT JOIN ...
<输出>CREATE (:ETLJOB {{name: "etl_destTable"}});

<输入>INSERT INTO destTable WITH ... SELECT col1, col2 .. FROM ... LEFT JOIN ...
<输出>CREATE (:ETLJOB {{name: "etl_destTable"}});

###输入###
以下是数据库表信息:
{db.table_info}

以下是ETL作业:
{etl}

###输出表实体###
//生成表实体
"""

# Step 4: send the prompt to the LLM and print the generated Cypher text.
entity_result = llm.invoke(entity_prompt)
print(entity_result.content)

以下是通过利用ChatGPT生成的实体(Entity)创建语句:

// Entity-creation statements as returned by the LLM for the entity prompt.
// One :TABLE node per table in the schema metadata. This includes TPC-DS tables that no
// ETL job reads or writes (e.g. call_center, inventory).
// CREATE is not idempotent: running this script twice creates duplicate nodes
// (MERGE would avoid that).
CREATE (:TABLE {name:"ads_cust_portrait"});
CREATE (:TABLE {name:"call_center"});
CREATE (:TABLE {name:"catalog_page"});
CREATE (:TABLE {name:"catalog_returns"});
CREATE (:TABLE {name:"catalog_sales"});
CREATE (:TABLE {name:"customer"});
CREATE (:TABLE {name:"customer_address"});
CREATE (:TABLE {name:"customer_demographics"});
CREATE (:TABLE {name:"date_dim"});
CREATE (:TABLE {name:"dwd_sales"});
CREATE (:TABLE {name:"dws_cust_sales_daily"});
CREATE (:TABLE {name:"dws_cust_sales_rank"});
CREATE (:TABLE {name:"dws_cust_top_brand"});
CREATE (:TABLE {name:"dws_cust_top_category"});
CREATE (:TABLE {name:"dws_cust_top_time_shift"});
CREATE (:TABLE {name:"dws_cust_top_weekday"});
CREATE (:TABLE {name:"household_demographics"});
CREATE (:TABLE {name:"income_band"});
CREATE (:TABLE {name:"inventory"});
CREATE (:TABLE {name:"item"});
CREATE (:TABLE {name:"promotion"});
CREATE (:TABLE {name:"reason"});
CREATE (:TABLE {name:"store"});
CREATE (:TABLE {name:"store_returns"});
CREATE (:TABLE {name:"store_sales"});
CREATE (:TABLE {name:"time_dim"});
CREATE (:TABLE {name:"warehouse"});
CREATE (:TABLE {name:"web_page"});
CREATE (:TABLE {name:"web_returns"});
CREATE (:TABLE {name:"web_sales"});
CREATE (:TABLE {name:"web_site"});

// One :ETLJOB node per INSERT INTO statement, named "etl_" + the target table.
CREATE (:ETLJOB {name: "etl_dwd_sales"});
CREATE (:ETLJOB {name: "etl_dws_cust_top_brand"});
CREATE (:ETLJOB {name: "etl_dws_cust_top_weekday"});
CREATE (:ETLJOB {name: "etl_dws_cust_sales_daily"});
CREATE (:ETLJOB {name: "etl_dws_cust_top_category"});
CREATE (:ETLJOB {name: "etl_dws_cust_top_time_shift"});
CREATE (:ETLJOB {name: "etl_dws_cust_sales_rank"});
CREATE (:ETLJOB {name: "etl_ads_cust_portrait"});

生成关系(Edge)

以下内容展示了利用ETL作业代码生成关系的代码。这里我们只展示提示词,因为ETL代码已经在之前的步骤中读取(关系提示词中只嵌入了ETL作业代码,没有嵌入数据库元数据)。这些提示主要旨在指导ChatGPT如何从各种SQL语句中提取关系,尤其是那些包含连接(JOIN)子句和WITH子句的SQL语句。为了准确提取关系,需要提供明确的指导提示。

# Build the relationship-generation prompt. Only the ETL SQL ({etl}, read earlier) is
# embedded; unlike the entity prompt, no database metadata is included.
# The few-shot examples cover: a plain SELECT with JOINs, WITH + UNION ALL, WITH + a
# multi-CTE SELECT, and a derived table joined to further tables. Each example maps to one
# MATCH ... CREATE statement with an EXTRACT edge from every source table and one LOAD edge.
# NOTE(review): the examples use CREATE, so re-running the generated Cypher adds duplicate
# relationships; MERGE would make it idempotent.
# Doubled braces {{ }} are f-string escapes and reach the LLM as single braces.
edge_prompt = f"""
###指令###
你是一个Neo4j专家,请基于ETL作业生成数Neo4j数据库的Cypher创建关系语句, 输出的Cypher语句必须语法正确能够正常运行。严格按照给出的示例格式生成Cypher。
一个INSERT INTO语句生成一条创建关系的Cypher语句。
一个INSERT INTO语句生成的关系只有一个LOAD关系,至少有一个EXTRACT关系。
ETLJOB的name属性必须是:etl_前缀+INSERT INTO表名。
重复检查确保MATCH子句中的TABLE实体必须存在于INSERT INTO语句中,不能出现不存在的表。
重复检查确保MATCH子句中的TABLE实体表不能遗漏。
未匹配成功输出信息。

###示例###
<输入>INSERT INTO destTable SELECT col1, col2, col3, col4 .. FROM srcTable1 LEFT JOIN srcTable2...;
<输出>MATCH (srcTable1:TABLE {{name: "srcTable1"}}),
(srcTable2:TABLE {{name: "srcTable2"}}),
(etl_destTable:ETLJOB {{name: "etl_destTable"}}),
(destTable:TABLE {{name: "destTable"}})
CREATE (srcTable1)-[:EXTRACT]->(etl_destTable),
(srcTable2)-[:EXTRACT]->(etl_destTable),
(etl_destTable)-[:LOAD]->(destTable);

<输入>INSERT INTO destTable
WITH TMP0 AS ( SELECT ... FROM srcTable1),
TMP1 AS ( SELECT ... FROM srcTable2 )
SELECT ... FROM TMP0
UNION ALL
SELECT ... FROM TMP1;
<输出>MATCH (srcTable1:TABLE {{name: "srcTable1"}}),
(srcTable2:TABLE {{name: "srcTable2"}}),
(etl_destTable:ETLJOB {{name: "etl_destTable"}}),
(destTable:TABLE {{name: "destTable"}})
CREATE (srcTable1)-[:EXTRACT]->(etl_destTable),
(srcTable2)-[:EXTRACT]->(etl_destTable),
(etl_destTable)-[:LOAD]->(destTable);

<输入>INSERT INTO destTable
WITH TMP0 AS ( SELECT ... FROM srcTable1),
TMP1 AS ( SELECT ... FROM srcTable2 )
SELECT ... FROM TMP0, TMP1;
<输出>MATCH (srcTable1:TABLE {{name: "srcTable1"}}),
(srcTable2:TABLE {{name: "srcTable2"}}),
(etl_destTable:ETLJOB {{name: "etl_destTable"}}),
(destTable:TABLE {{name: "destTable"}})
CREATE (srcTable1)-[:EXTRACT]->(etl_destTable),
(srcTable2)-[:EXTRACT]->(etl_destTable),
(etl_destTable)-[:LOAD]->(destTable);

<输入>INSERT INTO destTable
SELECT ... FROM (SELECT ... FROM srcTable1 GROUP BY ...) AS TMP0
LEFT JOIN srcTable2 ...
LEFT JOIN srcTable3 ...
LEFT JOIN srcTable4 ...;
<输出>MATCH (srcTable1:TABLE {{name: "srcTable1"}}),
(srcTable2:TABLE {{name: "srcTable2"}}),
(srcTable3:TABLE {{name: "srcTable3"}}),
(srcTable4:TABLE {{name: "srcTable4"}}),
(etl_destTable:ETLJOB {{name: "etl_destTable"}}),
(destTable:TABLE {{name: "destTable"}})
CREATE (srcTable1)-[:EXTRACT]->(etl_destTable),
(srcTable2)-[:EXTRACT]->(etl_destTable),
(srcTable3)-[:EXTRACT]->(etl_destTable),
(srcTable4)-[:EXTRACT]->(etl_destTable),
(etl_destTable)-[:LOAD]->(destTable);

###输入###
以下是ETL作业:
{etl}

###输出关系###
//生成关系
"""

# Send the prompt to the LLM and print the generated Cypher text.
edge_result = llm.invoke(edge_prompt)
print(edge_result.content)

以下是通过利用ChatGPT生成的关系(Edge)创建语句:

// Relationship-creation statements as returned by the LLM for the edge prompt.
// Each statement MATCHes the :TABLE and :ETLJOB nodes created by the entity script, then
// CREATEs (source)-[:EXTRACT]->(job) for every table the job reads and a single
// (job)-[:LOAD]->(target) edge.
// If any MATCHed node is missing, that statement matches nothing and creates no edges,
// without raising an error. CREATE is not idempotent: re-running duplicates relationships.

// etl_dwd_sales: catalog_sales, store_sales, web_sales -> dwd_sales
MATCH (catalog_sales:TABLE {name: "catalog_sales"}), 
(store_sales:TABLE {name: "store_sales"}),
(web_sales:TABLE {name: "web_sales"}),
(etl_dwd_sales:ETLJOB {name: "etl_dwd_sales"}),
(dwd_sales:TABLE {name: "dwd_sales"})
CREATE (catalog_sales)-[:EXTRACT]->(etl_dwd_sales),
(store_sales)-[:EXTRACT]->(etl_dwd_sales),
(web_sales)-[:EXTRACT]->(etl_dwd_sales),
(etl_dwd_sales)-[:LOAD]->(dwd_sales);

// etl_dws_cust_top_brand: dwd_sales, item -> dws_cust_top_brand
MATCH (dwd_sales:TABLE {name: "dwd_sales"}),
(item:TABLE {name: "item"}),
(etl_dws_cust_top_brand:ETLJOB {name: "etl_dws_cust_top_brand"}),
(dws_cust_top_brand:TABLE {name: "dws_cust_top_brand"})
CREATE (dwd_sales)-[:EXTRACT]->(etl_dws_cust_top_brand),
(item)-[:EXTRACT]->(etl_dws_cust_top_brand),
(etl_dws_cust_top_brand)-[:LOAD]->(dws_cust_top_brand);

// etl_dws_cust_top_weekday: dwd_sales, date_dim -> dws_cust_top_weekday
MATCH (dwd_sales:TABLE {name: "dwd_sales"}),
(date_dim:TABLE {name: "date_dim"}),
(etl_dws_cust_top_weekday:ETLJOB {name: "etl_dws_cust_top_weekday"}),
(dws_cust_top_weekday:TABLE {name: "dws_cust_top_weekday"})
CREATE (dwd_sales)-[:EXTRACT]->(etl_dws_cust_top_weekday),
(date_dim)-[:EXTRACT]->(etl_dws_cust_top_weekday),
(etl_dws_cust_top_weekday)-[:LOAD]->(dws_cust_top_weekday);

// etl_dws_cust_sales_daily: dwd_sales, date_dim -> dws_cust_sales_daily
MATCH (dwd_sales:TABLE {name: "dwd_sales"}),
(date_dim:TABLE {name: "date_dim"}),
(etl_dws_cust_sales_daily:ETLJOB {name: "etl_dws_cust_sales_daily"}),
(dws_cust_sales_daily:TABLE {name: "dws_cust_sales_daily"})
CREATE (dwd_sales)-[:EXTRACT]->(etl_dws_cust_sales_daily),
(date_dim)-[:EXTRACT]->(etl_dws_cust_sales_daily),
(etl_dws_cust_sales_daily)-[:LOAD]->(dws_cust_sales_daily);

// etl_dws_cust_top_category: dwd_sales, item -> dws_cust_top_category
MATCH (dwd_sales:TABLE {name: "dwd_sales"}),
(item:TABLE {name: "item"}),
(etl_dws_cust_top_category:ETLJOB {name: "etl_dws_cust_top_category"}),
(dws_cust_top_category:TABLE {name: "dws_cust_top_category"})
CREATE (dwd_sales)-[:EXTRACT]->(etl_dws_cust_top_category),
(item)-[:EXTRACT]->(etl_dws_cust_top_category),
(etl_dws_cust_top_category)-[:LOAD]->(dws_cust_top_category);

// etl_dws_cust_top_time_shift: dwd_sales, time_dim -> dws_cust_top_time_shift
MATCH (dwd_sales:TABLE {name: "dwd_sales"}),
(time_dim:TABLE {name: "time_dim"}),
(etl_dws_cust_top_time_shift:ETLJOB {name: "etl_dws_cust_top_time_shift"}),
(dws_cust_top_time_shift:TABLE {name: "dws_cust_top_time_shift"})
CREATE (dwd_sales)-[:EXTRACT]->(etl_dws_cust_top_time_shift),
(time_dim)-[:EXTRACT]->(etl_dws_cust_top_time_shift),
(etl_dws_cust_top_time_shift)-[:LOAD]->(dws_cust_top_time_shift);

// etl_dws_cust_sales_rank: dwd_sales, customer -> dws_cust_sales_rank
// (customer appears because the ETL SQL joins it)
MATCH (dwd_sales:TABLE {name: "dwd_sales"}),
(customer:TABLE {name: "customer"}),
(etl_dws_cust_sales_rank:ETLJOB {name: "etl_dws_cust_sales_rank"}),
(dws_cust_sales_rank:TABLE {name: "dws_cust_sales_rank"})
CREATE (dwd_sales)-[:EXTRACT]->(etl_dws_cust_sales_rank),
(customer)-[:EXTRACT]->(etl_dws_cust_sales_rank),
(etl_dws_cust_sales_rank)-[:LOAD]->(dws_cust_sales_rank);

// etl_ads_cust_portrait: the six dws_cust_* tables -> ads_cust_portrait
MATCH (dws_cust_sales_daily:TABLE {name: "dws_cust_sales_daily"}),
(dws_cust_sales_rank:TABLE {name: "dws_cust_sales_rank"}),
(dws_cust_top_weekday:TABLE {name: "dws_cust_top_weekday"}),
(dws_cust_top_time_shift:TABLE {name: "dws_cust_top_time_shift"}),
(dws_cust_top_brand:TABLE {name: "dws_cust_top_brand"}),
(dws_cust_top_category:TABLE {name: "dws_cust_top_category"}),
(etl_ads_cust_portrait:ETLJOB {name: "etl_ads_cust_portrait"}),
(ads_cust_portrait:TABLE {name: "ads_cust_portrait"})
CREATE (dws_cust_sales_daily)-[:EXTRACT]->(etl_ads_cust_portrait),
(dws_cust_sales_rank)-[:EXTRACT]->(etl_ads_cust_portrait),
(dws_cust_top_weekday)-[:EXTRACT]->(etl_ads_cust_portrait),
(dws_cust_top_time_shift)-[:EXTRACT]->(etl_ads_cust_portrait),
(dws_cust_top_brand)-[:EXTRACT]->(etl_ads_cust_portrait),
(dws_cust_top_category)-[:EXTRACT]->(etl_ads_cust_portrait),
(etl_ads_cust_portrait)-[:LOAD]->(ads_cust_portrait);

执行结果

本图展示的是在Neo4j数据库中,基于生成的实体(Entities)与关联(Edges)语句构建的关系图谱的成果。图谱中,紫色节点代表数据库表,而蓝色节点则表示ETL作业。通过此图,我们能够直观地观察到数据仓库中各表间的血缘关系,揭示了数据流转及处理逻辑的内在联系。