• Python & Big Data: Airflow & Jupyter Notebook with Hadoop 3, Spark & Presto
  • By Mark Litwintschik
  • The Nuggets translation Project
  • Permanent link to this article: github.com/xitu/gold-m…
  • Translator: cf020031308
  • Proofreader: Yqian1991

In recent years, Python has become a popular programming language in data science, machine learning, and deep learning. Pair it with the query language SQL and you can get most jobs done. SQL is great because requests are phrased in something close to English: you state what you want, not how to retrieve it, which leaves the underlying query engine free to optimize the query without the SQL itself having to change. Python is also great: it has a large number of high-quality libraries and is easy to use.

Job orchestration is the practice of taking recurring tasks and automating them. In the past, this was usually done with CRON jobs. In recent years, more companies have turned to Apache Airflow and Spotify’s Luigi to build more robust systems. These tools can monitor jobs, record their results, and re-run them in the event of a failure. If you are interested, I have written a blog post covering Airflow’s backstory entitled Building a Data Pipeline using Airflow.

Notebooks have also become very popular in the data world over the past few years as tools for data exploration and visualization. Tools such as Jupyter Notebook and Apache Zeppelin aim to meet this need. A notebook shows you not only the results of an analysis but also the code and queries that produced them, which makes omissions easier to spot and helps analysts reproduce each other’s work.

Airflow and Jupyter Notebook work well together: Airflow can automatically feed new data into a database, which data scientists can then analyze using Jupyter Notebook.

In this blog post, I’ll install a single-node Hadoop, get Jupyter Notebook running, and show how to create an Airflow job that takes a weather data feed, stores it on HDFS, converts it to ORC, and finally exports it to a Microsoft Excel spreadsheet.

The machine I’m using has a 3.40 GHz Intel Core i5-4670K CPU, 12 GB of RAM, and 200 GB of SSD storage. I’ll be using a fresh installation of Ubuntu 16.04.2 LTS, with a single-node Hadoop built and installed following the instructions in my blog post Hadoop 3: Single-Node Installation Guide.

Install dependencies

Next, install the dependencies on Ubuntu. The git package will be used to fetch the weather dataset from GitHub; the other three packages are Python itself, the Python package installer, and the Python environment isolation toolkit.

$ sudo apt install \
    git \
    python \
    python-pip \
    virtualenv

Airflow will rely on RabbitMQ to help track its jobs. Install Erlang, the language RabbitMQ is written in.

$ echo "deb http://binaries.erlang-solutions.com/debian xenial contrib" | \
    sudo tee /etc/apt/sources.list.d/erlang.list
$ wget -O - http://binaries.erlang-solutions.com/debian/erlang_solutions.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install esl-erlang

Install RabbitMQ.

$ echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | \
    sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install rabbitmq-server

The following installs the Python dependencies and applications used in this blog post.

$ virtualenv .notebooks
$ source .notebooks/bin/activate
$ pip install \
    apache-airflow \
    celery \
    cryptography \
    jupyter \
    jupyterthemes \
    pyhive \
    requests \
    xlsxwriter

Configure Jupyter Notebook

I will create a folder for Jupyter to store its configuration in and then set a password for the server. If you don’t set a password, you get a lengthy URL containing a key for accessing the Jupyter web interface, and that key changes every time Jupyter Notebook is started.

$ mkdir -p ~/.jupyter/
$ jupyter notebook password
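
If you prefer to script this step, the password hash can also be generated from Python and placed in ~/.jupyter/jupyter_notebook_config.py as c.NotebookApp.password. A minimal sketch, assuming the classic notebook package installed via the jupyter metapackage above (the password string below is just a placeholder):

from notebook.auth import passwd

# Prints a salted hash such as 'sha1:...' which can be pasted into
# ~/.jupyter/jupyter_notebook_config.py as c.NotebookApp.password.
print(passwd('my-notebook-password'))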

Jupyter Notebook supports user interface themes. The following command sets the theme to Chesterish.

$ jt -t chesterish

The following command lists the currently installed themes. Screenshots of every built-in theme are available on GitHub.

$ jt -l

To return to the default theme, run the following command.

$ jt -r

Query Spark from Jupyter Notebook

Start by ensuring that you are running Hive’s Metastore, Spark’s Master & Slaves service, and Presto’s server. Here are the commands to start these services.

$ hive --service metastore &
$ sudo /opt/presto/bin/launcher start
$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh

Now launch Jupyter Notebook so that you can interact with PySpark, Spark’s Python-based programming interface.

$ PYSPARK_DRIVER_PYTHON=ipython \
    PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --ip=0.0.0.0 --NotebookApp.iopub_data_rate_limit=100000000" \
    pyspark \
    --master spark://ubuntu:7077

Note that the master URL above uses ubuntu as the hostname. This is the hostname the Spark Master server is bound to. If you cannot connect to Spark, check the Spark Master server’s logs to find the hostname it chose to bind to, because it won’t accept connections addressed to other hostnames. This can be confusing, because you would normally expect a hostname like localhost to work regardless.
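
Once a notebook is up (see the steps below), you can confirm from inside a Python cell which master the session is actually bound to. A quick check, assuming the usual sc SparkContext that the PySpark shell creates:

# Print the master URL and default parallelism of the notebook's SparkContext.
print(sc.master)               # e.g. spark://ubuntu:7077
print(sc.defaultParallelism)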

After running the Jupyter Notebook service, run the following command to open the web page.

$ open http://localhost:8888/

You will be prompted for the password you set for Jupyter Notebook. After entering it, you can create a new notebook from the drop-down list in the upper right corner. The two notebook types we are interested in are Python and Terminal. A Terminal notebook gives you shell access under the UNIX account you launched Jupyter Notebook with. I’ll be using a Python notebook.

After starting the Python notebook, paste the following code into a cell; it queries the data through Spark. Adjust the query to use the dataset you created during installation.

cab_types = sqlContext.sql(""" SELECT cab_type, COUNT(*) FROM trips_orc GROUP BY cab_type """)

cab_types.take(2)

This is the output of the above query. Only one record is returned, consisting of two fields.

[Row(cab_type=u'yellow', count(1)=20000000)]
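
The same aggregation can also be written with Spark’s DataFrame API rather than a SQL string. A short sketch, assuming the same trips_orc table is visible to the session:

# Equivalent aggregation using the DataFrame API instead of SQL.
trips = sqlContext.table('trips_orc')
trips.groupBy('cab_type').count().show()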

Query Presto from Jupyter Notebook

In the same notebook you use to query Spark, you can also query Presto. Some queries perform better on Presto than on Spark, and you can switch between the two engines within a single notebook. In the example below, I use Dropbox’s PyHive library to query Presto.

from pyhive import presto

cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SELECT * FROM trips_orc LIMIT 10')
cursor.fetchall()

This is part of the output of the above query.

[(451221840,
  u'CMT',
  u'2011-08-23 21:03:34.000',
  u'2011-08-23 21:21:49.000',
  u'N',
  1,
  -74.004655,
  40.742162,
  -73.973489,
  40.792922,
  ...

If you want to chart data in Jupyter Notebook, have a look at the blog post Visualizing Data with SQLite in Jupyter Notebook, as it has several examples of plotting with SQL that also work with Spark and Presto.
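
For a quick inline chart, the Presto results can also be loaded into a pandas DataFrame and plotted directly. A minimal sketch, assuming matplotlib is installed in the virtualenv (it is not part of the pip install above); the cnt column name comes from the alias in the query:

%matplotlib inline
import pandas as pd
from pyhive import presto

cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SELECT cab_type, COUNT(*) AS cnt FROM trips_orc GROUP BY cab_type')

# Build a DataFrame using the column names reported by the cursor.
columns = [col[0] for col in cursor.description]
df = pd.DataFrame(cursor.fetchall(), columns=columns)

# A simple bar chart of trip counts per cab type.
df.plot(kind='bar', x='cab_type', y='cnt', legend=False)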

Start Airflow

The following will create a ~/airflow folder, set up a SQLite 3 database used to store Airflow’s state and the configuration set via its web interface, upgrade the configuration schema, and create a folder for the Python job code that Airflow will run.

$ cd ~
$ airflow initdb
$ airflow upgradedb
$ mkdir -p ~/airflow/dags

By default, Presto’s, Spark’s and Airflow’s web interfaces all use TCP port 8080. If you start Spark first, Presto won’t be able to start. If you start Spark after Presto, Presto launches on 8080 and the Spark Master server takes 8081; if that is also occupied, it keeps trying higher ports until it finds one that is free. Spark then picks an even higher port number for the Spark Worker’s web interface. This overlap is usually not a problem, since these services would normally live on different machines in a production setup.

Since TCP ports 8080-8082 are taken by this installation, I will run the Airflow web interface on port 8083.

$ airflow webserver --port=8083 &

I often use one of the following commands to see which network port is in use.

$ sudo lsof -OnP | grep LISTEN
$ netstat -tuplen
$ ss -lntu

By default, Airflow uses MySQL as the backend for its Celery broker and job results. Here we will use RabbitMQ instead.

$ vi ~/airflow/airflow.cfg

Find and edit the following settings.

broker_url = amqp://airflow:airflow@localhost:5672/airflow

celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow

I used airflow as both the username and the password for connecting to RabbitMQ. You can pick whatever credentials you like.

The commands below configure that account in RabbitMQ and grant it access to the airflow virtual host.

$ sudo rabbitmqctl add_vhost airflow
$ sudo rabbitmqctl add_user airflow airflow
$ sudo rabbitmqctl set_user_tags airflow administrator
$ sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
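
At this point you can verify the broker credentials from Python. A minimal sketch using the kombu library, which is installed as a dependency of Celery:

from kombu import Connection

# Open and close a connection to the airflow vhost using the credentials
# configured above.
with Connection('amqp://airflow:airflow@localhost:5672/airflow') as conn:
    conn.connect()
    print('RabbitMQ connection OK')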

Connect Airflow to Presto

Now open the Airflow web interface.

$ open http://localhost:8083/

Once you have opened the Airflow page, click the “Admin” navigation menu at the top and select “Connections”. You will see a long list of default database connections. Click to edit the Presto connection. Connecting Airflow to Presto requires the following changes (the same edit can also be scripted, as sketched after the list).

  • Change the schema from Hive to default.
  • Change the port number from 3400 to 8080.
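
If you would rather script this change than click through the UI, the same edit can be made against Airflow’s metadata database. A minimal sketch, assuming an Airflow 1.x installation and the stock presto_default connection:

from airflow import settings
from airflow.models import Connection

# Fetch the stock Presto connection and update its schema and port.
session = settings.Session()
conn = (session.query(Connection)
               .filter(Connection.conn_id == 'presto_default')
               .first())
conn.schema = 'default'   # was 'hive'
conn.port = 8080          # was 3400
session.commit()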

Once the connection has been updated, click the “Data Profiling” navigation menu at the top and select “Ad Hoc Query”. Select “presto_default” from the drop-down list at the top of the query box, and you should be able to run SQL through Presto. Here is a sample query against the dataset I imported during installation.

SELECT count(*)
FROM trips_orc;

Download the weather dataset

An Airflow DAG can be thought of as a scheduled job. In the example below, I’ll take the weather data provided in FiveThirtyEight’s data repository on GitHub, import it into HDFS, convert it from CSV to ORC, and export it from Presto into Microsoft Excel format.

The following clones the FiveThirtyEight data repository into a local folder named data.

$ git clone \
    https://github.com/fivethirtyeight/data.git \
    ~/data

I will then launch Hive and create two tables: one holds the dataset in CSV format, the other in the Presto- and Spark-friendly ORC format.

$ hive
CREATE EXTERNAL TABLE weather_csv (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DECIMAL(18,14),
    average_precipitation DECIMAL(18,14),
    record_precipitation  DECIMAL(18,14)
) ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LOCATION '/weather_csv/';

CREATE EXTERNAL TABLE weather_orc (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DOUBLE,
    average_precipitation DOUBLE,
    record_precipitation  DOUBLE
) STORED AS orc
  LOCATION '/weather_orc/';
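
Once the tables have been created they should also be visible from the notebook. A quick check via Presto, reusing the PyHive connection from earlier:

from pyhive import presto

cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SHOW TABLES')
print(cursor.fetchall())  # should include weather_csv and weather_orc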

Create Airflow DAG

The Python code below is the Airflow job (also called a DAG). Every 30 minutes, it does the following.

  • Clear any existing data in the /weather_csv/ folder on HDFS.
  • Copy the CSV file in the ~/data folder to the /weather_csv/ folder on the HDFS.
  • Convert CSV data in HDFS to ORC format using Hive.
  • Use Presto to export ORC data to Microsoft Excel 2013 format.

The Python code below refers to the CSV files with the full path /home/mark/data/us-weather-history/*.csv. Replace ‘mark’ with your own UNIX username.

$ vi ~/airflow/dags/weather.py
from datetime import timedelta

import airflow
from   airflow.hooks.presto_hook         import PrestoHook
from   airflow.operators.bash_operator   import BashOperator
from   airflow.operators.python_operator import PythonOperator
import numpy  as np
import pandas as pd


default_args = {
    'owner':            'airflow',
    'depends_on_past':  False,
    'start_date':       airflow.utils.dates.days_ago(0),
    'email':            ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry':   False,
    'retries':          3,
    'retry_delay':      timedelta(minutes=15),
}

dag = airflow.DAG('weather',
                  default_args=default_args,
                  description='Copy the weather data to HDFS and export it to Excel',
                  schedule_interval=timedelta(minutes=30))

cmd = "hdfs dfs -rm /weather_csv/*.csv || true"
remove_csvs_task = BashOperator(task_id='remove_csvs',
                                bash_command=cmd,
                                dag=dag)

cmd = """hdfs dfs -copyFromLocal \ /home/mark/data/us-weather-history/*.csv \ /weather_csv/"""
csv_to_hdfs_task = BashOperator(task_id='csv_to_hdfs',
                                bash_command=cmd,
                                dag=dag)

cmd = """echo \"INSERT INTO weather_orc SELECT * FROM weather_csv; \" | \ hive"""
csv_to_orc_task = BashOperator(task_id='csv_to_orc',
                               bash_command=cmd,
                               dag=dag)


def presto_to_excel(**context):
    column_names = [
        "date",
        "actual_mean_temp",
        "actual_min_temp",
        "actual_max_temp",
        "average_min_temp",
        "average_max_temp",
        "record_min_temp",
        "record_max_temp",
        "record_min_temp_year",
        "record_max_temp_year",
        "actual_precipitation",
        "average_precipitation",
        "record_precipitation"
    ]

    sql = """SELECT * FROM weather_orc LIMIT 20"""

    ph = PrestoHook(catalog='hive',
                    schema='default',
                    port=8080)
    data = ph.get_records(sql)

    df = pd.DataFrame(np.array(data).reshape(20, 13),
                      columns=column_names)

    writer = pd.ExcelWriter('weather.xlsx',
                            engine='xlsxwriter')
    df.to_excel(writer, sheet_name='Sheet1')
    writer.save()

    return True

presto_to_excel_task = PythonOperator(task_id='presto_to_excel',
                                      provide_context=True,
                                      python_callable=presto_to_excel,
                                      dag=dag)

remove_csvs_task >> csv_to_hdfs_task >> csv_to_orc_task >> presto_to_excel_task

if __name__ == "__main__":
    dag.cli()

With this code in place, open the Airflow web interface and switch the “weather” DAG at the bottom of the home page to “on”.

The scheduler creates a list of jobs for workers to execute. The following will launch the Airflow scheduler service and a worker that will complete all scheduled jobs.

$ airflow scheduler &
$ airflow worker &
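
Once the DAG has completed a run, the exported spreadsheet can be spot-checked from Python. A minimal sketch, assuming weather.xlsx was written to the directory the worker ran from and that a pandas-compatible Excel reader is installed:

import pandas as pd

# Read back the first sheet of the exported workbook and show a few rows.
df = pd.read_excel('weather.xlsx')
print(df.head())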

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America and Europe. If you’d like to discuss how my offerings can help your business, please contact me via LinkedIn.
