ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
# 概念 Airflow Platform是用于描述,执行和监控工作流的工具。 ## 核心理念 ### DAG的 在Airflow中, `DAG` (或定向非循环图)是您要运行的所有任务的集合,以反映其关系和依赖关系的方式进行组织。 例如,一个简单的DAG可以包含三个任务:A,B和C.可以说A必须在B可以运行之前成功运行,但C可以随时运行。 它可以说任务A在5分钟后超时,并且B可以重新启动最多5次以防它失败。 它也可能会说工作流程将在每天晚上10点运行,但不应该在某个特定日期之前开始。 通过这种方式,DAG描述_了_您希望如何执行工作流程; 但请注意,我们还没有说过我们真正想做的事情! A,B和C可以是任何东西。 当C发送电子邮件时,也许A准备B进行分析的数据。 或者也许A监控你的位置,这样B可以打开你的车库门,而C打开你的房子灯。 重要的是,发展议程集团并不关心其组成任务的作用; 它的工作是确保无论他们做什么在正确的时间,或正确的顺序,或正确处理任何意外的问题。 DAG在标准Python文件中定义,这些文件放在Airflow的`DAG_FOLDER` 。 Airflow将执行每个文件中的代码以动态构建`DAG`对象。 您可以拥有任意数量的DAG,每个DAG都描述任意数量的任务。 通常,每个应该对应于单个逻辑工作流。 注意 搜索DAG时,Airflow将仅考虑字符串“airflow”和“DAG”都出现在`.py`文件内容中的文件。 #### 范围 Airflow将加载它可以从`DAG`导入的任何`DAG`对象。 重要的是,这意味着DAG必须出现在`globals()` 。 考虑以下两个DAG。 只会加载`dag_1` ; 另一个只出现在本地范围内。 ``` dag_1 = DAG ( 'this_dag_will_be_discovered' ) def my_function (): dag_2 = DAG ( 'but_this_dag_will_not' ) my_function () ``` 有时这可以很好地利用。 例如, `SubDagOperator`的常见模式是定义函数内的子标记,以便Airflow不会尝试将其作为独立的DAG加载。 #### 默认参数 如果将`default_args`字典传递给DAG,它将把它们应用于任何运算符。 这使得很容易将公共参数应用于许多运算符而无需多次键入。 ``` default_args = { 'start_date' : datetime ( 2016 , 1 , 1 ), 'owner' : 'Airflow' } dag = DAG ( 'my_dag' , default_args = default_args ) op = DummyOperator ( task_id = 'dummy' , dag = dag ) print ( op . owner ) # Airflow ``` #### Context Manager _在Airflow 1.8中添加_ DAG可用作上下文管理器,以自动将新运算符分配给该DAG。 ``` with DAG ( 'my_dag' , start_date = datetime ( 2016 , 1 , 1 )) as dag : op = DummyOperator ( 'op' ) op . dag is dag # True ``` ### 运营商 虽然DAG描述了_如何_运行工作流,但`Operators`确定实际完成的工作。 操作员描述工作流中的单个任务。 运营商通常(但并非总是)是原子的,这意味着他们可以独立运营,而不需要与任何其他运营商共享资源。 DAG将确保运营商以正确的顺序运行; 除了这些依赖项之外,运营商通常独立运行。 实际上,它们可能在两台完全不同的机器上运行。 这是一个微妙但非常重要的一点:通常,如果两个运营商需要共享信息,如文件名或少量数据,您应该考虑将它们组合到一个运算符中。 如果绝对无法避免,Airflow确实具有操作员交叉通信的功能,称为XCom,本文档的其他部分对此进行了描述。 Airflow为许多常见任务提供操作员,包括: * `BashOperator` - 执行bash命令 * `PythonOperator` - 调用任意Python函数 * `EmailOperator` - 发送电子邮件 * `SimpleHttpOperator` - 发送HTTP请求 * `MySqlOperator` , `SqliteOperator` , `PostgresOperator` , `MsSqlOperator` , `OracleOperator` , `JdbcOperator`等 - 执行SQL命令 * `Sensor` - 等待一定时间,文件,数据库行,S3键等... 除了这些基本构建块之外,还有许多特定的运算符: `DockerOperator` , `HiveOperator` , `S3FileTransformOperator` , `PrestoToMysqlOperator` , `SlackOperator` ......你明白了! `airflow/contrib/`目录包含更多由社区构建的运算符。 这些运算符并不总是像主发行版中那样完整或经过良好测试,但允许用户更轻松地向平台添加新功能。 如果将操作员分配给DAG,则操作员仅由Airflow加载。 请参阅[使用运算符](howto/operator.html)了解如何使用Airflow运算符。 #### DAG分配 _在Airflow 1.8中添加_ 操作员不必立即分配给DAG(之前的`dag`是必需参数)。 但是,一旦将运营商分配给DAG,就无法转移或取消分配。 在创建运算符时,通过延迟赋值或甚至从其他运算符推断,可以显式地完成DAG分配。 ``` dag = DAG ( 'my_dag' , start_date = datetime ( 2016 , 1 , 1 )) # sets the DAG explicitly explicit_op = DummyOperator ( task_id = 'op1' , dag = dag ) # deferred DAG assignment deferred_op = DummyOperator ( task_id = 'op2' ) deferred_op . dag = dag # inferred DAG assignment (linked operators must be in the same DAG) inferred_op = DummyOperator ( task_id = 'op3' ) inferred_op . set_upstream ( deferred_op ) ``` #### Bitshift成分 _在Airflow 1.8中添加_ 传统上,使用`set_upstream()`和`set_downstream()`方法设置运算符关系。 在Airflow 1.8中,这可以通过Python bitshift操作符`&gt;&gt;`和`&lt;&lt;`来完成。 以下四个语句在功能上都是等效的: ``` op1 >> op2 op1 . set_downstream ( op2 ) op2 << op1 op2 . set_upstream ( op1 ) ``` 当使用bitshift组合运算符时,关系设置在bitshift运算符指向的方向上。 例如, `op1 &gt;&gt; op2`表示`op1`先运行, `op2`运行第二。 可以组成多个运算符 - 请记住,链从左到右执行,并且始终返回最右边的对象。 例如: ``` op1 >> op2 >> op3 << op4 ``` 相当于: ``` op1 . set_downstream ( op2 ) op2 . set_downstream ( op3 ) op3 . set_upstream ( op4 ) ``` 为方便起见,bitshift运算符也可以与DAG一起使用。 例如: ``` dag >> op1 >> op2 ``` 相当于: ``` op1 . dag = dag op1 . set_downstream ( op2 ) ``` 我们可以把这一切放在一起构建一个简单的管道: ``` with DAG ( 'my_dag' , start_date = datetime ( 2016 , 1 , 1 )) as dag : ( DummyOperator ( task_id = 'dummy_1' ) >> BashOperator ( task_id = 'bash_1' , bash_command = 'echo "HELLO!"' ) >> PythonOperator ( task_id = 'python_1' , python_callable = lambda : print ( "GOODBYE!" )) ) ``` ### 任务 一旦运算符被实例化,它就被称为“任务”。 实例化在调用抽象运算符时定义特定值,参数化任务成为DAG中的节点。 ### 任务实例 任务实例表示任务的特定运行,其特征在于dag,任务和时间点的组合。 任务实例也有一个指示状态,可以是“运行”,“成功”,“失败”,“跳过”,“重试”等。 ### 工作流程 您现在熟悉Airflow的核心构建模块。 有些概念可能听起来非常相似,但词汇表可以概念化如下: * DAG:描述工作应该发生的顺序 * 运算符:作为执行某些工作的模板的类 * 任务:运算符的参数化实例 * 任务实例:1)已分配给DAG的任务,2)具有与DAG的特定运行相关联的状态 通过组合`DAGs`和`Operators`来创建`TaskInstances` ,您可以构建复杂的工作流。 ## 附加功能 除了核心Airflow对象之外,还有许多更复杂的功能可以实现限制同时访问资源,交叉通信,条件执行等行为。 ### 钩 钩子是外部平台和数据库的接口,如Hive,S3,MySQL,Postgres,HDFS和Pig。 Hooks尽可能实现通用接口,并充当运营商的构建块。 他们还使用`airflow.models.Connection`模型来检索主机名和身份验证信息。 挂钩将身份验证代码和信息保存在管道之外,集中在元数据数据库中。 钩子在Python脚本,Airflow airflow.operators.PythonOperator以及iPython或Jupyter Notebook等交互式环境中使用它们也非常有用。 ### 池 当有太多进程同时攻击它们时,某些系统可能会被淹没。 气流池可用于**限制**任意任务集上**的执行并行性** 。 通过为池命名并为其分配多个工作槽来在UI( `Menu -&gt; Admin -&gt; Pools` )中管理池列表。 然后,在创建任务时(即,实例化运算符),可以使用`pool`参数将任务与其中一个现有池相关联。 ``` aggregate_db_message_job = BashOperator ( task_id = 'aggregate_db_message_job' , execution_timeout = timedelta ( hours = 3 ), pool = 'ep_data_pipeline_db_msg_agg' , bash_command = aggregate_db_message_job_cmd , dag = dag ) aggregate_db_message_job . set_upstream ( wait_for_empty_queue ) ``` `pool`参数可以与`priority_weight`结合使用,以定义队列中的优先级,以及在池中打开的槽时首先执行哪些任务。 默认的`priority_weight`是`1` ,可以碰到任何数字。 在对队列进行排序以评估接下来应该执行哪个任务时,我们使用`priority_weight` ,与来自此任务下游任务的所有`priority_weight`值相加。 您可以使用它来执行特定的重要任务,并相应地优先处理该任务的整个路径。 当插槽填满时,任务将照常安排。 达到容量后,可运行的任务将排队,其状态将在UI中显示。 当插槽空闲时,排队的任务将根据`priority_weight` (任务及其后代)开始运行。 请注意,默认情况下,任务不会分配给任何池,并且它们的执行并行性仅限于执行程序的设置。 ### 连接 外部系统的连接信息存储在Airflow元数据数据库中并在UI中进行管理( `Menu -&gt; Admin -&gt; Connections` )在那里定义了`conn_id` ,并附加了主机名/登录/密码/架构信息。 气流管道可以简单地引用集中管理的`conn_id`而无需在任何地方硬编码任何此类信息。 可以定义具有相同`conn_id`许多连接,并且在这种情况下,并且当**挂钩**使用来自`BaseHook`的`get_connection`方法时,Airflow将随机选择一个连接,允许在与重试一起使用时进行一些基本的负载平衡和容错。 Airflow还能够通过操作系统中的环境变量引用连接。 但它只支持URI格式。 如果您需要为连接指定`extra` ,请使用Web UI。 如果在Airflow元数据数据库和环境变量中都定义了具有相同`conn_id`连接,则Airflow将仅引用环境变量中的连接(例如,给定`conn_id` `postgres_master` ,Airflow将`AIRFLOW_CONN_POSTGRES_MASTER`在环境变量中搜索`AIRFLOW_CONN_POSTGRES_MASTER`并直接引用它,如果发现,在开始搜索元数据数据库之前)。 许多挂钩都有一个默认的`conn_id` ,使用该挂钩的运算符不需要提供显式连接ID。 例如, [`PostgresHook`](code.html "airflow.hooks.postgres_hook.PostgresHook")的默认`conn_id`是`postgres_default` 。 请参阅[管理Connections](howto/manage-connections.html)以了解如何创建和管理连接。 ### 队列 使用CeleryExecutor时,可以指定发送任务的芹菜队列。 `queue`是BaseOperator的一个属性,因此任何任务都可以分配给任何队列。 环境的默认队列在`airflow.cfg`的`celery -&gt; default_queue` 。 这定义了未指定任务时分配给的队列,以及Airflow工作程序在启动时侦听的队列。 工作人员可以收听一个或多个任务队列。 当工作程序启动时(使用命令`airflow worker` ),可以指定一组逗号分隔的队列名称(例如, `airflow worker -q spark` )。 然后,该工作人员将仅接收连接到指定队列的任务。 如果您需要专业工作人员,从资源角度来看(例如,一个工作人员可以毫无问题地执行数千个任务),或者从环境角度(您希望工作人员从Spark群集中运行),这可能非常有用本身,因为它需要一个非常具体的环境和安全权利)。 ### XComs XComs允许任务交换消息,允许更细微的控制形式和共享状态。 该名称是“交叉通信”的缩写。 XComs主要由键,值和时间戳定义,但也跟踪创建XCom的任务/ DAG以及何时应该可见的属性。 任何可以被pickle的对象都可以用作XCom值,因此用户应该确保使用适当大小的对象。 可以“推”(发送)或“拉”(接收)XComs。 当任务推送XCom时,它通常可用于其他任务。 任务可以通过调用`xcom_push()`方法随时推送XComs。 此外,如果任务返回一个值(来自其Operator的`execute()`方法,或者来自PythonOperator的`python_callable`函数),则会自动推送包含该值的XCom。 任务调用`xcom_pull()`来检索XComs,可选地根据`key` ,source `task_ids`和source `dag_id`等条件应用过滤器。 默认情况下, `xcom_pull()`过滤掉从执行函数返回时被自动赋予XCom的键(与手动推送的XCom相反)。 如果为`xcom_pull`传递`xcom_pull`单个字符串,则返回该任务的最新XCom值; 如果传递了task_ids列表,则返回相应的XCom值列表。 ``` # inside a PythonOperator called 'pushing_task' def push_function (): return value # inside another PythonOperator where provide_context=True def pull_function ( ** context ): value = context [ 'task_instance' ] . xcom_pull ( task_ids = 'pushing_task' ) ``` 也可以直接在模板中提取XCom,这是一个示例: ``` SELECT * FROM {{ task_instance . xcom_pull ( task_ids = 'foo' , key = 'table_name' ) }} ``` 请注意,XCom与[变量](20)类似,但专门用于任务间通信而非全局设置。 ### 变量 变量是将任意内容或设置存储和检索为Airflow中的简单键值存储的通用方法。 可以从UI( `Admin -&gt; Variables` ),代码或CLI列出,创建,更新和删除`Admin -&gt; Variables` 。 此外,json设置文件可以通过UI批量上传。 虽然管道代码定义和大多数常量和变量应该在代码中定义并存储在源代码控制中,但是通过UI可以访问和修改某些变量或配置项会很有用。 ``` from airflow.models import Variable foo = Variable . get ( "foo" ) bar = Variable . get ( "bar" , deserialize_json = True ) ``` 第二个调用假设`json`内容,并将反序列化为`bar` 。 请注意, `Variable`是sqlalchemy模型,可以这样使用。 您可以使用jinja模板中的变量,其语法如下: ``` echo {{ var . value .< variable_name > }} ``` 或者如果需要从变量反序列化json对象: ``` echo {{ var . json .< variable_name > }} ``` ### 分枝 有时您需要一个工作流来分支,或者只根据任意条件走下某条路径,这通常与上游任务中发生的事情有关。 一种方法是使用`BranchPythonOperator` 。 `BranchPythonOperator`与PythonOperator非常相似,只是它需要一个返回task_id的python_callable。 返回返回的task_id,并跳过所有其他路径。 Python函数返回的task_id必须直接引用BranchPythonOperator任务下游的任务。 请注意,在`depends_on_past=True`中使用`depends_on_past=True`下游的任务在逻辑上是不合理的,因为`skipped`状态将总是导致依赖于过去成功的块任务。 `skipped`状态在所有直接上游任务被`skipped`地方传播。 如果你想跳过一些任务,请记住你不能有一个空路径,如果是这样,那就做一个虚拟任务。 像这样,跳过虚拟任务“branch_false” ![https://airflow.apache.org/_images/branch_good.png](https://box.kancloud.cn/d834daec97e445000e7d9e5058c84bf4_573x130.jpg) 不喜欢这样,跳过连接任务 ![https://airflow.apache.org/_images/branch_bad.png](https://box.kancloud.cn/3a8500b9b995b624440e0a9298566145_545x101.jpg) ### SubDAGs SubDAG非常适合重复模式。 在使用Airflow时,定义一个返回DAG对象的函数是一个很好的设计模式。 Airbnb在加载数据时使用_阶段检查交换_模式。 数据在临时表中暂存,然后对该表执行数据质量检查。 一旦检查全部通过,分区就会移动到生产表中。 再举一个例子,考虑以下DAG: ![https://airflow.apache.org/_images/subdag_before.png](https://box.kancloud.cn/ff08647dba87b73d409e1a5b05326243_490x243.jpg) 我们可以将所有并行`task-*`运算符组合到一个SubDAG中,以便生成的DAG类似于以下内容: ![https://airflow.apache.org/_images/subdag_after.png](https://box.kancloud.cn/1513a94effe885d258965d133574b060_550x63.jpg) 请注意,SubDAG运算符应包含返回DAG对象的工厂方法。 这将阻止SubDAG在主UI中被视为单独的DAG。 例如: ``` #dags/subdag.py from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator # Dag is returned by a factory method def sub_dag ( parent_dag_name , child_dag_name , start_date , schedule_interval ): dag = DAG ( ' %s . %s ' % ( parent_dag_name , child_dag_name ), schedule_interval = schedule_interval , start_date = start_date , ) dummy_operator = DummyOperator ( task_id = 'dummy_task' , dag = dag , ) return dag ``` 然后可以在主DAG文件中引用此SubDAG: ``` # main_dag.py from datetime import datetime , timedelta from airflow.models import DAG from airflow.operators.subdag_operator import SubDagOperator from dags.subdag import sub_dag PARENT_DAG_NAME = 'parent_dag' CHILD_DAG_NAME = 'child_dag' main_dag = DAG ( dag_id = PARENT_DAG_NAME , schedule_interval = timedelta ( hours = 1 ), start_date = datetime ( 2016 , 1 , 1 ) ) sub_dag = SubDagOperator ( subdag = sub_dag ( PARENT_DAG_NAME , CHILD_DAG_NAME , main_dag . start_date , main_dag . schedule_interval ), task_id = CHILD_DAG_NAME , dag = main_dag , ) ``` 您可以从主DAG的图形视图放大SubDagOperator,以显示SubDAG中包含的任务: ![https://airflow.apache.org/_images/subdag_zoom.png](https://box.kancloud.cn/b3a26c0a0c05d648f7c9e31de260aa64_1000x550.jpg) 使用SubDAG时的一些其他提示: * 按照惯例,SubDAG的`dag_id`应以其父级和点为前缀。 和在`parent.child` * 通过将参数传递给SubDAG运算符来共享主DAG和SubDAG之间的参数(如上所示) * SubDAG必须有一个计划并启用。 如果SubDAG的时间表设置为`None`或`@once` ,SubDAG将成功完成而不做任何事情 * 清除SubDagOperator也会清除其中的任务状态 * 在SubDagOperator上标记成功不会影响其中的任务状态 * 避免在`depends_on_past=True`中的任务中使用`depends_on_past=True` ,因为这可能会造成混淆 * 可以为SubDAG指定执行程序。 如果要在进程中运行SubDAG并有效地将其并行性限制为1,则通常使用SequentialExecutor。 使用LocalExecutor可能会有问题,因为它可能会过度订阅您的工作人员,在单个插槽中运行多个任务 有关演示,请参阅`airflow/example_dags` 。 ### 服务水平协议 服务级别协议或任务或DAG应该成功的时间可以在任务级别设置为`timedelta` 。 如果此时一个或多个实例未成功,则会发送警报电子邮件,详细说明错过其SLA的任务列表。 该事件也记录在数据库中,并在`Browse-&gt;Missed SLAs`下的Web UI中可用,其中可以分析和记录事件。 ### 触发规则 虽然正常的工作流行为是在所有直接上游任务都成功时触发任务,但Airflow允许更复杂的依赖项设置。 所有运算符都有一个`trigger_rule`参数,该参数定义生成的任务被触发的规则。 `trigger_rule`的默认值是`all_success` ,可以定义为“当所有直接上游任务都成功时触发此任务”。 此处描述的所有其他规则都基于直接父任务,并且是在创建任务时可以传递给任何操作员的值: * `all_success` :(默认)所有父母都成功了 * `all_failed` :所有父`all_failed`都处于`failed`或`upstream_failed`状态 * `all_done` :所有父母都完成了他们的执行 * `one_failed` :一旦至少一个父母失败就会触发,它不会等待所有父母完成 * `one_success` :一旦至少一个父成功就触发,它不会等待所有父母完成 * `dummy` :依赖项仅用于show,随意触发 请注意,这些可以与`depends_on_past` (boolean)结合使用,当设置为`True` ,如果任务的先前计划未成功,则不会触发任务。 ### 最新的运行 标准工作流行为涉及为特定日期/时间范围运行一系列任务。 但是,某些工作流执行的任务与运行时无关,但需要按计划运行,就像标准的cron作业一样。 在这些情况下,暂停期间错过的回填或运行作业会浪费CPU周期。 对于这种情况,您可以使用`LatestOnlyOperator`跳过在DAG的最近计划运行期间未运行的任务。 如果现在的时间不在其`execution_time`和下一个计划的`execution_time`之间,则`LatestOnlyOperator`跳过所有直接下游任务及其自身。 必须意识到跳过的任务和触发器规则之间的相互作用。 跳过的任务将通过触发器规则`all_success`和`all_failed`级联,但不是`all_done` , `one_failed` , `one_success`和`dummy` 。 如果您希望将`LatestOnlyOperator`与不级联跳过的触发器规则一起使用,则需要确保`LatestOnlyOperator` **直接**位于您要跳过的任务的上游。 通过使用触发器规则来混合应该在典型的日期/时间依赖模式下运行的任务和使用`LatestOnlyOperator`任务是可能的。 例如,考虑以下dag: ``` #dags/latest_only_with_trigger.py import datetime as dt from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule dag = DAG ( dag_id = 'latest_only_with_trigger' , schedule_interval = dt . timedelta ( hours = 4 ), start_date = dt . datetime ( 2016 , 9 , 20 ), ) latest_only = LatestOnlyOperator ( task_id = 'latest_only' , dag = dag ) task1 = DummyOperator ( task_id = 'task1' , dag = dag ) task1 . set_upstream ( latest_only ) task2 = DummyOperator ( task_id = 'task2' , dag = dag ) task3 = DummyOperator ( task_id = 'task3' , dag = dag ) task3 . set_upstream ([ task1 , task2 ]) task4 = DummyOperator ( task_id = 'task4' , dag = dag , trigger_rule = TriggerRule . ALL_DONE ) task4 . set_upstream ([ task1 , task2 ]) ``` 在这个dag的情况下,对于除最新运行之外的所有运行, `latest_only`任务将显示为跳过。 `latest_only`直接位于`latest_only`下游,并且除了最新的之外还将跳过所有运行。 `task2`完全独立于`latest_only` ,将在所有计划的时间段内运行。 `task3`是`task3`和`task2`下游,由于默认的`trigger_rule`是`all_success`将从`all_success`接收级联跳过。 `task4`是`task4`和`task2`下游,但由于其`trigger_rule`设置为`all_done`因此一旦跳过`all_done` (有效的完成状态)并且`task2`成功,它将立即触发。 ![https://airflow.apache.org/_images/latest_only_with_trigger.png](https://box.kancloud.cn/3d4109b81c2b5fa963e6c4f33961616f_1378x526.jpg) ### 僵尸与亡灵 任务实例一直在死,通常是正常生命周期的一部分,但有时会出乎意料。 僵尸任务的特点是没有心跳(由工作定期发出)和数据库中的`running`状态。 当工作节点无法访问数据库,Airflow进程在外部被终止或者节点重新启动时,它们可能会发生。 僵尸查杀由调度程序的进程定期执行。 Undead进程的特点是存在进程和匹配的心跳,但Airflow不知道此任务在数据库中`running` 。 这种不匹配通常在数据库状态发生变化时发生,最有可能是通过删除UI中“任务实例”视图中的行。 指示任务验证其作为心跳例程的一部分的状态,并在确定它们处于这种“不死”状态时终止自身。 ### 群集策略 您的本地气流设置文件可以定义一个`policy`功能,该功能可以根据其他任务或DAG属性改变任务属性。 它接收单个参数作为对任务对象的引用,并期望改变其属性。 例如,此函数可以在使用特定运算符时应用特定队列属性,或强制执行任务超时策略,确保任何任务运行超过48小时。 以下是`airflow_settings.py` : ``` def policy ( task ): if task . __class__ . __name__ == 'HivePartitionSensor' : task . queue = "sensor_queue" if task . timeout > timedelta ( hours = 48 ): task . timeout = timedelta ( hours = 48 ) ``` ### 文档和注释 可以在Web界面中显示的dag和任务对象中添加文档或注释(dag为“Graph View”,任务为“Task Details”)。 如果定义了一组特殊任务属性,它们将被呈现为丰富内容: <colgroup><col width="38%"><col width="62%"></colgroup> | 属性 | 渲染到 | | --- | --- | | DOC | 等宽 | | doc_json | JSON | | doc_yaml | YAML | | doc_md | 降价 | | doc_rst | reStructuredText的 | 请注意,对于dags,doc_md是解释的唯一属性。 如果您的任务是从配置文件动态构建的,则此功能特别有用,它允许您公开导致Airflow中相关任务的配置。 ``` """ ### My great DAG """ dag = DAG ( 'my_dag' , default_args = default_args ) dag . doc_md = __doc__ t = BashOperator ( "foo" , dag = dag ) t . doc_md = """ \ #Title" Here's a [url](www.airbnb.com) """ ``` 此内容将分别在“图表视图”和“任务详细信息”页面中呈现为降价。 ### 金贾模板 Airflow充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的强大功能,这可以成为与宏结合使用的强大工具(参见[宏](code.html)部分)。 例如,假设您希望使用`BashOperator`将执行日期作为环境变量传递给Bash脚本。 ``` # The execution date as YYYY-MM-DD date = "{{ ds }}" t = BashOperator ( task_id = 'test_env' , bash_command = '/tmp/test.sh ' , dag = dag , env = { 'EXECUTION_DATE' : date }) ``` 这里, `{{ ds }}`是一个宏,并且由于`BashOperator`的`env`参数是使用Jinja模板化的,因此执行日期将作为Bash脚本中名为`EXECUTION_DATE`的环境变量提供。 您可以将Jinja模板与文档中标记为“模板化”的每个参数一起使用。 模板替换发生在调用运算符的pre_execute函数之前。 ## 打包的dags 虽然通常会在单个`.py`文件中指定dags,但有时可能需要将dag及其依赖项组合在一起。 例如,您可能希望将多个dag组合在一起以将它们一起版本,或者您可能希望将它们一起管理,或者您可能需要一个额外的模块,默认情况下在您运行airflow的系统上不可用。 为此,您可以创建一个zip文件,其中包含zip文件根目录中的dag,并在目录中解压缩额外的模块。 例如,您可以创建一个如下所示的zip文件: ``` my_dag1.py my_dag2.py package1/__init__.py package1/functions.py ``` Airflow将扫描zip文件并尝试加载`my_dag1.py`和`my_dag2.py` 。 它不会进入子目录,因为它们被认为是潜在的包。 如果您想将模块依赖项添加到DAG,您基本上也会这样做,但是更多的是使用virtualenv和pip。 ``` virtualenv zip_dag source zip_dag/bin/activate mkdir zip_dag_contents cd zip_dag_contents pip install --install-option = "--install-lib= $PWD " my_useful_package cp ~/my_dag.py . zip -r zip_dag.zip * ``` 注意 zip文件将插入模块搜索列表(sys.path)的开头,因此它将可用于驻留在同一解释器中的任何其他代码。 注意 包装的dags不能与打开酸洗一起使用。 注意 打包的dag不能包含动态库(例如libz.so),如果模块需要这些库,则需要在系统上使用这些库。 换句话说,只能打包纯python模块。