企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 写日志 ## 在本地编写日志 用户可以使用`base_log_folder`设置在`airflow.cfg`指定日志文件夹。 默认情况下,它位于`AIRFLOW_HOME`目录中。 此外,用户可以提供远程位置,以便在云存储中存储日志和日志备份。 在Airflow Web UI中,本地日志优先于远程日志。 如果找不到或访问本地日志,将显示远程日志。 请注意,只有在任务完成(包括失败)后才会将日志发送到远程存储。 换句话说,运行任务的远程日志不可用。 日志作为`{dag_id}/{task_id}/{execution_date}/{try_number}.log`存储在日志文件夹中。 ## 将日志写入Amazon S3 ### 在你开始之前 远程日志记录使用现有的Airflow连接来读取/写入日志。 如果没有正确设置连接,则会失败。 ### 启用远程日志记录 要启用此功能,必须按照此示例配置`airflow.cfg` : ``` [ core ] # Airflow can store logs remotely in AWS S3\. Users must supply a remote # location URL (starting with either 's3://...') and an Airflow connection # id that provides access to the storage location. remote_base_log_folder = s3://my-bucket/path/to/logs remote_log_conn_id = MyS3Conn # Use server-side encryption for logs stored in S3 encrypt_s3_logs = False ``` 在上面的例子中,Airflow将尝试使用`S3Hook('MyS3Conn')` 。 ## 将日志写入Azure Blob存储 可以将Airflow配置为在Azure Blob存储中读取和写入任务日志。 按照以下步骤启用Azure Blob存储日志记录。 1. Airflow的日志记录系统需要将自定义.py文件放在`PYTHONPATH` ,以便可以从Airflow导入。 首先创建一个存储配置文件的目录。 建议使用`$AIRFLOW_HOME/config` 。 2. 创建名为`$AIRFLOW_HOME/config/log_config.py`和`$AIRFLOW_HOME/config/__init__.py`空文件。 3. 将`airflow/config_templates/airflow_local_settings.py`的内容复制到刚刚在上面的步骤中创建的`log_config.py`文件中。 4. 自定义模板的以下部分: > ``` > # wasb buckets should start with "wasb" just to help Airflow select correct handler > REMOTE_BASE_LOG_FOLDER = 'wasb-<whatever you want here>' > > # Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG > LOGGING_CONFIG = ... > > ``` 5. 确保已在Airflow中定义Azure Blob存储(Wasb)连接挂钩。 挂钩应具有对`REMOTE_BASE_LOG_FOLDER`定义的Azure Blob存储桶的读写访问权限。 6. 更新`$AIRFLOW_HOME/airflow.cfg`以包含: > ``` > remote_logging = True > logging_config_class = log_config.LOGGING_CONFIG > remote_log_conn_id = <name of the Azure Blob Storage connection> > > ``` 7. 重新启动Airflow网络服务器和调度程序,并触发(或等待)新任务执行。 8. 验证日志是否显示在您定义的存储桶中新执行的任务中。 ## 将日志写入Google云端存储 请按照以下步骤启用Google云端存储日志记录。 1. Airflow的日志记录系统需要将自定义.py文件放在`PYTHONPATH` ,以便可以从Airflow导入。 首先创建一个存储配置文件的目录。 建议使用`$AIRFLOW_HOME/config` 。 2. 创建名为`$AIRFLOW_HOME/config/log_config.py`和`$AIRFLOW_HOME/config/__init__.py`空文件。 3. 将`airflow/config_templates/airflow_local_settings.py`的内容复制到刚刚在上面的步骤中创建的`log_config.py`文件中。 4. 自定义模板的以下部分: > ``` > # Add this variable to the top of the file. Note the trailing slash. > GCS_LOG_FOLDER = 'gs://<bucket where logs should be persisted>/' > > # Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG > LOGGING_CONFIG = ... > > # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable > 'gcs.task' : { > 'class' : 'airflow.utils.log.gcs_task_handler.GCSTaskHandler' , > 'formatter' : 'airflow.task' , > 'base_log_folder' : os.path.expanduser ( BASE_LOG_FOLDER ) , > 'gcs_log_folder' : GCS_LOG_FOLDER, > 'filename_template' : FILENAME_TEMPLATE, > } , > > # Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'. > 'loggers' : { > 'airflow.task' : { > 'handlers' : [ 'gcs.task' ] , > ... > } , > 'airflow.task_runner' : { > 'handlers' : [ 'gcs.task' ] , > ... > } , > 'airflow' : { > 'handlers' : [ 'console' ] , > ... > } , > } > > ``` 5. 确保已在Airflow中定义了Google Cloud Platform连接挂钩。 该挂钩应具有对`GCS_LOG_FOLDER`定义的Google Cloud Storage存储桶的读写访问权限。 6. 更新`$AIRFLOW_HOME/airflow.cfg`以包含: > ``` > task_log_reader = gcs.task > logging_config_class = log_config.LOGGING_CONFIG > remote_log_conn_id = <name of the Google cloud platform hook> > > ``` 7. 重新启动Airflow网络服务器和调度程序,并触发(或等待)新任务执行。 8. 验证日志是否显示在您定义的存储桶中新执行的任务中。 9. 确认Google Cloud Storage查看器在U​​I中正常运行。 拉出新执行的任务,并验证您是否看到类似的内容: > ``` > *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. > [ 2017 -10-03 21 :57:50,056 ] { cli.py:377 } INFO - Running on host chrisr-00532 > [ 2017 -10-03 21 :57:50,093 ] { base_task_runner.py:115 } INFO - Running: [ 'bash' , '-c' , u 'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py' ] > [ 2017 -10-03 21 :57:51,264 ] { base_task_runner.py:98 } INFO - Subtask: [ 2017 -10-03 21 :57:51,263 ] { __init__.py:45 } INFO - Using executor SequentialExecutor > [ 2017 -10-03 21 :57:51,306 ] { base_task_runner.py:98 } INFO - Subtask: [ 2017 -10-03 21 :57:51,306 ] { models.py:186 } INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py > > ``` 请注意它从远程日志文件中读取的顶行。 请注意,如果您使用旧式airflow.cfg配置方法将日志保存到Google云端存储,则旧的日志将不再在Airflow用户界面中显示,但它们仍将存在于Google云端存储中。 这是一个向后无比的变化。 如果您对此不满意,可以更改`FILENAME_TEMPLATE`以反映旧式日志文件名格式。