Airflow实现分布式Job调度系统

一直以来,我都是用Crontab作为Linux环境的Job调度工具,同时注册Job执行结果到数据表,设置全局的logging配置,输出Job日志到服务器集中的日志目录中。使用Crontab来管理Job调度优势明显:

但是,随着项目中的Job数量越来越多,一些Crontab不能解决的问题也凸显出来

  1. Crontab不能对一系列Job进行依赖执行,比如任务A,B,C,因B和C任务是根据A任务的结果执行的,所以执行顺序应该是 A完成后,B和C才能执行,但Crontab却并不能处理这种情况,只能通过排定时间先后顺序,或者程序员合并A/B/C三个Job到一个Job中才能处理
  2. 部署管理不方便,如果Job数量不多,只需要在单台服务器运行Cron Job即可,但如果是Job数量多,需要部署到多台服务器,那么只能逐台服务器分散配置Crontab,如何调度分派任务到服务器并不容易,需要合理计算作业服务器的资源情况
  3. 查看Job执行结果不方便,必须要自主设计一套作业执行结果报告方案
  4. 如果Job运行失败,开发人员只能登陆到服务器去手动重新执行
  5. 日志分散,通常日志输出到Job作业服务器,如果要调试、查阅或者采集日志,还需要构建一套日志系统,否则登录到服务器上查看日志,在IT资源管理上并不现实
  6. 缺UI管理工具

Airflow是Airbnb开发的任务流管理工具,在1.8之前,由Airbnb维护并开源,从1.9开始,Airflow进入了Apache incubator,目前的稳定版本号是 1.10。Airflow可以作为Crontab的完美替代,胜任项目或者公司级别的任务调度系统。很多成功案例表明,用一个Airflow管理数千的任务也绰绰有余。

Airflow由Python语言开发,运行在*nix系统(不支持Windows系统),目前支持 Python2.7, 3.4, 3.5版本,通过Python包管理平台pypi发布。

需要特别注意的是,在pypi上同时存在 airflowapache-airflow 两个软件包,airflow 是项目进入Apache孵化器之前的名称,最新版本号更新到1.8,apache-airflow 是项目进入Apache孵化器之后的名称,最新的1.9、1.10以及未来的版本,都会通过 apache-airflow 发布。所以下载安装时一定注意认准名称 apache-airflow

基本概念

DAG,英文原名“Directed Acyclic Graph”,直译“有向无循环图”,实际上是一系列的 “Job”/“Task”/“作业”/“任务”(无特殊说明,本文认为以上四个中英说法代表同一含义)的集合,并且描述了这些Task之间的依赖关系,这种依赖关系,我们称之为“pipeline”。

Operator,运行者,如上所述,DAG 由task组成,一个task可能是一个函数,一个Linux命令行调用,一段SQL语句,一个HTTP请求等,这些特定的task任务类型,需要一个特定的执行者来运行,比如:

  • BashOperator 执行Linux命令行调用
  • PythonOperator 执行Python函数
  • MySqlOperator 向MySQL数据库运行指定的SQL任务
  • SimpleHttpOperator 发起指定的HTTP请求(比如通过调用web服务执行特定任务)

Executor,执行者,可以理解为 Operator 是通过Executor运行起来的,Airflow默认的Executor有:

  • SequentialExecutor, LocalExecutor,本机运行的Executor,其中 SequentialExecutor 每次只能执行一个任务,主要debug用,但Airflow的模板配置文件默认使用这个Executor,一定记得改掉。LocalExecutor 通过spawn进程来执行任务。
  • CeleryExecutor, DaskExecutor,分布式Executor,建议使用 CeleryExecutor

Metastore,Airflow的存储数据库,通常配置为MySQL或Postgresql这样的关系型数据库

scheduler,Airflow调度程序

worker,Airflow任务执行程序

AIRFLOW_HOME,指定Airflow运行的主目录,默认情况下在 ~/airflow,可通过环境变量 export AIRFLOW_HOME=path 调整目录位置,没有特殊需求不建议修改,确保程序在指定的用户环境下运行即可

环境选择

下面是我选择的基本环境:

  • Python2.7 (由于目前支持 python3.4,3.5环境,并没有支持3.6,而我建议现在Python主开发版本选择2.7和3.6比较兼顾稳定性和新特性,所以用Python2.7)
  • CeleryExecutor
  • Redis 做 CeleryExecutor 的backend
  • MySQL 数据库做 metastore

安装

通过命令 SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow[celery,redis,crypto,mysql] 安装,[celery,redis,crypto,mysql] 是我们选择的组件,其中 crypto 用于UI管理界面的权限认证模块。另外我安装完成后总提示缺 flask_bcrypt,所以再单独安装此模块 pip install flask_bcrypt。需要注意的

  • 不要尝试单独安装 MySQL 驱动包,因Airflow默认只支持 mysqlclient 驱动包,所以还是通过官方预定义的 extra_package 安装方式安装mysql驱动

配置

安装完成后,直接在终端执行 airflow 即可初始化 ~/airflow 目录,同时创建模板配置文件 ~/airflow/airflow.cfg

修改配置文件 airflow.cfg

  • executor = CeleryExecutor 指定 executor 为CeleryExecutor
  • sql_alchemy_conn = mysql://{username}:{password}@{host}/{dbname} ,指定metastore数据库连接,这里特别注意,虽然sqlalchemy本身支持通过 mysql+mysqldb://... 这样的语法修改数据库驱动库,但是这里千万别指定,因为Airflow默认仅支持mysqlclient驱动。我之前就因为这里改了,一直出现SQL syntax错误,非常困惑
  • fernet_key = $fernet_keyfernet_key 必须改,通过代码生成一段特定格式密钥,参考链接:https://incubator-airflow.readthedocs.io/en/latest/configuration.html#connections
from cryptography.fernet
import Fernet fernet_key= Fernet.generate_key()
print(fernet_key)
  • catchup_by_default = False 此配置项默认情况下是 True ,当加入新的 DAG ,而 start_date 和当前时间之间有周期task没有分派调度时,会自动把之前的调度全部加入到schedule队列,通常这个功能是不需要的,务必修改为 False
  • broker_url = redis://{hostname}:6379/9 指定celery的broker地址
  • celery_result_backend 文档描述的名称似乎和实际不对应,加上 result_backend 才解决问题
celery_result_backend = db+mysql://{username}:{password}@{host}/{dbname}
; configuration key may not sync with airflow documentation
result_backend = db+mysql://{username}:{password}@{host}/{dbname}

创建初始的WEB界面登录账号密码,参考:https://airflow.incubator.apache.org/security.html#web-authentication

from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser


def init_user():
    user = PasswordUser(models.User())
    user.username = 'admin'
    user.email = 'sample@sample.com'
    user.password = 'password'
    session = settings.Session()
    session.add(user)
    session.commit()
    session.close()

if __name__ == ‘__main__’:
    init_user()

修改好配置后,同时运行

  • airflow scheduler 启动scheduler
  • airflow webserver 启动webserver
  • airflow worker 启动worker

编写DAG

按照官方案例:

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
“""
# 导入时区库
import pendulum

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# 创建中国时区
tz_info = pendulum.timezone("Asia/Shanghai")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    #'start_date': datetime(2015, 6, 1),
    'start_date': datetime(2015, 6, 1, tzinfo=tz_info),  # 指定时区
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# dag = DAG('tutorial', default_args=default_args)
dag = DAG('tutorial', default_args=default_args, schedule_interval='59 23 * * *’) # 使用cron表达式指定调度规则

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

DAG代码说明:

  • 官方文档已经有详细介绍:https://airflow.apache.org/tutorial.html
  • 第一点补充,加入时区配置,否则DAG会按照UTC时间运行
  • 第二点补充,通过 cron 表达式指定调度时间,和使用Crontab一样

分布式部署

建议通过 supervisor 来运行 scheduler / webserver / worker 程序,参考我的supervisor配置节

[program:airflow_scheduler]
command=airflow scheduler
autorestart=true
redirect_stderr=true
stdout_logfile=/home/mscn/supervisor/log/childlog/airflow_scheduler.txt
stderr_logfile=/home/mscn/supervisor/log/childlog/airflow_scheduler_err.txt
stopsignal=INT

[program:airflow_worker]
command=airflow worker
autorestart=true
redirect_stderr=true
stdout_logfile=/home/mscn/supervisor/log/childlog/airflow_worker.txt
stderr_logfile=/home/mscn/supervisor/log/childlog/airflow_worker_err.txt
stopsignal=INT

[program:airflow_webserver]
command=airflow webserver
autorestart=true
redirect_stderr=true
stdout_logfile=/home/mscn/supervisor/log/childlog/airflow_webserver.txt
stderr_logfile=/home/mscn/supervisor/log/childlog/airflow_webserver_err.txt
stopsignal=INT

注意在supervisor配置中指定运行用户为你airflow主目录所在的用户,配置节 [supervisord] user = mscn

将部署主机分为

  • Master Node,运行 scheduler 和 webserver,单主机
  • Worker Node,运行 worker,任意多台主机,视任务运行数量而定

部署架构如图:

airflow-deploy-architecture
Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.