🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# 教程 本教程将向您介绍一些基本的Airflow概念,对象及其在编写第一个管道时的用法。 ## 示例管道定义 以下是基本管道定义的示例。 如果这看起来很复杂,请不要担心,下面将逐行说明。 ``` """ Code that goes along with the Airflow tutorial located at: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime , timedelta default_args = { 'owner' : 'airflow' , 'depends_on_past' : False , 'start_date' : datetime ( 2015 , 6 , 1 ), 'email' : [ 'airflow@example.com' ], 'email_on_failure' : False , 'email_on_retry' : False , 'retries' : 1 , 'retry_delay' : timedelta ( minutes = 5 ), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG ( 'tutorial' , default_args = default_args ) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator ( task_id = 'print_date' , bash_command = 'date' , dag = dag ) t2 = BashOperator ( task_id = 'sleep' , bash_command = 'sleep 5' , retries = 3 , dag = dag ) templated_command = """ { % f or i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" { % e ndfor %} """ t3 = BashOperator ( task_id = 'templated' , bash_command = templated_command , params = { 'my_param' : 'Parameter I passed in' }, dag = dag ) t2 . set_upstream ( t1 ) t3 . set_upstream ( t1 ) ``` ## 这是一个DAG定义文件 包围你的一件事(对于每个人来说可能不是很直观)是这个Airflow Python脚本实际上只是一个配置文件,将DAG的结构指定为代码。 此处定义的实际任务将在与此脚本的上下文不同的上下文中运行。 不同的任务在不同的时间点运行在不同的工作者上,这意味着该脚本不能用于在任务之间交叉通信。 请注意,为此,我们有一个名为`XCom`的更高级功能。 人们有时会将DAG定义文件视为可以进行实际数据处理的地方 - 事实并非如此! 该脚本的目的是定义DAG对象。 它需要快速评估(秒,而不是几分钟),因为调度程序将定期执行它以反映更改(如果有的话)。 ## 导入模块 Airflow管道只是一个Python脚本,恰好定义了Airflow DAG对象。 让我们首先导入我们需要的库。 ``` # The DAG object; we'll need this to instantiate a DAG from airflow import DAG # Operators; we need this to operate! from airflow.operators.bash_operator import BashOperator ``` ## 默认参数 我们即将创建一个DAG和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数(这将变得多余),或者(更好!)我们可以定义一个默认参数的字典,我们可以可以在创建任务时使用。 ``` from datetime import datetime , timedelta default_args = { 'owner' : 'airflow' , 'depends_on_past' : False , 'start_date' : datetime ( 2015 , 6 , 1 ), 'email' : [ 'airflow@example.com' ], 'email_on_failure' : False , 'email_on_retry' : False , 'retries' : 1 , 'retry_delay' : timedelta ( minutes = 5 ), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } ``` 有关BaseOperator参数及其功能的更多信息,请参阅[`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")文档。 另外,请注意,您可以轻松定义可用于不同目的的不同参数集。 一个例子是在生产和开发环境之间进行不同的设置。 ## 实例化DAG 我们需要一个DAG对象来嵌入我们的任务。 这里我们传递一个定义`dag_id`的字符串,它用作DAG的唯一标识符。 我们还传递我们刚刚定义的默认参数字典,并为DAG定义1天的`schedule_interval` 。 ``` dag = DAG ( 'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 )) ``` ## 任务 在实例化操作员对象时生成任务。 从运算符实例化的对象称为构造函数。 第一个参数`task_id`充当任务的唯一标识符。 ``` t1 = BashOperator ( task_id = 'print_date' , bash_command = 'date' , dag = dag ) t2 = BashOperator ( task_id = 'sleep' , bash_command = 'sleep 5' , retries = 3 , dag = dag ) ``` 请注意我们如何将从BaseOperator继承的所有运算符( `retries` )共同的运算符特定参数( `bash_command` )和通用参数传递给运算符的构造函数。 这比为每个构造函数调用传递每个参数更简单。 另请注意,在第二个任务中,我们使用`3`覆盖`retries`参数。 任务的优先规则如下: 1. 明确传递参数 2. `default_args`字典中存在的值 3. 运算符的默认值(如果存在) 任务必须包含或继承参数`task_id`和`owner` ,否则Airflow将引发异常。 ## 与金贾一起模仿 Airflow充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的强大功能,并为管道作者提供了一组内置参数和宏。 Airflow还为管道作者提供了定义自己的参数,宏和模板的钩子。 本教程几乎没有涉及在Airflow中使用模板进行操作的表面,但本节的目的是让您知道此功能的存在,让您熟悉双花括号,并指向最常见的模板变量: `{{ ds }}` (今天的“日期戳”)。 ``` templated_command = """ { % f or i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7) }}" echo "{{ params.my_param }}" { % e ndfor %} """ t3 = BashOperator ( task_id = 'templated' , bash_command = templated_command , params = { 'my_param' : 'Parameter I passed in' }, dag = dag ) ``` 请注意, `templated_command`包含`{% %}`块中的代码逻辑,引用参数如`{{ ds }}` ,调用`{{ macros.ds_add(ds, 7)}}`的函数,并在`{{ macros.ds_add(ds, 7)}}`引用用户定义的参数`{{ params.my_param }}` 。 `BaseOperator`的`params`钩子允许您将参数和/或对象的字典传递给模板。 请花点时间了解参数`my_param`如何通过模板。 文件也可以传递给`bash_command`参数,例如`bash_command='templated_command.sh'` ,其中文件位置相对于包含管道文件的目录(在本例中为`tutorial.py` )。 这可能是出于许多原因,例如分离脚本的逻辑和管道代码,允许在使用不同语言编写的文件中进行正确的代码突出显示,以及构造管道的一般灵活性。 也可以将`template_searchpath`定义为指向DAG构造函数调用中的任何文件夹位置。 使用相同的DAG构造函数调用,可以定义`user_defined_macros` ,它允许您指定自己的变量。 例如,将`dict(foo='bar')`传递给此参数允许您在模板中使用`{{ foo }}` 。 此外,指定`user_defined_filters`允许您注册自己的过滤器。 例如,将`dict(hello=lambda name: 'Hello %s' % name)`传递给此参数允许您使用`{{ 'world' | hello }}` 你的模板中的`{{ 'world' | hello }}` 有关自定义过滤器的更多信息,请查看[Jinja文档](http://jinja.pocoo.org/docs/dev/api/) 有关可以在模板中引用的变量和宏的更多信息,请务必阅读[宏](code.html)部分 ## 设置依赖关系 我们有两个不相互依赖的简单任务。 以下是一些可以定义它们之间依赖关系的方法: ``` t2 . set_upstream ( t1 ) # This means that t2 will depend on t1 # running successfully to run # It is equivalent to # t1.set_downstream(t2) t3 . set_upstream ( t1 ) # all of this is equivalent to # dag.set_dependency('print_date', 'sleep') # dag.set_dependency('print_date', 'templated') ``` 请注意,在执行脚本时,Airflow会在DAG中找到循环或多次引用依赖项时引发异常。 ## 概括 好吧,所以我们有一个非常基本的DAG。 此时,您的代码应如下所示: ``` """ Code that goes along with the Airflow located at: http://airflow.readthedocs.org/en/latest/tutorial.html """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime , timedelta default_args = { 'owner' : 'airflow' , 'depends_on_past' : False , 'start_date' : datetime ( 2015 , 6 , 1 ), 'email' : [ 'airflow@example.com' ], 'email_on_failure' : False , 'email_on_retry' : False , 'retries' : 1 , 'retry_delay' : timedelta ( minutes = 5 ), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG ( 'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 )) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator ( task_id = 'print_date' , bash_command = 'date' , dag = dag ) t2 = BashOperator ( task_id = 'sleep' , bash_command = 'sleep 5' , retries = 3 , dag = dag ) templated_command = """ { % f or i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" { % e ndfor %} """ t3 = BashOperator ( task_id = 'templated' , bash_command = templated_command , params = { 'my_param' : 'Parameter I passed in' }, dag = dag ) t2 . set_upstream ( t1 ) t3 . set_upstream ( t1 ) ``` ## 测试 ### 运行脚本 是时候进行一些测试了。 首先让我们确保管道解析。 假设我们正在保存`airflow.cfg`引用的`airflow.cfg`文件夹中`tutorial.py`中上一步的代码。 DAG的默认位置是`~/airflow/dags` 。 ``` python ~/airflow/dags/tutorial.py ``` 如果脚本没有引发异常,则意味着您没有做任何可怕的错误,并且您的Airflow环境有点健全。 ### 命令行元数据验证 让我们运行一些命令来进一步验证这个脚本。 ``` # print the list of active DAGs airflow list_dags # prints the list of tasks the "tutorial" dag_id airflow list_tasks tutorial # prints the hierarchy of tasks in the tutorial DAG airflow list_tasks tutorial --tree ``` ### 测试 让我们通过在特定日期运行实际任务实例来进行测试。 在此上下文中指定的日期是`execution_date` ,它模拟在特定日期+时间运行任务或dag的调度程序: ``` # command layout: command subcommand dag_id task_id date # testing print_date airflow test tutorial print_date 2015 -06-01 # testing sleep airflow test tutorial sleep 2015 -06-01 ``` 现在还记得我们之前用模板做过的事吗? 通过运行此命令,了解如何呈现和执行此模板: ``` # testing templated airflow test tutorial templated 2015 -06-01 ``` 这应该导致显示详细的事件日志并最终运行bash命令并打印结果。 请注意, `airflow test`命令在本地运行任务实例,将其日志输出到stdout(在屏幕上),不依赖于依赖项,并且不向数据库传达状态(运行,成功,失败,...)。 它只允许测试单个任务实例。 ### 回填 一切看起来都运行良好所以让我们运行回填。 `backfill`将尊重您的依赖关系,将日志发送到文件并与数据库通信以记录状态。 如果您有网络服务器,您将能够跟踪进度。 如果您有兴趣在回填过程中直观地跟踪进度, `airflow webserver`将启动Web服务器。 请注意,如果使用`depends_on_past=True` ,则单个任务实例将取决于前面任务实例的成功,除了指定了自身的start_date,此依赖关系将被忽略。 此上下文中的日期范围是`start_date`和可选的`end_date` ,它们用于使用此dag中的任务实例填充运行计划。 ``` # optional, start a web server in debug mode in the background # airflow webserver --debug & # start your backfill on a date range airflow backfill tutorial -s 2015 -06-01 -e 2015 -06-07 ``` ## 下一步是什么? 就是这样,你已经编写,测试并回填了你的第一个Airflow管道。 将代码合并到具有针对它运行的主调度程序的代码存储库中应该让它每天都被触发并运行。 以下是您可能想要做的一些事情: * 深入了解用户界面 - 点击所有内容! * 继续阅读文档! 特别是以下部分: > * 命令行界面 > * 运营商 > * 宏 * 写下你的第一个管道!