一直以来,我都是用Crontab作为Linux环境的Job调度工具,同时注册Job执行结果到数据表,设置全局的logging配置,输出Job日志到服务器集中的日志目录中。使用Crontab来管理Job调度优势明显:
- 任何Linux发行版都自带Crontab,它是非常可靠;
- cron expression配置能够满足大部分的使用场景;
- Cron作为Linux的常驻进程,不需要做特别的部署处理;
- 通过配置系统的邮件设置,还可以发送Job错误警报邮件。
但是,随着项目中的Job数量越来越多,一些Crontab不能解决的问题也凸显出来
- Crontab不能对一系列Job进行依赖执行,比如任务A,B,C,因B和C任务是根据A任务的结果执行的,所以执行顺序应该是 A完成后,B和C才能执行,但Crontab却并不能处理这种情况,只能通过排定时间先后顺序,或者程序员合并A/B/C三个Job到一个Job中才能处理
- 部署管理不方便,如果Job数量不多,只需要在单台服务器运行Cron Job即可,但如果是Job数量多,需要部署到多台服务器,那么只能逐台服务器分散配置Crontab,如何调度分派任务到服务器并不容易,需要合理计算作业服务器的资源情况
- 查看Job执行结果不方便,必须要自主设计一套作业执行结果报告方案
- 如果Job运行失败,开发人员只能登陆到服务器去手动重新执行
- 日志分散,通常日志输出到Job作业服务器,如果要调试、查阅或者采集日志,还需要构建一套日志系统,否则登录到服务器上查看日志,在IT资源管理上并不现实
- 缺UI管理工具
Airflow是Airbnb开发的任务流管理工具,在1.8之前,由Airbnb维护并开源,从1.9开始,Airflow进入了Apache incubator,目前的稳定版本号是 1.10。Airflow可以作为Crontab的完美替代,胜任项目或者公司级别的任务调度系统。很多成功案例表明,用一个Airflow管理数千的任务也绰绰有余。
Airflow由Python语言开发,运行在*nix系统(不支持Windows系统),目前支持 Python2.7, 3.4, 3.5版本,通过Python包管理平台pypi发布。
需要特别注意的是,在pypi上同时存在 airflow 和 apache-airflow 两个软件包,airflow 是项目进入Apache孵化器之前的名称,最新版本号更新到1.8,apache-airflow 是项目进入Apache孵化器之后的名称,最新的1.9、1.10以及未来的版本,都会通过 apache-airflow 发布。所以下载安装时一定注意认准名称 apache-airflow 。
基本概念
DAG,英文原名“Directed Acyclic Graph”,直译“有向无循环图”,实际上是一系列的 “Job”/“Task”/“作业”/“任务”(无特殊说明,本文认为以上四个中英说法代表同一含义)的集合,并且描述了这些Task之间的依赖关系,这种依赖关系,我们称之为“pipeline”。
Operator,运行者,如上所述,DAG 由task组成,一个task可能是一个函数,一个Linux命令行调用,一段SQL语句,一个HTTP请求等,这些特定的task任务类型,需要一个特定的执行者来运行,比如:
- BashOperator 执行Linux命令行调用
- PythonOperator 执行Python函数
- MySqlOperator 向MySQL数据库运行指定的SQL任务
- SimpleHttpOperator 发起指定的HTTP请求(比如通过调用web服务执行特定任务)
Executor,执行者,可以理解为 Operator 是通过Executor运行起来的,Airflow默认的Executor有:
- SequentialExecutor, LocalExecutor,本机运行的Executor,其中 SequentialExecutor 每次只能执行一个任务,主要debug用,但Airflow的模板配置文件默认使用这个Executor,一定记得改掉。LocalExecutor 通过spawn进程来执行任务。
- CeleryExecutor, DaskExecutor,分布式Executor,建议使用 CeleryExecutor
Metastore,Airflow的存储数据库,通常配置为MySQL或Postgresql这样的关系型数据库
scheduler,Airflow调度程序
worker,Airflow任务执行程序
AIRFLOW_HOME,指定Airflow运行的主目录,默认情况下在 ~/airflow,可通过环境变量 export AIRFLOW_HOME=path 调整目录位置,没有特殊需求不建议修改,确保程序在指定的用户环境下运行即可
环境选择
下面是我选择的基本环境:
- Python2.7 (由于目前支持 python3.4,3.5环境,并没有支持3.6,而我建议现在Python主开发版本选择2.7和3.6比较兼顾稳定性和新特性,所以用Python2.7)
- CeleryExecutor
- Redis 做 CeleryExecutor 的backend
- MySQL 数据库做 metastore
安装
通过命令 SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow[celery,redis,crypto,mysql] 安装,[celery,redis,crypto,mysql] 是我们选择的组件,其中 crypto 用于UI管理界面的权限认证模块。另外我安装完成后总提示缺 flask_bcrypt,所以再单独安装此模块 pip install flask_bcrypt。需要注意的
- 不要尝试单独安装 MySQL 驱动包,因Airflow默认只支持 mysqlclient 驱动包,所以还是通过官方预定义的 extra_package 安装方式安装mysql驱动
配置
安装完成后,直接在终端执行 airflow 即可初始化 ~/airflow 目录,同时创建模板配置文件 ~/airflow/airflow.cfg
修改配置文件 airflow.cfg
- executor = CeleryExecutor 指定 executor 为CeleryExecutor
- sql_alchemy_conn = mysql://{username}:{password}@{host}/{dbname} ,指定metastore数据库连接,这里特别注意,虽然sqlalchemy本身支持通过 mysql+mysqldb://... 这样的语法修改数据库驱动库,但是这里千万别指定,因为Airflow默认仅支持mysqlclient驱动。我之前就因为这里改了,一直出现SQL syntax错误,非常困惑
- fernet_key = $fernet_key ,fernet_key 必须改,通过代码生成一段特定格式密钥,参考链接:https://incubator-airflow.readthedocs.io/en/latest/configuration.html#connections
from cryptography.fernet import Fernet fernet_key= Fernet.generate_key() print(fernet_key)
- catchup_by_default = False 此配置项默认情况下是 True ,当加入新的 DAG ,而 start_date 和当前时间之间有周期task没有分派调度时,会自动把之前的调度全部加入到schedule队列,通常这个功能是不需要的,务必修改为 False
- broker_url = redis://{hostname}:6379/9 指定celery的broker地址
- celery_result_backend 文档描述的名称似乎和实际不对应,加上 result_backend 才解决问题
celery_result_backend = db+mysql://{username}:{password}@{host}/{dbname} ; configuration key may not sync with airflow documentation result_backend = db+mysql://{username}:{password}@{host}/{dbname}
创建初始的WEB界面登录账号密码,参考:https://airflow.incubator.apache.org/security.html#web-authentication
from airflow import models, settings from airflow.contrib.auth.backends.password_auth import PasswordUser def init_user(): user = PasswordUser(models.User()) user.username = 'admin' user.email = 'sample@sample.com' user.password = 'password' session = settings.Session() session.add(user) session.commit() session.close() if __name__ == ‘__main__’: init_user()
修改好配置后,同时运行
- airflow scheduler 启动scheduler
- airflow webserver 启动webserver
- airflow worker 启动worker
编写DAG
按照官方案例:
""" Code that goes along with the Airflow tutorial located at: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py “"" # 导入时区库 import pendulum from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # 创建中国时区 tz_info = pendulum.timezone("Asia/Shanghai") default_args = { 'owner': 'airflow', 'depends_on_past': False, #'start_date': datetime(2015, 6, 1), 'start_date': datetime(2015, 6, 1, tzinfo=tz_info), # 指定时区 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } # dag = DAG('tutorial', default_args=default_args) dag = DAG('tutorial', default_args=default_args, schedule_interval='59 23 * * *’) # 使用cron表达式指定调度规则 # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) t3.set_upstream(t1)
DAG代码说明:
- 官方文档已经有详细介绍:https://airflow.apache.org/tutorial.html
- 第一点补充,加入时区配置,否则DAG会按照UTC时间运行
- 第二点补充,通过 cron 表达式指定调度时间,和使用Crontab一样
分布式部署
建议通过 supervisor 来运行 scheduler / webserver / worker 程序,参考我的supervisor配置节
[program:airflow_scheduler] command=airflow scheduler autorestart=true redirect_stderr=true stdout_logfile=/home/mscn/supervisor/log/childlog/airflow_scheduler.txt stderr_logfile=/home/mscn/supervisor/log/childlog/airflow_scheduler_err.txt stopsignal=INT [program:airflow_worker] command=airflow worker autorestart=true redirect_stderr=true stdout_logfile=/home/mscn/supervisor/log/childlog/airflow_worker.txt stderr_logfile=/home/mscn/supervisor/log/childlog/airflow_worker_err.txt stopsignal=INT [program:airflow_webserver] command=airflow webserver autorestart=true redirect_stderr=true stdout_logfile=/home/mscn/supervisor/log/childlog/airflow_webserver.txt stderr_logfile=/home/mscn/supervisor/log/childlog/airflow_webserver_err.txt stopsignal=INT
注意在supervisor配置中指定运行用户为你airflow主目录所在的用户,配置节 [supervisord] user = mscn
将部署主机分为
- Master Node,运行 scheduler 和 webserver,单主机
- Worker Node,运行 worker,任意多台主机,视任务运行数量而定
部署架构如图:
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.