Preface

After looking at the table of contents, readers may wonder why this tutorial doesn’t cover a framework such as scrapy or PySpider. Here, I think it’s more important to understand how crawlers work than to learn a framework. Crawlers are essentially HTTP requests, independent of language and framework.

In this section, we’ll develop a simple concurrent (or even distributed) crawler framework in 26 lines of code.

Modules of a crawler

First, let’s look at the modules that make up a crawler.

The task generator — Producer

Defines the tasks, for example: which page to crawl, and how to parse it.

Downloader – Downloader

Accepts tasks from the task generator, downloads the pages, and hands the downloaded content to the parser. This is mainly I/O work, limited by network speed.

The Parser — Parser

Parses the content downloaded by the downloader and passes the results to the output pipeline. This is mainly CPU work, and it is limited by how fast the downloader can supply pages.

Output Pipeline

Decides what to do with the data we have extracted. So far we have simply been using print, which is effectively a console pipeline. Of course, you can also define a FilePipeline, a MysqlPipeline, a Sqlite3Pipeline, and so on; a minimal sketch follows the list below.

  • ConsolePipeline: Outputs the desired content directly to the console.
  • FilePipeline: Writes the desired content to a file for saving, such as a JSON file.
  • MongoDBPipeline: Stores the desired content in a MongoDB database.
  • And so on…
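To make this concrete, here is a minimal sketch of what such pipelines might look like. The class and method names (ConsolePipeline, FilePipeline, process) are only illustrative choices for this tutorial, not part of any existing framework:

import json


class ConsolePipeline:
    """Print each parsed item directly to the console."""
    def process(self, item):
        print(item)


class FilePipeline:
    """Append each parsed item to a file as one JSON line."""
    def __init__(self, path):
        self.path = path

    def process(self, item):
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

A parser would then simply call pipeline.process(item) for every item it extracts, so swapping the console for a file (or a database) does not change the parsing code.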

Structure of the crawler framework

The four modules above work together as follows.

  • 1. First, an initial task generator produces the download tasks.
  • 2. The downloader continuously takes tasks out of the task queue, downloads the pages, and puts them into the web page pool.
  • 3. The parsers take web pages out of the pool, parse them, and send the results to the corresponding output pipelines. While parsing, a parser may also generate new download tasks and place them in the task queue.
  • 4. The output pipelines store or display the parsed results.

Simple crawler framework architecture

In fact, we can also divide the crawler less finely: download + parse + output can be lumped together into a single Worker.

It works like this: the initial task generator first produces a download task, and the system then creates several workers to handle the tasks. Each worker downloads a page, parses it, and sends the result to the output pipeline; at the same time, it generates new download tasks from some of the parsed links and puts them into the task queue. This repeats until there are no more tasks.
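To illustrate this loop before we add any concurrency, here is a minimal sequential sketch. The handle function is a placeholder for the worker’s download-parse-output step and is assumed to return any newly discovered tasks:

from queue import Queue


def run(initial_tasks, handle):
    """Process tasks one by one until the queue is empty.

    handle(task) downloads, parses, and outputs one page, and
    returns an iterable of newly discovered tasks.
    """
    task_queue = Queue()
    for task in initial_tasks:          # the initial task generator
        task_queue.put(task)

    while not task_queue.empty():       # repeat until no tasks remain
        task = task_queue.get()
        for new_task in handle(task):   # download + parse + output
            task_queue.put(new_task)    # enqueue tasks found while parsing

The concurrent framework we build below follows exactly this shape, except that tasks are handed to a pool of worker processes instead of being handled one at a time.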

Interprocess communication

Next, let’s talk about interprocess communication.

Let’s take the example of a producer and consumer. Suppose there are two processes, one called producer and one called consumer. The producer is only responsible for producing tasks and putting them into a pool (task queue), and the consumer takes the tasks from the task queue and completes them (consuming them).

Our task queue uses multiprocessing.Queue, which is safe to share across processes.

from multiprocessing import Process, Queue
import time


def produce(q):  # production
    for i in range(10):
        print('Put %d to queue... ' % i)
        q.put(i)
        time.sleep(1)


def consume(q):  # consumption
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 1, Get %s from queue.' % value)


if __name__ == '__main__':
    q = Queue()
    producer = Process(target=produce, args=(q,))
    consumer = Process(target=consume, args=(q,))
    producer.start()
    consumer.start()

    producer.join()
    consumer.join()  # the consumer loops forever, so press Ctrl+C to exit

Of course, you can also try multiple producers and multiple consumers. The following creates two producers and two consumers.

from multiprocessing import Process, Queue
import time


def produce(q):  # producer 1: produces even numbers
    for i in range(10000):
        if i % 2 == 0:
            print("Produce ", i)
            q.put(i)
            time.sleep(1)


def produce2(q):  # producer 2: produces odd numbers
    for i in range(10000):
        if i % 2 == 1:
            print("Produce ", i)
            q.put(i)
            time.sleep(1)


def consume(q):  # consumer 1
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 1, Get %s from queue.' % value)


def consume2(q):  # consumer 2
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 2, Get %s from queue.' % value)


if __name__ == '__main__':
    q = Queue(5)   # the queue can hold at most 5 tasks; put() blocks when it is full
    producer = Process(target=produce, args=(q,))
    producer2 = Process(target=produce2, args=(q,))
    consumer = Process(target=consume, args=(q,))
    consumer2 = Process(target=consume2, args=(q,))

    producer.start()
    producer2.start()
    consumer.start()
    consumer2.start()

    producer.join()
    producer2.join()
    consumer.join()   # the consumers loop forever, so press Ctrl+C to exit
    consumer2.join()

Here the two producers together produce two items per second, while consumption takes almost no time at all: a case of “more wolves than meat”. When it runs, you can see two lines printed per second, with Consumer 1 and Consumer 2 fighting fiercely over every item.

Now consider the opposite, “more meat than wolves” case, as follows:

from multiprocessing import Process, Queue
import time


def produce(q):  # production
    for i in range(10000):
        print("Produce ", i)
        q.put(i)


def consume(q):  # consumption
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 1, Get %s from queue.' % value)
            time.sleep(1)


def consume2(q):  # consumption
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 2, Get %s from queue.' % value)
            time.sleep(1)


if __name__ == '__main__':
    q = Queue(5)    # the queue can hold at most 5 items; put() blocks when it is full
    producer = Process(target=produce, args=(q,))
    consumer = Process(target=consume, args=(q,))
    consumer2 = Process(target=consume2, args=(q,))

    producer.start()
    consumer.start()
    consumer2.start()

    producer.join()
    consumer.join()   # the consumers loop forever, so press Ctrl+C to exit
    consumer2.join()

Here the producer keeps producing until the queue is full. The two consumers each consume one item per second, and whenever an item is consumed, the producer immediately produces a new one to top the queue back up.

As the examples above show, the overall speed of the system is limited by its slowest component. The same is true of our crawler: the most time-consuming operation is downloading, so the overall crawl speed is bounded by network speed.

The producers and consumers above correspond to the Producer and the Worker in our crawler. The Producer generates download tasks and puts them into the task queue. The Worker acts as a consumer: given a download task, it downloads, parses, and outputs the data of the corresponding web page. At the same time, a Worker also acts as a producer, generating new download tasks from the links it parses and putting them into the task queue for other Workers to execute.

DIY Concurrency framework

Let’s take a look at our own concurrent crawler framework, which is very short at 26 lines of code, or 21 lines of code excluding empty lines.

from multiprocessing import Manager, Pool


class SimpleCrawler:
    def __init__(self, c_num):
        self.task_queue = Manager().Queue()  # task queue
        self.workers = {}                    # Worker, dictionary type, store different workers
        self.c_num = c_num                   # Number of concurrent processes

    def add_task(self, task):
        self.task_queue.put(task)

    def add_worker(self, identifier, worker):
        self.workers[identifier] = worker

    def start(self):
        pool = Pool(self.c_num)
        while True:
            task = self.task_queue.get(True)
            if task['id'] == "NO":  # end the crawler
                pool.close()
                pool.join()
                exit(0)
            else:  # Complete tasks for worker
                worker = self.workers[task['id']]
                pool.apply_async(worker, args=(self.task_queue, task))

This class has four methods: the constructor, a method for adding initial tasks, a method for registering workers, and a method for starting the crawl.

__init__ method:

In the constructor, we create a task queue (note the use of Manager().Queue() here: a plain multiprocessing.Queue cannot be passed to process-pool workers, so we go through a Manager), a workers dictionary, and the concurrency setting.

crawler = SimpleCrawler(5)  # the number of concurrent processes is 5

add_task method:

Adds an initial task, in the form of a dictionary with fields such as id and url. The id field determines which worker handles the task. For example:

crawler.add_task({
    "id": "worker",
    "url": "http://nladuo.cn/scce_site/",
    "page": 1
})

add_worker method:

Registers a worker in the workers dictionary with its id as the key. A worker could be defined as a class, but for simplicity we will just use a function.

def worker(queue, task):
    url = task["url"]
    resp = requests.get(url)
    # ... download and parse the page here
    queue.put(new_task)  # new tasks may also be added
    # ...

crawler.add_worker("worker", worker)

The start method:

The start method launches the crawler. It creates a process pool for concurrency, then continuously takes tasks from the queue and dispatches each one to the worker registered under the task's id. By convention, a task whose id is "NO" stops the crawler.

crawler.start()

Crawling two levels of pages

Next, let’s use this simple crawler framework to implement a two-level page crawler.

First look at the first-level page: nladuo.cn/scce_site/. It is the news list page from before; from it we can crawl the news headlines and the corresponding page links.

The second-level page is nladuo.cn/scce_site/a… , the news detail page, from which we can get the news content, the click count, and so on.

Next we create two workers, one to crawl the list page and the other to crawl the news details page.

def worker(queue, task):
    """ Crawl the News list page """
    pass


def detail_worker(queue, task):
    """ "Crawl news Details page """
    pass

Main code

For the main code, first create a crawler, then add the two workers with ids "worker" and "detail_worker", and finally add an initial task: crawling the first page of the news list.

if __name__ == '__main__':
    crawler = SimpleCrawler(5)
    crawler.add_worker("worker", worker)
    crawler.add_worker("detail_worker", detail_worker)
    crawler.add_task({
        "id": "worker",
        "url": "http://nladuo.cn/scce_site/",
        "page": 1
    })
    crawler.start()

Worker coding

Next, let's complete our worker code, which takes two arguments: queue and task.

  • queue: used to add new tasks after a web page has been parsed
  • task: the task to be completed

The worker first downloads the web page and then parses it. Since the detail pages found in the list also need to be crawled, it adds a detail-page task for each item. Finally, it checks whether the current page is the last one: if so, it sends the exit signal; otherwise, it adds a task to crawl the next page of the news list.

import requests
from bs4 import BeautifulSoup


def worker(queue, task):
    """Crawl the news list page"""
    # Download task
    url = task["url"] + "%d.html" % task["page"]
    print("downloading:", url)
    resp = requests.get(url)

    # Parse web pages
    soup = BeautifulSoup(resp.content, "html.parser")
    items = soup.find_all("div", {"class": "list_title"})

    for index, item in enumerate(items):
        detail_url = "http://nladuo.cn/scce_site/" + item.a['href']
        print("adding:", detail_url)
        # Add new task: crawl the detail page
        queue.put({
            "id": "detail_worker",
            "url": detail_url,
            "page": task["page"],
            "index": index,
            "title": item.get_text().replace("\n", "")
        })

    if task["page"] == 10:  # add the end signal
        queue.put({"id": "NO"})
    else:
        # Add new task: crawl the next page
        queue.put({
            "id": "worker",
            "url": "http://nladuo.cn/scce_site/",
            "page": task["page"] + 1
        })

detail_worker coding

detail_worker's job is simpler: download the page, parse it, and print the result. To keep the screen output from getting too messy, we only extract the click count.

def detail_worker(queue, task):
    """ "Crawl news Details page """
    # Download task
    print("downloading:", task['url'])
    resp = requests.get(task['url'])
    # Parse web pages
    soup = BeautifulSoup(resp.content, "html.parser")
    click_num = soup.find("div", {"class": "artNum"}).get_text()
    print(task["page"], task["index"], task['title'], click_num)

Further thoughts

Here we used our self-developed framework to implement a multi-level page crawler. Readers may want to consider the following questions.

  • How can the crawler end automatically? Consider monitoring the status of the queue and of the workers.
  • How can the crawler be made distributed? Consider using a distributed task queue such as Celery; a rough sketch of the idea follows below.
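For the second question, the core change is to replace Manager().Queue() with a queue that lives outside any single machine, so that workers on different hosts can share it. Celery is one ready-made option. As a bare-bones illustration of the same idea (not the Celery API), a Redis list can serve as a shared task queue; the Redis address and key name below are placeholder assumptions for this sketch:

import json

import redis  # pip install redis

# placeholder connection settings for this sketch
r = redis.Redis(host="127.0.0.1", port=6379)
TASK_KEY = "crawler:tasks"


def put_task(task):
    """Push a task onto the shared queue; any machine can call this."""
    r.lpush(TASK_KEY, json.dumps(task))


def get_task(timeout=5):
    """Block until a task is available, or return None on timeout."""
    item = r.brpop(TASK_KEY, timeout=timeout)
    return json.loads(item[1]) if item else None

Workers on any machine would then call get_task() in a loop instead of reading from the in-process multiprocessing queue, and call put_task() wherever they currently call queue.put().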