
Introduction

Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows.

With Airflow, you author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on a set of workers while respecting the dependencies you specify. Airflow's extensive command-line utilities make complex diagnostics on a DAG a no-hassle experience, and its rich user interface makes it easy to visualize workflows running in production, monitor progress, and troubleshoot problems.

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.

Main features

  • Dynamic: Airflow workflows are configured as code (Python), which allows dynamic DAG generation and lets you write code that instantiates workflows dynamically (see the sketch after this list).
  • Extensible: Easily define your own operators and executors, and extend the library to the level of abstraction that suits your environment.
  • Elegant: The design is simple and explicit. Parameterizing your scripts is built into the Airflow core using the powerful Jinja template engine.
  • Scalable: Airflow has a modular architecture and uses message queues to orchestrate any number of workers. Airflow is designed to scale arbitrarily far.
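
To make the "Dynamic" point concrete, here is a minimal sketch of generating tasks in a loop. It is an illustration of mine rather than part of the original article: the dag_id dynamic_example, the task names, and the echo commands are all made up.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Because a DAG file is ordinary Python, tasks can be generated programmatically.
with DAG(
    'dynamic_example',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
) as dag:
    previous = None
    for name in ['extract', 'transform', 'load']:
        task = BashOperator(
            task_id=f'step_{name}',
            bash_command=f'echo "running {name}"',
        )
        # Chain each generated task after the previous one: extract >> transform >> load
        if previous is not None:
            previous >> task
        previous = task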

Architecture

An Airflow installation generally consists of:

  • A scheduler: handles both triggering scheduled workflows and submitting tasks to the executor to run.
  • An executor: handles running tasks. In a default Airflow installation it runs everything inside the scheduler, but most production-suitable executors actually push task execution out to workers (the configuration check after this list shows which executor is in use).
  • A web server: provides a convenient user interface to inspect, trigger, and debug the behaviour of DAGs and tasks.
  • A folder of DAG files: read by the scheduler and the executor (and any workers the executor has).
  • A metadata database: used by the scheduler, executor, and web server to store state.
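
A quick way to see how these pieces are wired together on a particular installation is to read Airflow's own configuration. The short sketch below is my addition (not from the original article); it only assumes the standard airflow.configuration API.

# Print which executor and DAG folder this installation is configured to use.
# The values come from airflow.cfg or the corresponding environment variables.
from airflow.configuration import conf

print(conf.get('core', 'executor'))     # SequentialExecutor in a default installation
print(conf.get('core', 'dags_folder'))  # the folder the scheduler scans for DAG files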

Airflow installation and initialization

Install Airflow

# Airflow needs a home directory; ~/airflow is the default, but you can set it elsewhere
export AIRFLOW_HOME=~/airflow

# Install the dependency libraries with a version-pinned constraints file
AIRFLOW_VERSION=2.1.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.6.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Initialize the database and create a user

# Initialize the database
airflow db init

# Create a user and password
airflow users create \
 --username admin \
 --firstname Peter \
 --lastname Parker \
 --role Admin \
 --email spiderman@superhero.org

Start the web server and scheduler

# The default port is 8080
airflow webserver --port 8080

# Start the scheduler
airflow scheduler

# Open 10.247.128.69:8080 (the host running the web server) in your browser; on the home page you can enable the example DAGs

Run the official demo

# run your first task instance
airflow tasks run example_bash_operator runme_0 2015-01-01

# run a backfill over 2 days
airflow dags backfill example_bash_operator \
 --start-date 2015-01-01 \
 --end-date 2015-01-02

Example

Defining a workflow

~/airflow/dags/tutorial.py

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Define default parameters.
# These arguments are passed to each operator.
# You can override them on a per-task basis during operator initialization.
default_args = {
 'owner': 'airflow',
 'depends_on_past': False,
 'email': ['airflow@example.com'],
 'email_on_failure': False,
 'email_on_retry': False,
 'retries': 1,
 'retry_delay': timedelta(minutes=5),
 # 'queue': 'bash_queue',
 # 'pool': 'backfill',
 # 'priority_weight': 10,
 # 'end_date': datetime(2016, 1, 1),
 # 'wait_for_downstream': False,
 # 'dag': dag,
 # 'sla': timedelta(hours=2),
 # 'execution_timeout': timedelta(seconds=300),
 # 'on_failure_callback': some_function,
 # 'on_success_callback': some_other_function,
 # 'on_retry_callback': another_function,
 # 'sla_miss_callback': yet_another_function,
 # 'trigger_rule': 'all_success'
}


# Instantiate a DAG.
# We need a DAG object to nest our tasks into.
# Here, we pass a string defining dag_id, which is used as the unique identifier for the DAG.
# We also pass the default argument dictionary we just defined,
# and a schedule_interval of 1 day for the DAG.
with DAG(
 'tutorial',
 default_args=default_args,
 description='A simple tutorial DAG',
 schedule_interval=timedelta(days=1),
 start_date=days_ago(2),
 tags=['example'],
) as dag:
 # A task is generated when an Operator object is instantiated.
 # An object instantiated from an Operator is called a task. The first argument, task_id, acts as a unique identifier for the task.
 # t1, t2 and t3 are examples of tasks created by instantiating operators
 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
 )

 t2 = BashOperator(
     task_id='sleep',
     depends_on_past=False,
     bash_command='sleep 5',
     retries=3,
 )

 # Add documentation to the workflow and tasks

 t1.doc_md = dedent(
     """\ #### Task Documentation You can document your task using the attributes `doc_md` (markdown), `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets rendered in the UI's Task Instance Details page. ! [img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png) """
 )

 dag.doc_md = __doc__  # assuming you have a docstring at the start of the DAG
 dag.doc_md = """ This is a documentation placed anywhere """     # Otherwise, type it like this

 # Jinja template
 templated_command = dedent(
     """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """
 )
 t3 = BashOperator(
     task_id='templated',
     depends_on_past=False,
     bash_command=templated_command,
     params={'my_param': 'Parameter I passed in'},
 )

 # Set task dependencies
 t1 >> [t2, t3]

Note: when executing your script, Airflow will raise an exception if it finds a cycle in your DAG or if a dependency is referenced more than once.
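
The bitshift expression t1 >> [t2, t3] used above is only one of several equivalent ways to declare dependencies. The snippet below is a small illustration of my own; the dag_id dependency_demo and the task ids a through d are invented for the example.

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# A throwaway DAG that only exists to show different ways of wiring dependencies.
with DAG('dependency_demo', start_date=days_ago(1), schedule_interval=None) as dag:
    a = BashOperator(task_id='a', bash_command='echo a')
    b = BashOperator(task_id='b', bash_command='echo b')
    c = BashOperator(task_id='c', bash_command='echo c')
    d = BashOperator(task_id='d', bash_command='echo d')

    a >> [b, c]          # bitshift syntax, as in the tutorial: a runs before b and c
    b.set_downstream(d)  # explicit method call: b runs before d
    chain(c, d)          # chain(c, d) is equivalent to c >> d

Each dependency is declared exactly once here, which matters because of the duplicate-reference check mentioned in the note above.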

Testing

Run the script

First, let’s make sure the workflow is successfully parsed.

 python ~/airflow/dags/tutorial.py

If the script does not throw an exception, you have not made any serious errors and your Airflow environment appears to be sound.
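
Beyond simply importing the file, a common pattern (my addition, not part of the original article) is to load the DAG folder through DagBag and check for import errors programmatically:

# Check that every file in the configured DAG folder imports cleanly.
from airflow.models import DagBag

dag_bag = DagBag()            # parses the DAG folder configured for this installation
print(dag_bag.import_errors)  # {} when every DAG file parsed without errors
print(list(dag_bag.dags))     # the dag_ids that were loaded, e.g. ['tutorial', ...]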

Command line metadata validation

# initialize the database tables
airflow db init

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree

Testing tasks and the Jinja-templated task

Let’s test this by running an actual task instance for a specific date. The date here is specified by a field called execution_date. This is the logical date, which simulates the scheduler running your task or DAG for that particular date and time, even though it physically runs now (or as soon as its dependencies are met).
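
The logical date is also visible from inside your own tasks. As a small sketch of mine (the dag_id context_demo and the callable name are invented), a PythonOperator callable can receive ds, the execution date rendered as YYYY-MM-DD, straight from the task context:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_logical_date(ds, **kwargs):
    # When run with: airflow tasks test context_demo print_logical_date 2015-06-01
    # this prints "execution date (ds) = 2015-06-01"
    print(f"execution date (ds) = {ds}")


with DAG('context_demo', start_date=days_ago(1), schedule_interval=None) as dag:
    PythonOperator(
        task_id='print_logical_date',
        python_callable=print_logical_date,
    )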

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01
# testing templated
# This displays a detailed event log and ultimately runs your bash command and prints the result
airflow tasks test tutorial templated 2015-06-01

Note: the airflow tasks test command runs task instances locally, outputs their logs to standard output (on screen), does not bother with dependencies, and does not communicate state (running, success, failed, ...) to the database. It simply allows testing of a single task instance.

The same applies at the DAG level with airflow dags test [dag_id] [execution_date]. It performs a single DAG run of the given DAG id. While it does take task dependencies into account, it does not register any state in the database. With this in mind, it is convenient for testing a full run of a DAG locally, provided that any data your tasks expect at a given location is actually available there.
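
Because these test commands leave the database untouched, they pair well with ordinary unit tests. The sketch below is a hypothetical pytest-style check of my own for the tutorial DAG defined earlier; the file name and the exact assertions are assumptions.

# test_tutorial_dag.py -- hypothetical structural checks for the tutorial DAG
from airflow.models import DagBag


def test_tutorial_dag_structure():
    dag_bag = DagBag()
    dag = dag_bag.get_dag('tutorial')

    assert dag_bag.import_errors == {}
    assert dag is not None

    # The tutorial defines three tasks: print_date, sleep, templated
    assert set(dag.task_dict) == {'print_date', 'sleep', 'templated'}

    # print_date is upstream of the other two
    assert dag.get_task('print_date').downstream_task_ids == {'sleep', 'templated'}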

Backfill

Backfill respects your dependencies, writes the logs to files, and talks to the database to record status.

If you have a web server running, you will be able to track its progress; airflow webserver starts one if you are interested in tracking progress visually while your backfill runs.

# Optionally, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow dags backfill tutorial \
 --start-date 2015-06-01 \
 --end-date 2015-06-07

Note:

If you use depends_on_past=True, an individual task instance will depend on the success of its previous task instance (that is, the one with the previous execution_date). Task instances whose execution_date==start_date ignore this dependency, because no past task instances are created for them.

You may also want to consider wait_for_downstream=True when using depends_on_past=True. While depends_on_past=True causes a task instance to depend on the success of its previous task instance, wait_for_downstream=True causes it to also wait for all task instances immediately downstream of the previous task instance to succeed.
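
As a concrete illustration, both flags are ordinary operator arguments and can be set per task or through default_args. The sketch below is my own; the dag_id backfill_flags_demo and the task names are invented.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG('backfill_flags_demo',
         start_date=days_ago(7),
         schedule_interval=timedelta(days=1)) as dag:
    load = BashOperator(
        task_id='load',
        bash_command='echo loading',
        # Only run once the previous day's instance of this task succeeded...
        depends_on_past=True,
        # ...and, with wait_for_downstream=True, once the previous day's instance
        # of the task immediately downstream of it (here: report) succeeded too.
        wait_for_downstream=True,
    )
    report = BashOperator(
        task_id='report',
        bash_command='echo reporting',
    )
    load >> report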

Conclusion

Apache Airflow allows the tasks of a workflow to be executed in parallel across multiple workers, with task dependencies modelled as a directed acyclic graph. In addition, each task is retried atomically: if a task in a workflow fails, it can be retried automatically or manually without rerunning the workflow from the beginning.

Overall, Apache Airflow is one of the most popular and most feature-rich workflow tools available.