ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
# 插件 Airflow内置了一个简单的插件管理器,可以通过简单地删除`$AIRFLOW_HOME/plugins`文件夹中的文件,将外部功能集成到其核心。 `plugins`文件夹中的python模块将被导入, **钩子** , **操作符** , **传感器** , **宏** , **执行器**和Web **视图**将集成到Airflow的主要集合中,并可供使用。 ## 做什么的? Airflow提供了一个用于处理数据的通用工具箱。 不同的组织有不同的堆栈和不同的需求。 使用Airflow插件可以让公司定制他们的Airflow安装以反映他们的生态系统。 插件可以用作编写,共享和激活新功能集的简便方法。 还需要一组更复杂的应用程序来与不同风格的数据和元数据进行交互。 例子: * 一组用于解析Hive日志和公开Hive元数据(CPU / IO /阶段/倾斜/ ...)的工具 * 异常检测框架,允许人们收集指标,设置阈值和警报 * 审计工具,帮助了解谁访问了什么 * 配置驱动的SLA监控工具,允许您设置受监控的表以及应该在何时着陆,提醒人员并公开停机的可视化 * ... ## 为什么要建立在Airflow之上? Airflow有许多组件可以在构建应用程序时重用: * 可用于呈现视图的Web服务器 * 用于存储模型的元数据数据库 * 访问您的数据库,以及如何连接到它们的知识 * 应用程序可以将工作负载推送到的一组工作者 * 部署了Airflow,您可以专注于部署物流 * 基本的图表功能,底层库和抽象 ## 接口 要创建插件,您需要派生`airflow.plugins_manager.AirflowPlugin`类并引用要插入Airflow的对象。 以下是您需要派生的类看起来像: ``` class AirflowPlugin ( object ): # The name of your plugin (str) name = None # A list of class(es) derived from BaseOperator operators = [] # A list of class(es) derived from BaseSensorOperator sensors = [] # A list of class(es) derived from BaseHook hooks = [] # A list of class(es) derived from BaseExecutor executors = [] # A list of references to inject into the macros namespace macros = [] # A list of objects created from a class derived # from flask_admin.BaseView admin_views = [] # A list of Blueprint object created from flask.Blueprint flask_blueprints = [] # A list of menu links (flask_admin.base.MenuLink) menu_links = [] ``` 您可以通过继承派生它(请参阅下面的示例)。 请注意,必须指定此类中的`name` 。 将插件导入Airflow后,您可以使用类似语句调用它 ``` from airflow. { type , like "operators" , "sensors" } . { name specificed inside the plugin class } import * ``` 当您编写自己的插件时,请确保您理解它们。 每种类型的插件都有一些基本属性。 例如, * 对于`Operator`插件,必须使用`execute`方法。 * 对于`Sensor`插件,必须使用返回布尔值的`poke`方法。 ## 例 下面的代码定义了一个插件,它在Airflow中注入一组虚拟对象定义。 ``` # This is the class you derive to create a plugin from airflow.plugins_manager import AirflowPlugin from flask import Blueprint from flask_admin import BaseView , expose from flask_admin.base import MenuLink # Importing base classes that we need to derive from airflow.hooks.base_hook import BaseHook from airflow.models import BaseOperator from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.executors.base_executor import BaseExecutor # Will show up under airflow.hooks.test_plugin.PluginHook class PluginHook ( BaseHook ): pass # Will show up under airflow.operators.test_plugin.PluginOperator class PluginOperator ( BaseOperator ): pass # Will show up under airflow.sensors.test_plugin.PluginSensorOperator class PluginSensorOperator ( BaseSensorOperator ): pass # Will show up under airflow.executors.test_plugin.PluginExecutor class PluginExecutor ( BaseExecutor ): pass # Will show up under airflow.macros.test_plugin.plugin_macro def plugin_macro (): pass # Creating a flask admin BaseView class TestView ( BaseView ): @expose ( '/' ) def test ( self ): # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html return self . render ( "test_plugin/test.html" , content = "Hello galaxy!" ) v = TestView ( category = "Test Plugin" , name = "Test View" ) # Creating a flask blueprint to integrate the templates and static folder bp = Blueprint ( "test_plugin" , __name__ , template_folder = 'templates' , # registers airflow/plugins/templates as a Jinja template folder static_folder = 'static' , static_url_path = '/static/test_plugin' ) ml = MenuLink ( category = 'Test Plugin' , name = 'Test Menu Link' , url = 'https://airflow.incubator.apache.org/' ) # Defining the plugin class class AirflowTestPlugin ( AirflowPlugin ): name = "test_plugin" operators = [ PluginOperator ] sensors = [ PluginSensorOperator ] hooks = [ PluginHook ] executors = [ PluginExecutor ] macros = [ plugin_macro ] admin_views = [ v ] flask_blueprints = [ bp ] menu_links = [ ml ] ```