What is Airflow

Airflow is Airbnb’s open source workflow management project, with its own web UI and scheduler. It is now incubating under Apache; the repository is at github.com/apache/incu…


What problem does Airflow solve

The problem Airflow solves is described in Airbnb’s official blog post, “Airflow: a workflow management platform”. In short, it manages and schedules various offline scheduled jobs in place of crontab.

When cron jobs number in the hundreds or thousands, they become extremely hard to manage; if your team has been through this, you know the pain. Using a tool like Airflow to run scheduled tasks instead of cron is a huge improvement.

What to know and prepare before using Airflow

Airflow has been renamed apache-airflow on PyPI. To install the latest version, use pip install apache-airflow.

Airflow 1.8 requires MySQL 5.6 or above; versions below 5.7 may report error 1071, u'Specified key was too long; max key length is 767 bytes'. If you are using MySQL as your Airflow backend, please upgrade MySQL to the latest version.

If Airflow reports u"Table 'performance_schema.session_variables' doesn't exist", run mysql_upgrade -u root -p --force to resolve the problem.

Airflow’s MySQL driver uses mysqlclient, with a connection string of the form mysql://root:@127.0.0.1/sqlalchemy_lab?charset=utf8; a syntax error will be reported if another driver is used.

Basic concepts

Airflow’s two most basic concepts are the DAG and the Task. DAG stands for Directed Acyclic Graph: it is the collection of all the tasks you want to run, together with the dependencies you define between them. A DAG is a DAG object, configured in a Python script.

For example, a simple DAG contains three tasks: task A, task B, and task C. Task B can run only after task A completes successfully, while task C can run independently of A and B. In this simple DAG, A, B, and C can be any work you want to perform.
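
That three-task DAG could be sketched with placeholder DummyOperator tasks (the dag_id, start date, and operator choice here are illustrative, not from the original tutorial):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('abc_example', start_date=datetime(2015, 6, 1))

task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)

# B runs only after A succeeds; C has no dependency on A or B
task_b.set_upstream(task_a)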

DAGs are defined in Python: a DAG is a Python file stored in the DAG directory, and Airflow dynamically builds the DAG objects from that directory. Each workflow can contain any number of tasks.

Installation and use

Airflow is written in Python and is easily installed with pip (the package is apache-airflow). By default, Airflow stores its configuration in the ~/airflow directory.

Airflow provides a number of commands for initializing it and using it properly.

# Initialize the database and the airflow configuration in the ~/airflow directory
airflow initdb
# Start the airflow web server
airflow webserver
# Start the scheduler
airflow scheduler

For more detailed information, please refer to the documentation at airflow.incubator.apache.org/.

The first DAG

The DAG configuration is done in Python like this:

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)  # t2 depends on t1
t3.set_upstream(t1)

The purpose of the DAG script is to define the DAG’s configuration; it does not do any data processing itself. Here, each operator instantiation is a task.
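
Besides BashOperator, other operators define other kinds of tasks. For instance, a PythonOperator runs an ordinary Python callable; a minimal sketch (the function and task_id are made up for illustration, reusing the dag object from the tutorial above):

from airflow.operators.python_operator import PythonOperator

def greet():
    # any Python code can go here; it becomes the body of the task
    print('hello from airflow')

t4 = PythonOperator(
    task_id='greet',
    python_callable=greet,
    dag=dag)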

Instantiation of DAG

A DAG script consists of the instantiation of a DAG object and its corresponding operators. In addition, we can define default arguments that are supplied to every task.

The DAG object can be instantiated with whatever initialization parameters we need, but it must be given a unique dag_id:

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))
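
schedule_interval accepts either a datetime.timedelta or a cron-style string. For example, a daily-at-midnight schedule (purely illustrative) could be written as:

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval='0 0 * * *')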

Task instantiation

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

A task is defined by instantiating an operator. Every operator has a task_id, which distinguishes it from other tasks; bash_command can be customized as needed, and parameters can also be passed in.

Task dependencies

Tasks can establish dependencies on each other, such as:

t2.set_upstream(t1)

# This means that t2 will only run after
# t1 has completed successfully
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
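
Depending on your Airflow version, the same dependencies can also be expressed with the bitshift composition operators (available in recent 1.x releases; treat this as an assumption about your version):

# equivalent bitshift syntax: the task on the left runs first
t1 >> t2
t1 >> t3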

Airflow automatically checks for cyclic dependencies to prevent tasks from failing because of them. For more complex situations, see the documentation.

Execute and test

Create the DAG directory (dags by default) in the same directory as airflow.cfg to hold the first DAG script, then run python tutorial.py. If no errors are reported, the tutorial DAG has been created successfully.

Airflow’s command line

Airflow provides a series of commands for viewing DAGs and tasks.

# print the list of active DAGs
airflow list_dags

# prints the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

Testing a task

Testing a task is as simple as specifying the DAG, the task, and the execution date:

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

The test command executes the task and prints the output to the console; it does not persist the task’s execution state.

Execute the task and record the status

This kind of run is called a backfill in Airflow: a backfill actually tracks the execution state and dependencies of the tasks and records them.

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

More on DAG and Operator

The scope of DAG

By default, Airflow loads any DAG object it can import from a DAG file, which means any DAG object defined at the global (module) scope is picked up. Sometimes, for example when using SubDagOperator, you can place a DAG in a local scope to make sure it is not picked up.

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

A DAG can specify default arguments

The default DAG arguments are applied to all operators.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = dict(
    start_date=datetime(2016, 1, 1),
    owner='Airflow')

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner)  # Airflow

Highly extensible operators

Airflow’s operators are easy to extend, which is why Airflow can support almost any kind of task. Although Airflow supports passing data between tasks, if two tasks really need to share data, the best approach is to combine them into a single task.
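
As an illustration (not from the original post), a custom operator only needs to subclass BaseOperator and implement execute; the class name and parameter below are made up, and import paths can vary between Airflow versions:

import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HelloOperator(BaseOperator):
    """A made-up operator that just logs a greeting."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is called when the task instance runs
        logging.info('Hello, %s', self.name)

It can then be used in a DAG just like the built-in operators, e.g. HelloOperator(task_id='hello', name='world', dag=dag).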

Resources

  • airflow.incubator.apache.org/
  • github.com/apache/incu…
  • medium.com/airbnb-engi…
  • Airflow Tests Demo: github.com/zhyq0826/ai…

For more original posts, follow wecatch