Advanced Python asynchronous crawlers

Crawlers are IO-intensive tasks. For example, if we use the requests library to crawl a site, then after making a request the program must wait for the site to return a response before it can continue to run. While waiting for that response, the entire crawler is doing nothing at all.

We therefore need to improve the program's efficiency, and asynchronous programming is one of the most effective ways to do so.

Today we are going to learn about asynchronous crawlers.

I. Basic concepts

  • Blocking

A program is in a blocked state when it is suspended because it has not obtained a computing resource it needs. A program is said to be blocked on an operation if it cannot do anything else while waiting for that operation to complete. Common forms of blocking are network I/O blocking, disk I/O blocking, and user-input blocking. Blocking is everywhere: even while the CPU performs a context switch, none of the processes can really do any work, so they too are blocked. On a multi-core CPU, the core that is performing the context switch cannot be utilized either.

  • Non-blocking

If a program is not blocked while waiting on an operation and can continue to process other things, the program is said to be non-blocking on that operation. Non-blocking does not exist at every program level or under all circumstances: a program can only be non-blocking if it is encapsulated at a level that contains independent subprogram units. Non-blocking exists because blocking exists; it is precisely the wasted time and inefficiency caused by blocking operations that motivate us to make them non-blocking.
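As a rough illustration, a socket can be switched into non-blocking mode, in which a read returns immediately instead of suspending the program. This is a minimal sketch (the host and request are placeholders):

```python
import socket

# Minimal sketch: a non-blocking socket read returns immediately instead of
# suspending the program while it waits for data to arrive.
sock = socket.socket()
sock.connect(('www.example.com', 80))      # connect in (default) blocking mode
sock.setblocking(False)                    # switch reads/writes to non-blocking
sock.send(b'GET / HTTP/1.0\r\nHost: www.example.com\r\n\r\n')
try:
    data = sock.recv(4096)                 # raises if no data is ready yet
except BlockingIOError:
    print('No data yet; free to do other work in the meantime')
```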

  • Synchronous

When different program units must rely on some form of communication during execution to coordinate in order to complete a task, we say those units execute synchronously. For example, updating product inventory in a shopping system requires a "row lock" as a communication signal, forcing the different update requests to queue up and execute in order; the inventory-update operation is therefore synchronous. In short, synchronous means ordered.
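Below is a minimal sketch of the same idea using a thread lock (the inventory numbers are invented): the lock plays the role of the row lock and forces the updates to run one at a time.

```python
import threading

stock = 100
lock = threading.Lock()

def buy(n):
    global stock
    with lock:        # like a row lock: updates queue up and execute in order
        stock -= n

threads = [threading.Thread(target=buy, args=(1,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(stock)  # 90
```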

  • Asynchronous

Asynchrony is a way for different program units to complete a task without communicating or coordinating with each other during execution; unrelated program units can be asynchronous. For example, when a crawler downloads web pages, the scheduler can move on to other tasks as soon as it dispatches a download, without communicating with the download task to coordinate behavior. The download, save, and other operations for different pages are unrelated, so there is no need for them to notify or coordinate with one another, and the completion time of these asynchronous operations is uncertain. In short, asynchronous means unordered.

  • Multiple processes

Multiprocessing takes advantage of a multi-core CPU to execute multiple tasks in parallel at the same time, which can greatly improve execution efficiency.
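A minimal sketch of the idea, assuming a toy CPU-bound function:

```python
from multiprocessing import Pool

def square(x):          # a toy CPU-bound task
    return x * x

if __name__ == '__main__':                   # required on Windows
    with Pool(processes=4) as pool:          # four worker processes in parallel
        print(pool.map(square, range(8)))    # [0, 1, 4, 9, 16, 25, 36, 49]
```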

  • Coroutines

A coroutine, also known as a micro-thread or fiber, is a lightweight, user-mode thread. A coroutine has its own register context and stack. When the coroutine scheduler switches it out, it saves the register context and stack elsewhere; when the coroutine is switched back in, it restores what was saved. A coroutine can therefore preserve the state of its last call, that is, a specific combination of all its local state, and each time the procedure re-enters, it resumes from the state of the last call.

Coroutines essentially run within a single thread. They avoid the cost of thread context switches as well as of locking and synchronizing atomic operations, and the programming model is very simple compared with multiprocessing. We can use them to implement asynchronous operations. In a web crawler scenario, for example, after we send a request we must wait some time for the response; during that wait, the program can do many other things and switch back to continue processing only once the response arrives. This makes full use of the CPU and other resources, and that is the advantage of coroutines.
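A minimal sketch of that switching behavior, using asyncio.sleep to simulate the IO wait (the names are invented): while one coroutine is suspended, the event loop runs the other, so the waits overlap.

```python
import asyncio

async def job(name, seconds):
    print(name, 'start')
    await asyncio.sleep(seconds)   # suspend here; control returns to the event loop
    print(name, 'end')

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(job('A', 2)), asyncio.ensure_future(job('B', 2))]
loop.run_until_complete(asyncio.gather(*tasks))   # finishes in about 2 s, not 4 s
```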

II. Coroutine usage

Coroutines were added to Python in version 3.4, but that version's coroutines were based on generator objects. Python 3.5 added async/await, making coroutines much easier to implement.

asyncio

The most commonly used coroutine library in Python is asyncio. It involves a few key concepts, illustrated by a sketch after this list:

  • Event_loop: the event loop, which works like an endless loop. We can register functions on the event loop, and when the relevant event occurs, the corresponding handler is called.
  • Coroutine: a coroutine object. We can use the async keyword to define a method; when called, the method is not executed immediately but instead returns a coroutine object.
  • Task: a further encapsulation of a coroutine object, which carries the various states of the task.
  • Future: represents the result of a task that will (or will not) be executed in the future; in practice it is essentially indistinguishable from a Task.
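Here is a minimal sketch (an assumed example, not from the original) showing the four concepts working together: the event loop drives a coroutine, the coroutine is wrapped into a task, and a future holds a result that is filled in later.

```python
import asyncio

async def compute(future):
    await asyncio.sleep(1)          # pretend to wait for some IO
    future.set_result('done')       # hand the result to the future

loop = asyncio.get_event_loop()            # the event loop
future = loop.create_future()              # a bare future, no result yet
task = loop.create_task(compute(future))   # a coroutine wrapped into a task
loop.run_until_complete(future)            # run until the future has a result
print(future.result())  # done
```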

The async/await keywords, introduced in Python 3.5, are used specifically to define coroutines: async defines a coroutine, and await suspends execution on a blocking method.

Define coroutines

Let's define a coroutine and see how it differs from an ordinary function, as follows:

```python
import asyncio

async def execute(x):
    print('Number:', x)

coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
```

The running results are as follows:

```
Coroutine: <coroutine object execute at 0x0000027808F5BE48>
After calling execute
Number: 666
After calling loop

Process finished with exit code 0
```

We first import the asyncio package so that we can use async and await, and then we define an execute method using async. The method takes a number as an argument and prints the number when the method executes.

We then call the method directly, but instead of executing, the method returns a coroutine object. Next we create an event loop, loop, using the get_event_loop method, register the coroutine on it by calling loop's run_until_complete method, and start it running. Only then do we see the execute method print its output.

As you can see, a method defined with async becomes a coroutine object when called; it cannot be executed directly and must be registered on an event loop to be executed.

Task is a further encapsulation of a coroutine object. It carries additional running states, such as running and finished, which we can use to inspect the coroutine's execution status. In the example above, when we pass the coroutine object to the run_until_complete method, it implicitly wraps the coroutine in a task. A task can also be declared explicitly, as shown below:

```python
import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
```

The result is as follows:

```
Coroutine: <coroutine object execute at 0x000001CB3F90BE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop

Process finished with exit code 0
```

Here we define the loop object and call its create_task method to turn the coroutine object into a task object. When we print the task, we see that its state is pending. After we add the task to the event loop and run it, printing it again shows that its state has changed to finished and its result is 666, which is exactly the return value of the execute method we defined.

Another common way to define a task is through asyncio's ensure_future method. The result is also a task object, but this way we do not need the loop to define it: the task can be defined in advance, even before the loop is declared:

```python
import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')

task = asyncio.ensure_future(coroutine)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
```

The running effect is as follows:

```
Coroutine: <coroutine object execute at 0x0000019794EBBE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop

Process finished with exit code 0
```

The effect is exactly the same.

Binding a callback to a Task object

You can bind a callback method to a task, as shown in the following example:

```python
import asyncio
import requests

async def call_on():
    status = requests.get('https://www.baidu.com')
    return status

def call_back(task):
    print('Status:', task.result())

coroutine = call_on()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
```

We define a call_on method that requests Baidu and gets its status code, but there is no print statement inside this method. We then define a call_back method that takes a task as a parameter and calls print to output the task's result. So far we have defined a coroutine object and a callback method;

the effect we want is that when the coroutine object finishes executing, the declared callback method is executed. This is achieved by calling the add_done_callback method: we bind the callback to the wrapped task so that it is invoked when the task finishes. The task itself is passed to the callback as a parameter, and calling the task's result method retrieves the result.

The running results are as follows:

```
Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4> cb=[call_back() at D:/python/pycharm2020/program/test_003.py:8]>
Status: <Response [200]>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
```
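If the callback needs extra arguments besides the task, functools.partial is the usual trick: add_done_callback only passes the task itself, so any other arguments are bound in advance. A minimal sketch (the label argument is invented for illustration):

```python
import asyncio
import functools

async def execute(x):
    return x

def call_back(label, task):
    # the bound 'label' comes first; the task is appended by asyncio
    print(label, task.result())

loop = asyncio.get_event_loop()
task = loop.create_task(execute(666))
task.add_done_callback(functools.partial(call_back, 'Result:'))
loop.run_until_complete(task)  # prints: Result: 666
```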

You can also skip the callback method and call the result method directly after the task has finished running, as shown below:

```python
import asyncio
import requests

async def call_on():
    status = requests.get('https://www.baidu.com')
    return status

coroutine = call_on()
task = asyncio.ensure_future(coroutine)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task:', task.result())
```

It works the same:

```
Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4>>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
Task: <Response [200]>
```

III. Asynchronous crawler implementation

To achieve asynchronous processing, we need suspension: when a task has to wait for an IO result, it can be suspended so that other tasks run instead, making full use of resources. To do this we need to understand await. Using await suspends a time-consuming waiting operation and yields control: when the event loop encounters an await while executing a coroutine, it suspends that coroutine and executes another one, until the other coroutine also suspends or finishes.

The object following await must take one of the following forms (a sketch follows the list):

  • A native coroutine object returned from a native coroutine function.
  • A generator-based coroutine object returned from a generator decorated with types.coroutine; such a generator can return a coroutine object.
  • An object with an __await__ method returning an iterator.
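A minimal sketch of the three forms (all names are invented for illustration):

```python
import asyncio
import types

# 1. A native coroutine, defined with async def
async def native_coro():
    return 1

# 2. A generator-based coroutine, decorated with types.coroutine
@types.coroutine
def generator_coro():
    yield          # yields control back to the event loop once
    return 2

# 3. An object whose __await__ method returns an iterator
class CustomAwaitable:
    def __await__(self):
        return native_coro().__await__()

async def main():
    print(await native_coro())       # 1
    print(await generator_coro())    # 2
    print(await CustomAwaitable())   # 1

asyncio.get_event_loop().run_until_complete(main())
```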

The use of aiohttp

aiohttp is a library that supports asynchronous requests. Together with asyncio, it makes asynchronous request operations easy to implement. Below is an example of an asynchronous crawler that visits blog posts and returns response.text():

```python
import asyncio
import logging
import time

import aiohttp
import requests
from lxml import etree

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
start_time = time.time()

# Fetch the index page synchronously and extract the article URLs
def get_urls():
    headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
    return url_list

async def request_page(url):
    logging.info('scraping %s', url)
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        return await response.text()

def main():
    url_list = get_urls()
    tasks = [asyncio.ensure_future(request_page(url)) for url in url_list]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
```

Here we swap the requests library for aiohttp and make the requests through the get method of aiohttp's ClientSession class.

The convenience of asynchronous operations is that when a blocking operation is encountered, the task is suspended and the program moves on to another task rather than waiting for it, thus making full use of CPU time rather than wasting it waiting for IO.

The above example is compared to the single-threaded and multi-threaded versions as follows:

Multi-threaded version

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from lxml import etree

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()

def get_urls():
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
    return url_list

def request_page(url):
    logging.info('scraping %s', url)
    resp = requests.get(url, headers=headers)
    return resp.text

def main():
    url_list = get_urls()
    with ThreadPoolExecutor(max_workers=6) as executor:
        executor.map(request_page, url_list)

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
```


Single-threaded version

```python
import logging
import time

import requests
from lxml import etree

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()

def get_urls():
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
    return url_list

def request_page(url):
    logging.info('scraping %s', url)
    resp = requests.get(url, headers=headers)
    return resp.text

def main():
    url_list = get_urls()
    for url in url_list:
        request_page(url)

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
```


Testing shows that if asynchronous requests are used flexibly in a crawler, and the number of concurrent requests is increased within what the server can withstand, crawling efficiency improves dramatically.
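One common way to keep concurrency within what the server can withstand is an asyncio.Semaphore. A minimal sketch (the URL list and the limit are placeholders):

```python
import asyncio

import aiohttp

CONCURRENCY = 5                              # assumed per-site limit
semaphore = asyncio.Semaphore(CONCURRENCY)

async def request_page(session, url):
    async with semaphore:                    # at most CONCURRENCY requests in flight
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ['https://www.example.com'] * 20  # placeholder URL list
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(request_page(session, url)) for url in urls]
        await asyncio.gather(*tasks)

asyncio.get_event_loop().run_until_complete(main())
```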