The blog was first published at www.litreily.top

A friend in the financial industry asked me to write a crawler to collect the information of practitioners at every financial institution listed on the website of the China Futures Association. Fetching the site's data is itself fairly simple, but I took the chance to learn some new crawling techniques, namely the producer-consumer model described in this article, along with Python's queue and threading modules.

Producer-consumer model

The producer-consumer model, as most programmers know, is very simple: one party, the producer, keeps providing resources, while the other, the consumer, keeps consuming them. A simple analogy is the chef and the customer in a restaurant: the chef is the producer who keeps making delicious food, and the customer is the consumer who keeps eating it. Producers and consumers can be related one-to-one, one-to-many, many-to-one or many-to-many.

So what does this model have to do with crawlers? A crawler can be seen as the producer: it keeps fetching data from the website, and that data is the food. The fetched data then needs to be cleaned by the consumer, which keeps the useful parts and discards the rest.

In practice, crawling and data cleaning each run in their own thread, and data is passed between the two threads through a FIFO queue. The hand-off is just like a waiter carrying dishes from the kitchen to the customer's table. The crawl thread fetches the website data and puts the raw pages into the queue; the cleaning thread reads the raw pages from the queue in order and extracts the valid data.
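To make the hand-off concrete, here is a minimal, self-contained sketch of the pattern using Python's queue and threading modules; the names produce and consume are illustrative and not part of the crawler code discussed later.

import queue
import threading

q = queue.Queue()

def produce():
    # producer: keeps putting "dishes" into the queue
    for i in range(5):
        q.put('dish-%d' % i)
    q.put(None)              # sentinel: nothing more will be produced

def consume():
    # consumer: keeps taking "dishes" out until the sentinel arrives
    while True:
        item = q.get()
        if item is None:
            break
        print('eaten:', item)

p = threading.Thread(target=produce)
c = threading.Thread(target=consume)
p.start()
c.start()
p.join()
c.join()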

That is the producer-consumer model in a nutshell; the rest of this article walks through the crawling task in detail.

Analysis of the site

http://www.cfachina.org/cfainfo/organbaseinfoServlet?all=personinfo

The data we want to crawl is the employee information of all futures companies listed in the table on the home page; each company has an institution number (G01001~G01198). The listing is paginated, eight pages in total. Taking G01001, Founder Intermediate Futures, as an example, clicking the company name jumps to its employee information page.

The following information can be extracted from the url and web page content:

  1. The url (see the sketch after this list)
  • http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+&currentPage=1&pageSize=20&selectType=personinfo
    • organid: organization number, +G01001+ ~ +G01198+
    • currentPage: current page number of the organization's employee information
    • pageSize: number of people displayed per page, default 20
    • selectType: fixed as personinfo
  2. The organization name mechanism_name, shown at the top of every page
  3. The practitioner information, i.e. the contents of the table on each page, which is what we want to crawl
  4. The total number of pages of the organization's employee information, page_cnt
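Based on these parameters, the URL of any information page can be assembled directly. A minimal sketch (page_url is an illustrative helper, not the crawler's actual method):

def page_url(organ_id, current_page, page_size=20):
    # organ_id looks like 'G01001'; the site wraps it in '+' signs
    return ('http://www.cfachina.org/cfainfo/organbaseinfoOneServlet'
            '?organid=+%s+&currentPage=%d&pageSize=%d&selectType=personinfo'
            % (organ_id, current_page, page_size))

print(page_url('G01001', 1))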

The data we finally crawl can be stored in a txt or Excel file named after the organization.

Get organization name

After fetching any of an organization's practitioner information pages, BeautifulSoup can be used to quickly extract the organization name.

mechanism_name = soup.find(class_='gst_title').find_all('a')[2].get_text()  # the third <a> under the element with class 'gst_title' holds the name

Why, one might ask, do this at all when the home page table already lists the numbers and names of all organizations? Because I don't crawl the home page tables at all; I simply generate each URL from the incrementing organization number, so getting the organization name has to happen after crawling the first information page of each organization.

Get the number of pages corresponding to organization information

The amount of data varies from organization to organization, but fortunately each page contains both the current page number and the total page count. The following code extracts the total page count.

url_re = re.compile(r'#currentPage.*\+.*\+\'(\d+)\'')
page_cnt = url_re.search(html).group(1)

After obtaining the total page count from an organization's first page, a for loop that modifies the currentPage URL parameter can fetch the organization's information page by page, as sketched below.
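A minimal sketch of that loop, assuming page_cnt has been extracted with the regular expression above; the hard-coded G01001 and the shortened User-Agent are just placeholders:

import requests

headers = {'User-Agent': 'Mozilla/5.0'}   # placeholder UA
for current_page in range(2, int(page_cnt) + 1):
    url = ('http://www.cfachina.org/cfainfo/organbaseinfoOneServlet'
           '?organid=+G01001+&currentPage=%d&pageSize=20&selectType=personinfo'
           % current_page)
    html = requests.get(url, headers=headers, timeout=20)
    # ... push html into the queue for the cleaning thread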

Get the current page practitioner information

For a given information page, the practitioner information is stored in a table; apart from the fixed header row, every data row is a tr tag with an id attribute, so BeautifulSoup makes it easy to extract all practitioner records on the page.

soup.find_all('tr', id=True)
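As a usage sketch, the rows returned by find_all can then be flattened into a two-dimensional list of cell texts, which is essentially what the cleaning thread below does:

infos = []
for row in soup.find_all('tr', id=True):            # one row per practitioner
    infos.append([td.get_text() for td in row.find_all('td')])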

Determine the crawl scheme

The general idea, of course, is to crawl the homepage page by page, then get links for all organizations on each page, and continue to crawl for each organization.

However, since the site's URLs follow an obvious pattern, we can construct the URL of every information page of every institution directly from the institution number. The crawling scheme is therefore as follows:

  1. Put all the institution numbers into the URL queue url_queue
  2. Create a producer thread SpiderThread to perform the crawling task
  • In a loop, read a number from url_queue, build the URL of the organization's first page and fetch it with requests
  • Extract the total page count from the result; if it is 0, go back to the first step
  • Crawl the remaining pages of the current organization in a loop
  • Put each page's data into the queue html_queue
  3. Create a consumer thread DatamineThread to perform the data cleaning task
  • In a loop, read one page's data from html_queue
  • Use BeautifulSoup to extract the practitioner information from the page
  • Store the information as a two-dimensional array and finally hand it to the storage class Storage to save to a local file

Code implementation

Producer: SpiderThread

The crawler first takes an organization number from the queue, builds the URL of the organization's first page and fetches it, then checks whether the organization's page count is 0. If it is not, the crawler goes on to extract the organization name and crawls the remaining pages according to the page count. The raw HTML data is stored in the following dict format:

{
    'name': mechanismId_mechanismName,
    'num': currentPage,
    'content': html
}

html_queue, the data queue filled by the crawler, is then processed by the data cleaning thread. Below is the main routine of the crawler thread; see the full source for the rest of the class.

def run(self):
    while True:
        mechanism_id = 'G0' + self.url_queue.get()

        # the first page's url
        url = self.__get_url(mechanism_id, 1)
        html = self.grab(url)

        page_cnt = self.url_re.search(html.text).group(1)
        if page_cnt == '0':
            self.url_queue.task_done()
            continue
        
        soup = BeautifulSoup(html.text, 'html.parser')
        mechanism_name = soup.find(class_='gst_title').find_all('a')[2].get_text()
        print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

        # put data into html_queue
        self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
        for i in range(2, int(page_cnt) + 1):
            url = self.__get_url(mechanism_id, i)
            html = self.grab(url)
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
        
        self.url_queue.task_done()

Consumer: DatamineThread

The data cleaning thread is relatively simple: it takes HTML data one item at a time from the queue html_queue filled by the producer, extracts the employee information from the HTML, stores it as a two-dimensional array, and finally hands it to the Storage class to write the data out.

class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos
        
    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))

            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()

Data storage: Storage

I wrote two storage functions, write_txt and write_excel, for txt and Excel files respectively. The file format is chosen by the caller when the data is actually stored.

def save(self, data):
    {
        '.txt': self.write_txt,
        '.xls': self.write_excel
    }.get(self.filetype)(data)

Save to TXT file

Saving to a txt file is relatively simple: open the file in append mode ('a'), write the data, and close the file. The file name is provided by the caller. Each person's information occupies one line, with fields separated by the tab character \t.

def write_txt(self, data):
    '''Write data to txt file'''
    fid = open(self.path, 'a', encoding='utf-8')

    # insert the header of table
    if not os.path.getsize(self.path):
        fid.write('\t'.join(self.table_header) + '\n')
    
    for info in data:
        fid.write('\t'.join(info) + '\n')
    fid.close()

Save to Excel file

Storing Excel files is more cumbersome. With limited experience, I chose the xlwt, xlrd and xlutils libraries. To be honest, these three libraries are not very pleasant to use; they just barely get the job done. Here is why:

  1. Modifying a file is troublesome: xlwt can only write and xlrd can only read, so you need xlutils' copy function to copy the data read by xlrd into memory and then modify it with xlwt (see the sketch after this list)
  2. Only .xls files are supported: even .xlsx files end up being read and written as .xls
  3. Table styles are not preserved: every time the file is rewritten, the styles have to be reset
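A minimal sketch of the read-copy-modify-save round trip from point 1, assuming an existing report.xls (the file name is just an example):

from xlrd import open_workbook
from xlutils.copy import copy

rb = open_workbook('report.xls')              # xlrd: read-only workbook
wb = copy(rb)                                 # xlutils: writable in-memory copy (an xlwt Workbook)
ws = wb.get_sheet(0)                          # first sheet of the copy
ws.write(rb.sheet_by_index(0).nrows, 0, 'appended cell')   # append below the last existing row
wb.save('report.xls')                         # the original styles are lost at this point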

I will certainly look into other Excel libraries in the future, but for now the solution still uses these three. The code is as follows:

def write_excel(self, data):
    '''write data to excel file'''
    if not os.path.exists(self.path):
        header_style = xlwt.easyxf('font: name Regular script, color-index black, bold on')
        wb = xlwt.Workbook(encoding='utf-8')
        ws = wb.add_sheet('Data')

        # insert the header of table
        for i in range(len(self.table_header)):
            ws.write(0, i, self.table_header[i], header_style)
    else:
        rb = open_workbook(self.path)
        wb = copy(rb)
        ws = wb.get_sheet(0)
    
    # write data
    offset = len(ws.rows)
    for i in range(0, len(data)):
        for j in range(0, len(data[0])):
            ws.write(offset + i, j, data[i][j])

    # When using xlutils.copy.copy to copy data from an existing .xls file,
    # the original style is lost, so we need to overwrite the column widths.
    # Maybe there is a better solution, but I have not found one yet.
    for i in range(len(self.table_header)):
        ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

    # save to file
    while True:
        try:
            wb.save(self.path)
            break
        except PermissionError as e:
            print('{0} error: {1}'.format(self.path, e.strerror))
            time.sleep(5)
        finally:
            pass

Description:

  1. Each file holds one organization's data and is read and written multiple times, so the row offset at which to start writing (offset) must be calculated, i.e. the number of data rows already in the file
  2. If the file being written is opened manually, a PermissionError exception is raised; it can be caught to print an error message and then retry periodically until the file is closed

main

The main function creates and starts the producer and consumer threads, and fills the organization-number queue for the producer thread.

url_queue = queue.Queue()
html_queue = queue.Queue()

def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()

As you can see from the main function, both queues call join(), which blocks until every item put into the queue has been processed. Note that each queue.get() must be matched by a queue.task_done() call; otherwise join() never returns, and the main thread keeps waiting even after all the queued data has actually been processed.
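A minimal illustration of that pairing, independent of the crawler: without the task_done() calls, join() would block forever.

import queue
import threading

q = queue.Queue()
for n in range(3):
    q.put(n)

def worker():
    while True:
        item = q.get()
        print('processed', item)
        q.task_done()        # must match every get(), otherwise join() never returns

threading.Thread(target=worker, daemon=True).start()
q.join()                     # returns once task_done() has been called 3 times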

That covers the main code of the crawler; the complete source follows.

The source code

#! /usr/bin/python3
# -*-coding:utf-8-*-

import queue
from threading import Thread

import requests

import re
from bs4 import BeautifulSoup

import os
import platform

import xlwt
from xlrd import open_workbook
from xlutils.copy import copy

import time

# url format left
# http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+&currentPage=1&pageSize=20&selectType=personinfo &all=undefined
# organid: +G01001+, +G01002+, +G01003+, ...
# currentPage: 1, 2, 3, ...
# pageSize: 20(default)
# 
# Algorithm design:
# 2 threads with 2 queues
# Thread-1, get first page url, then get page_num and mechanism_name from first page
# Thread-2, parse html file and get data from it, then output data to local file
# url_queue data -> 'url' # first url of each mechanism
# html_queue data -> {'name':'mechanism_name', 'html':data}

url_queue = queue.Queue()
html_queue = queue.Queue()


class SpiderThread(Thread):
    """Threaded Url Grab"""
    def __init__(self, url_queue, html_queue):
        Thread.__init__(self)
        self.url_queue = url_queue
        self.html_queue = html_queue
        self.page_size = 20
        self.url_re = re.compile(r'#currentPage.*\+.*\+\'(\d+)\'')
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}

    def __get_url(self, mechanism_id, current_page):
        return 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+%s+&currentPage=%d&pageSize=%d&selectType=personinfo&all=undefined' \
        % (mechanism_id, current_page, self.page_size)

    def grab(self, url):
        '''Grab html of url from web'''
        while True:
            try:
                html = requests.get(url, headers=self.headers, timeout=20)
                if html.status_code == 200:
                    break
            except requests.exceptions.ConnectionError as e:
                print(url + ' Connection error, try again... ')
            except requests.exceptions.ReadTimeout as e:
                print(url + ' Read timeout, try again... ')
            except Exception as e:
                print(str(e))
            finally:
                pass
        return html
    
    def run(self):
        '''Grab all htmls of mechanism one by one

        Steps:
        1. grab first page of each mechanism from url_queue
        2. get number of pages and mechanism name from first page
        3. grab all html file of each mechanism
        4. push all html to html_queue
        '''
        while True:
            mechanism_id = 'G0' + self.url_queue.get()

            # the first page's url
            url = self.__get_url(mechanism_id, 1)
            html = self.grab(url)

            page_cnt = self.url_re.search(html.text).group(1)
            if page_cnt == '0':
                self.url_queue.task_done()
                continue
            
            soup = BeautifulSoup(html.text, 'html.parser')
            mechanism_name = soup.find(class_='gst_title').find_all('a')[2].get_text()
            print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

            # put data into html_queue
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
            for i in range(2, int(page_cnt) + 1):
                url = self.__get_url(mechanism_id, i)
                html = self.grab(url)
                self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
            
            self.url_queue.task_done()
    

class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos
        
    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))

            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()


class Storage():
    def __init__(self, filename, filetype):
        self.filetype = filetype
        self.filename = filename + filetype
        self.table_header = ('name', 'gender', 'Qualification Number', 'Investment Consulting Practice Certificate No.', 'Department', 'job', 'Time in current post')
        self.path = self.__get_path()

    def __get_path(self):
        path = {
            'Windows': 'D:/litreily/Documents/python/cfachina',
            'Linux': '/mnt/d/litreily/Documents/python/cfachina'
        }.get(platform.system())

        if not os.path.isdir(path):
            os.makedirs(path)
        return '%s/%s' % (path, self.filename)
    
    def write_txt(self, data):
        '''Write data to txt file'''
        fid = open(self.path, 'a', encoding='utf-8')

        # insert the header of table
        if not os.path.getsize(self.path):
            fid.write('\t'.join(self.table_header) + '\n')
        
        for info in data:
            fid.write('\t'.join(info) + '\n')
        fid.close()
    
    def write_excel(self, data):
        '''write data to excel file'''
        if not os.path.exists(self.path):
            header_style = xlwt.easyxf('font: name Regular script, color-index black, bold on')
            wb = xlwt.Workbook(encoding='utf-8')
            ws = wb.add_sheet('Data')

            # insert the header of table
            for i in range(len(self.table_header)):
                ws.write(0, i, self.table_header[i], header_style)
        else:
            rb = open_workbook(self.path)
            wb = copy(rb)
            ws = wb.get_sheet(0)
        
        # write data
        offset = len(ws.rows)
        for i in range(0, len(data)):
            for j in range(0, len(data[0])):
                ws.write(offset + i, j, data[i][j])

        # When using xlutils.copy.copy to copy data from an existing .xls file,
        # the original style is lost, so we need to overwrite the column widths.
        # Maybe there is a better solution, but I have not found one yet.
        for i in range(len(self.table_header)):
            ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

        # save to file
        while True:
            try:
                wb.save(self.path)
                break
            except PermissionError as e:
                print('{0} error: {1}'.format(self.path, e.strerror))
                time.sleep(5)
            finally:
                pass
    
    def save(self, data):
        '''Write data to local file.

        According to filetype, choose the function to save the data.
        filetype can be '.txt' or '.xls'; the '.txt' type is saved
        faster than the '.xls' type.

        Args:
            data: a 2d-list array that needs to be saved
        '''
        {
            '.txt': self.write_txt,
            '.xls': self.write_excel
        }.get(self.filetype)(data)


def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()


if __name__ == '__main__':
    main()

Crawl test

Final notes

  • Testing showed that writing txt files is significantly faster than writing Excel files
  • If pageSize in the page URL is increased to 1000 or more, all of an organization's employee information can be fetched in a single request instead of page by page, which greatly improves efficiency (see the example below)
  • The crawler is hosted on GitHub: Python-Demos
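For example, a single request like the following sketch could replace the page-by-page loop for one organization, assuming the server accepts a large pageSize and with requests and headers set up as in SpiderThread:

url = ('http://www.cfachina.org/cfainfo/organbaseinfoOneServlet'
       '?organid=+G01001+&currentPage=1&pageSize=1000&selectType=personinfo')
html = requests.get(url, headers=headers, timeout=20)   # one request per organization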