企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Lineage 注意 Lineage 支持是非常实验性的,可能会发生变化。 Airflow可以帮助跟踪数据的来源,发生的事情以及数据随时间的变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。 气流通过任务的入口和出口跟踪数据。 让我们从一个例子开始,看看它是如何工作的。 ``` from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.lineage.datasets import File from airflow.models import DAG from datetime import timedelta FILE_CATEGORIES = [ "CAT1" , "CAT2" , "CAT3" ] args = { 'owner' : 'airflow' , 'start_date' : airflow . utils . dates . days_ago ( 2 ) } dag = DAG ( dag_id = 'example_lineage' , default_args = args , schedule_interval = '0 0 * * *' , dagrun_timeout = timedelta ( minutes = 60 )) f_final = File ( "/tmp/final" ) run_this_last = DummyOperator ( task_id = 'run_this_last' , dag = dag , inlets = { "auto" : True }, outlets = { "datasets" : [ f_final ,]}) f_in = File ( "/tmp/whole_directory/" ) outlets = [] for file in FILE_CATEGORIES : f_out = File ( "/tmp/ {} /{{{{ execution_date }}}}" . format ( file )) outlets . append ( f_out ) run_this = BashOperator ( task_id = 'run_me_first' , bash_command = 'echo 1' , dag = dag , inlets = { "datasets" : [ f_in ,]}, outlets = { "datasets" : outlets } ) run_this . set_downstream ( run_this_last ) ``` 任务采用参数<cite>入口</cite>和<cite>出口</cite> 。 入口可以由数据集列表<cite>{“数据集”:[dataset1,dataset2]}</cite>手动定义,也可以配置为从上游任务中查找出口<cite>{“task_ids”:[“task_id1”,“task_id2”]}</cite>或者可以配置为从直接上游任务<cite>{“auto”:True}</cite>或它们的组合中获取出口。 出口被定义为数据集列表<cite>{“数据集”:[dataset1,dataset2]}</cite> 。 在执行任务时,数据集的任何字段都使用上下文进行模板化。 注意 如果操作员支持,操作员可以自动添加入口和出口。 在示例DAG任务中, <cite>run_me_first</cite>是一个BashOperator,它接收从列表生成的3个入口: <cite>CAT1</cite> , <cite>CAT2</cite> , <cite>CAT3</cite> 。 请注意, <cite>execution_date</cite>是一个模板化字段,将在任务运行时呈现。 注意 在幕后,Airflow将沿袭元数据作为任务的<cite>pre_execute</cite>方法的一部分进行准备。 当任务完成执行<cite>时,</cite>将调用<cite>post_execute</cite>并将lineage元数据推送到XCOM中。 因此,如果您要创建自己的覆盖此方法的运算符,请确保分别使用<cite>prepare_lineage</cite>和<cite>apply_lineage</cite>修饰您的方法。 ## Apache Atlas Airflow可以将其沿袭元数据发送到Apache Atlas。 您需要启用<cite>atlas</cite>后端并正确配置它,例如在<cite>airflow.cfg中</cite> : ``` [ lineage ] backend = airflow . lineage . backend . atlas [ atlas ] username = my_username password = my_password host = host port = 21000 ``` 请确保安装了<cite>atlasclient</cite>软件包。