🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# 常见问题 ## 为什么我的任务没有安排好? 您的任务可能无法安排的原因有很多。 以下是一些常见原因: * 您的脚本是否“编译”,Airflow引擎是否可以解析它并找到您的DAG对象。 要对此进行测试,您可以运行`airflow list_dags`并确认您的DAG显示在列表中。 您还可以运行`airflow list_tasks foo_dag_id --tree`并确认您的任务按预期显示在列表中。 如果您使用CeleryExecutor,您可能需要确认这既适用于调度程序运行的位置,也适用于工作程序运行的位置。 * 包含DAG的文件是否在内容的某处包含字符串“airflow”和“DAG”? 在搜索DAG目录时,Airflow忽略不包含“airflow”和“DAG”的文件,以防止DagBag解析导入与用户的DAG并置的所有python文件。 * 你的`start_date`设置正确吗? 在传递`start_date + scheduler_interval`之后,Airflow调度程序会立即触发任务。 * 您的`schedule_interval`设置正确吗? 默认`schedule_interval`是一天( `datetime.timedelta(1)` )。 您必须直接为实例化的DAG对象指定不同的`schedule_interval` ,而不是`default_param` ,因为任务实例不会覆盖其父DAG的`schedule_interval` 。 * 您的`start_date`超出了在UI中可以看到的位置吗? 如果将`start_date`设置为3个月之前的某个时间,您将无法在UI的主视图中看到它,但您应该能够在`Menu -> Browse ->Task Instances`看到它。 * 是否满足任务的依赖性。 直接位于任务上游的任务实例需要处于`success`状态。 此外,如果已设置`depends_on_past=True` ,则上一个任务实例需要成功(除非它是该任务的第一次运行)。 此外,如果`wait_for_downstream=True` ,请确保您了解其含义。 您可以从`Task Instance Details`页面查看如何设置这些属性。 * 您需要创建并激活DagRuns吗? DagRun表示整个DAG的特定执行,并具有状态(运行,成功,失败,......)。 调度程序在向前移动时创建新的DagRun,但永远不会及时创建新的DagRun。 调度程序仅评估`running` DagRuns以查看它可以触发的任务实例。 请注意,清除任务实例(从UI或CLI)确实将DagRun的状态设置为恢复运行。 您可以通过单击DAG的计划标记来批量查看DagRuns列表并更改状态。 * 是否达到了DAG的`concurrency`参数? `concurrency`定义了允许DAG `running`任务实例的数量,超过这一点,事物就会排队。 * 是否达到了DAG的`max_active_runs`参数? `max_active_runs`定义允许的DAG `running`并发实例的数量。 您可能还想阅读文档的“计划程序”部分,并确保完全了解其进度。 ## 如何根据其他任务的失败触发任务? 查看文档“概念`Trigger Rule`部分中的“ `Trigger Rule`部分 ## 安装airflow [crypto]后,为什么连接密码仍未在元数据db中加密? 查看文档“配置”部分中的“ `Connections`部分 ## 与`start_date`什么关系? `start_date`是前DagRun时代的部分遗产,但它在很多方面仍然具有相关性。 创建新DAG时,您可能希望使用`default_args`为任务设置全局`start_date` 。 要创建的第一个DagRun将基于所有任务的`min(start_date)` 。 从那时起,调度程序根据您的schedule_interval创建新的DagRuns,并在满足您的依赖项时运行相应的任务实例。 在向DAG引入新任务时,您需要特别注意`start_date` ,并且可能希望重新激活非活动DagRuns以正确启用新任务。 我们建议不要使用动态值作为`start_date` ,尤其是`datetime.now()`因为它可能非常混乱。 一旦周期结束,任务就会被触发,理论上, `@hourly` DAG永远不会达到一小时后,因为`now()`会移动。 以前我们还建议使用与`schedule_interval`相关的舍入`start_date` 。 这意味着`@hourly`将在`00:00`分钟:秒,午夜的`@monthly`工作,在这个月的第一个月的`@monthly`工作。 这不再是必需的。 现在,Airflow将自动对齐`start_date`和`schedule_interval` ,方法是使用`start_date`作为开始查看的时刻。 您可以使用任何传感器或`TimeDeltaSensor`来延迟计划间隔内的任务执行。 虽然`schedule_interval`允许指定`datetime.timedelta`对象,但我们建议使用宏或cron表达式,因为它强制执行舍入计划的这种想法。 使用`depends_on_past=True`时,必须特别注意`start_date`因为过去的依赖关系不会仅针对为任务指定的`start_date`的特定计划强制执行。 除非您计划为新任务运行回填,否则在引入新的`depends_on_past=True`时及时观察DagRun活动状态也很重要。 另外需要注意的是,在回填CLI命令的上下文中,任务`start_date`会被回填命令`start_date`覆盖。 这允许对具有`depends_on_past=True`任务实际启动的回填,如果不是这样,则回填就不会启动。 ## 如何动态创建DAG? Airflow在`DAGS_FOLDER`查找其全局命名空间中包含`DAG`对象的模块,并在`DagBag`添加它找到的对象。 知道这一切我们需要的是一种在全局命名空间中动态分配变量的方法,这可以在python中使用`globals()`函数轻松完成,标准库的行为就像一个简单的字典。 ``` for i in range ( 10 ): dag_id = 'foo_ {} ' . format ( i ) globals ()[ dag_id ] = DAG ( dag_id ) # or better, call a function that returns a DAG object! ``` ## 我的进程列表中的所有`airflow run`命令是什么? `airflow run`命令有很多层,这意味着它可以调用自身。 * 基本`airflow run` :启动执行程序,并告诉它运行`airflow run --local`命令。 如果使用Celery,这意味着它会在队列中放置一个命令,使其在worker上运行远程。 如果使用LocalExecutor,则转换为在子进程池中运行它。 * 本地`airflow run --local` :启动`airflow run --raw`命令(如下所述)作为子`airflow run --raw` ,负责发出心跳,监听外部`airflow run --raw`信号,并确保在子进程失败时进行一些清理 * 原始`airflow run --raw`运行实际操作员的执行方法并执行实际工作 ## 我的气流dag如何运行得更快? 我们可以控制三个变量来改善气流dag性能: * `parallelism` :此变量控制气流工作者可以同时运行的任务实例的数量。 用户可以增加`airflow.cfg`的并行度变量。 * `concurrency` :Airflow调度程序在任何给定时间都将为您的DAG运行不超过`$concurrency`任务实例。 并发性在Airflow DAG中定义。 如果未在DAG上设置并发性,则调度程序将使用`dag_concurrency`条目的缺省值。 * `max_active_runs` :在给定时间,Airflow调度程序将运行不超过DAG的`max_active_runs` DagRuns。 如果未在DAG中设置`max_active_runs` ,则调度程序将使用`airflow.cfg` `max_active_runs_per_dag`条目的缺省值。 ## 我们如何减少气流UI页面加载时间? 如果你的dag需要很长时间才能加载,你可以将`airflow.cfg`中`default_dag_run_display_number`配置的值`airflow.cfg`到一个较小的值。 此可配置控制在UI中显示的dag run的数量,默认值为25。 ## 如何修复异常:全局变量explicit_defaults_for_timestamp需要打开(1)? 这意味着在mysql服务器中禁用了`explicit_defaults_for_timestamp` ,您需要通过以下方式启用它: 1. 在my.cnf文件的mysqld部分下设置`explicit_defaults_for_timestamp = 1` 。 2. 重启Mysql服务器。 ## 如何减少生产中的气流dag调度延迟? * `max_threads` :Scheduler将并行生成多个线程来安排dags。 这由`max_threads`控制,默认值为2.用户应在生产中将此值增加到更大的值(例如,调度程序运行的cpus的数量 - 1)。 * `scheduler_heartbeat_sec` :用户应考虑将`scheduler_heartbeat_sec`配置增加到更高的值(例如60秒),该值控制气流调度程序获取心跳的频率并更新作业在数据库中的条目。