Why introduce concurrent programming

Scenario 1: a web crawler takes 1 hour to crawl sequentially, which is reduced to 20 minutes with concurrent download. Scenario 2: an APP application takes 3 seconds to open each page before optimization, which is improved to 200 milliseconds with asynchronous concurrency

Concurrency was introduced to speed up applications.

If you like it, you can follow my official account and search on wechat: Feelwow

Support for concurrent programming in Python

  • Threading: The threading module allows the CPU and I/O to execute simultaneously so that the CPU does not wait for the I/O to complete
  • Multiprocess: The multiprocessing module takes advantage of the capabilities of multi-core cpus to truly execute tasks in parallel
  • Asynchronous IO: Asyncio module, in a single thread using the PRINCIPLE of CPU and IO simultaneous execution, to achieve asynchronous function execution

Python also provides modules to aid or simplify concurrency.

  • Use Lock to Lock resources to prevent conflicting access
  • Queue is used to realize the data communication between different threads and processes, and the producer-consumer model is realized
  • Use thread Pool and process Pool to simplify thread and process task submission, wait for completion, and obtain results

How to choose

  • CPU intensive: CPU intensive means that THE I/O can be completed in a short time and the CPU needs a large amount of computing and processing. The CPU usage is very high. Typical examples: compression, decompression, encryption and decryption, regular expression search, etc
  • IO intensive: IO intensive refers to the situation in which the CPU is waiting for I/ OS, such as the reading and writing of disks, memory, and network. In this situation, the CPU usage is low and the SYSTEM I/ OS are very high. Typical examples: file handlers, web crawlers, read and write databases, etc

Python multi-process, multi-thread, multi-coroutine comparison

  • Multithreading:
    • Advantages: More lightweight and less resource-intensive than processes
    • Disadvantages: Compared with coroutines, limited number of startup, memory footprint, thread switching overhead
    • Applicable to: IO – intensive computing that requires a small number of tasks to be run simultaneously
  • Multiple processes
    • Advantages: Can use multi-core CPU parallel computing
    • Disadvantages: occupy most resources, can start the number of fewer than threads
    • Applicable to: CPU intensive computing scenarios
  • Many coroutines
    • Advantages: minimum memory overhead, start the number of coroutines can be very much
    • Disadvantages: Limited library support, such as inability to use the Requests module, complex code implementation
    • Suitable for: IO – intensive computing, super – multitasking, ready-made library support scenarios

The Python global interpreter lock GIL

Two reasons why Python is slow?

Compared to other languages, such as C/C++/ Java/Golang, Python is really slow, and in some specific scenarios, Python is 100 to 200 times slower than C++

So why is Python slow?

  1. Dynamically typed languages, interpreted at the same time
  2. GIL cannot execute concurrently with a multi-core CPU

So what’s GIL?

GIL: Global interpreter lock. It is a mechanism used by computer programming language interpreters to synchronize threads so that only one thread is executing at any time. Even on multi-core processors, interpreters using the GIL allow only one thread to execute at a time.

Why is GIL present?

At the beginning of Python’s design, GIL was introduced to avoid concurrency problems and solve the problem of data integrity and state synchronization between multiple threads.

Since objects in Python are managed using reference counters, objects are released when the number of references is zero.

Such as: There are two threads A and B that want to reference object obj and cancel the object. Thread A performs the undo first and decrement object obj by one. At this time, A multithreaded scheduling switch occurs. At this point, the object obj has a count of 0, and Python will release the object, which may damage memory.

During the execution of multithreading, the thread will release the GIL and realize the parallel execution of CPU and IO. Therefore, multithreading will greatly improve the operation efficiency of IO intensive.

A way to create multiple threads

Create a multithreaded process

  1. Prepare an execution function, such as:
def my_func(a, b) :
    do_something(a, b)
Copy the code
  1. Create a thread
import threading
t = threading.Thread(target=my_func, args=(100.200)),Copy the code
  1. Starting a thread
t.start()
Copy the code
  1. Waiting for the end
t.join()
Copy the code

The crawler example

Test the sample from crawl Beijing xinfadi price information, address is as follows: www.xinfadi.com.cn/priceDetail…

Browser f12 caught analysis, you can see, the price information is through www.xinfadi.com.cn/getPriceDat… The request method is POST. We try to get the first page of data as follows:

import requests

url = 'http://www.xinfadi.com.cn/getPriceData.html'


def get_resource(url, page=1) :
    data = {
        "limit": 20."current": page
    }
    resp = requests.post(url, data=data)
    resp.encoding = 'utf-8'
    price_list = resp.json()['list']
    res_data = [
        (info['prodName'], info['place'], info['avgPrice']) for info in price_list
    ]
    print(res_data)
    return res_data


if __name__ == '__main__':
    res = get_resource(url)
    print(res)

Copy the code

The result is as follows:

[(' cabbage ', 'ji shan liao', '1.15'), (' baby food ', 'hebei', '1.25'), (' cabbage ', ' ', '2.75'), (' cabbage ', 'hebei', '2.5'), (' cabbage ', 'robust', '1.9'), (' purple cabbage, 'ji', '0.75'), (' celery ', 'robust', '2.65'), (' celery ', 'liao', '2.9'), (' spinach ', ' ', '6.5'), (' lettuce ', 'hebei', '2.25'), (' TuanShengCai ', 'hebei', '4.5'), Liao (' leaf lettuce ', 'Beijing', '4.75'), (' Roman lettuce ', 'hebei', '3.25'), (' rape ', 'hebei', '2.9'), (' parsley ', 'hebei', '6.0'), (' fennel ', 'hebei', '6.5'), (' leeks, "yue ji", '2.85'), (' bitter chrysanthemum ', 'liao', '4.5'), (' leaf lettuce, "liao", "6.0"), (' yellow heart food ', 'anhui', '1.55')]Copy the code

Here’s how long it takes for the single thread to crawl through 50 pages of price information:

For testing convenience, here we write a decorator to count the running time of the program. The final single-thread run code is:

import time
import xinfadi_spider
import threading
from functools import wraps


def count_time(func) :
    @wraps(func)
    def _wraper(*args, **kwargs) :
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print(F "Running time:{end - start}")
        return res

    return _wraper


@count_time
def single_thread() :
    for page in range(1.51):
        xinfadi_spider.get_resource(xinfadi_spider.url, page)


if __name__ == '__main__':
    single_thread()
Copy the code

The final running time is:

Running time: 13.166715621948242Copy the code

Let’s take a look at the code for multithreading:

@count_time
def single_thread() :
    for page in range(1.51):
        xinfadi_spider.get_resource(xinfadi_spider.url, page)
    return


@count_time
def multi_thread() :
    t = []
    for page in range(1.51):
        t.append(
            threading.Thread(target=xinfadi_spider.get_resource, args=(xinfadi_spider.url, page,))
        )
    for thread in t:
        thread.start()
    for thread in t:
        thread.join()


if __name__ == '__main__':
    multi_thread()
Copy the code

The final running speed increased by about 10 times, resulting in:

Running time: 1.8293204307556152Copy the code

Thread safety

Thread safety refers to that when a function or function library is called in a multi-threaded environment, it can correctly handle the shared variables of multiple threads, so that the program functions can be completed correctly

To ensure thread safety, Python provides both Queue and Lock, which are described in the following examples.

The Queue Queue

A Queue is a data type provided by Python. It ensures that a single data cannot be accessed by multiple threads at the same time in a fifo or fifO mode. Therefore, a Queue can be used to access data safely without causing data sharing conflicts.

In addition, Queue can be used to implement the producer and consumer model, decoupling between programs, and solve some problems of high concurrency with fewer resources.

Some common methods of Queue:

  1. Import module:import queue
  2. Create queue:q = queue.Queue()
  3. Add elements:q.put(item)
  4. Get elements:q.get()
  5. Query number of elements:q.qsize()
  6. Check whether it is empty:q.empty()
  7. To determine whether it is full:q.full()

Here, in combination with the above crawler case, Queue is used for concurrent parsing, and the overall process is as follows:

First of all, let’s change our previous crawler to separate parsing and retrieving web data

import requests


url = 'http://www.xinfadi.com.cn/getPriceData.html'


def get_resource(url, page=1) :
    data = {
        "limit": 20."current": page
    }
    resp = requests.post(url, data=data)
    resp.encoding = 'utf-8'
    return resp.json()


def parse_resource(resource_data) :
    price_list = resource_data['list']
    res_data = [
        (info['prodName'], info['place'], info['avgPrice']) for info in price_list
    ]
    return res_data


if __name__ == '__main__':
    res = get_resource(url)
    print(parse_resource(res))
Copy the code

Then write producers and consumers

import queue
import threading
import xinfadi_spider


def producer(page_queue: queue.Queue, resource_queue: queue.Queue) :
    while True:
        page = page_queue.get()
        resource = xinfadi_spider.get_resource(xinfadi_spider.url, page)
        resource_queue.put(resource)
        print(f"{threading.current_thread().name}Take the first climb{page}Page contents, current queue size is:"
              f"{page_queue.qsize()}")


def consumer(resource_queue: queue.Queue) :
    while True:
        resouce = resource_queue.get()
        parse_resource = xinfadi_spider.parse_resource(resouce)
        print(f"{threading.current_thread().name}Parse the data,"
              F "queue size is:{resource_queue.qsize()}")



if __name__ == '__main__':
    page_queue = queue.Queue()
    resource_queue = queue.Queue()

    for page in range(1.51):
        page_queue.put(page)

    for i in range(3):
        t = threading.Thread(name=f'producer-{i}', target=producer, args=(page_queue, resource_queue,))
        t.start()

    for i in range(2):
        t = threading.Thread(name=f'consumer-{i}', target=consumer, args=(resource_queue,))
        t.start()
Copy the code

The final result is as follows:

producer-1Take the first climb2The current queue size is:47
producer-0Take the first climb1The current queue size is:47
producer-2Take the first climb3The current queue size is: 45consumer-1Parses the data and the queue size is:2
consumer-1Parses the data and the queue size is:1
consumer-1Parses the data and the queue size is:0

producer-0Take the first climb5The current queue size is: 44consumer-1Parses the data and the queue size is:0. . . producer-0Take the first climb47The current queue size is:0
consumer-0Parses the data and the queue size is:0
producer-1Take the first climb50The current queue size is:0
producer-2Take the first climb49The current queue size is: 0consumer-0Parses the data and the queue size is:1
consumer-0Parses the data and the queue size is:0
Copy the code

The Lock Lock

With a Lock, the Lock is held before the thread function executes, and when it is finished, the Lock is released, ensuring that only one thread at a time holds the Lock

Let’s take a classic bank withdrawal example and explain why Lock was introduced

import threading


class Account:
    def __init__(self, account, balance) :
        # account
        self.account = account
        # Account balance
        self.balance = balance


def draw_money(account, money) :
    if account.balance >= money:
        print(f"{threading.current_thread().name}The withdrawal is successful, and the withdrawal amount is:{money}")
        account.balance -= money
        print(F "The current account balance is:{account.balance}")
    else:
        print(f"{threading.current_thread().name}Withdrawal failed, insufficient balance!")


if __name__ == '__main__':
    account = Account("tom".1000)
    threading.Thread(name='A', target=draw_money, args=(account, 800,)).start()
    threading.Thread(name='B', target=draw_money, args=(account, 800,)).start()
Copy the code

A normal result output would look like this:

A successfully withdraws the money, and the withdrawal amount is:800The current account balance is:200B failed to withdraw money, insufficient balance!Copy the code

We create two threads A and B, so when A runs out of money, the account balance will be 200, so when B runs again, the account balance will be insufficient, so the message will be insufficient, but when we run several times, you will find the following result:

A successfully withdraws the money, and the withdrawal amount is:800B successfully withdraw the money, the withdrawal amount is:800The current account balance is:200Current account balance: -600
Copy the code

The reason for this is that the run() method is not thread-safe. When the thread changes the account balance, a thread switch happens, and another thread B changes the account, thus causing the above result.

After we add a block to the logic of changing the balance in our code, time.sleep(0.5)

time.sleep(0.5)
account.balance -= money
Copy the code

You’ll see that every time you run it, the result will always be wrong

A successfully withdraws the money, and the withdrawal amount is:800B successfully withdraw the money, the withdrawal amount is:800The current account balance is:200Current account balance: -600
Copy the code

So Python solves this problem by introducing the mutex Lock as follows:

  1. Create thread lock:lock = threading.Lock()
  2. Create a recursive lock:lock = threading.RLock()
  3. Lock:lock,acquire()
  4. Release the lock:locak.release()

Try… try… try… finally… The other is through the with method, as follows:

import threading

lock = threading.Lock()

lock.acquire()
try:
    do something
finally:
    lock.release()
Copy the code

or

import threading

lock = threading.Lock()

with lock:
    do something
Copy the code

Here’s the way to use with directly (and more succinctly), combined with the example above:

import threading
import time

class Account:
    def __init__(self, account, balance) :
        # account
        self.account = account
        # Account balance
        self.balance = balance
        Define mutex
        self.lock = threading.Lock()


def draw_money(account, money) :
    with account.lock:
        if account.balance >= money:
            print(f"{threading.current_thread().name}The withdrawal is successful, and the withdrawal amount is:{money}")
            time.sleep(0.5)
            account.balance -= money
            print(F "The current account balance is:{account.balance}")
        else:
            print(f"{threading.current_thread().name}Withdrawal failed, insufficient balance!")


if __name__ == '__main__':
    account = Account("tom".1000)
    threading.Thread(name='A', target=draw_money, args=(account, 800,)).start()
    threading.Thread(name='B', target=draw_money, args=(account, 800,)).start()
Copy the code

No matter what blocking is added to the logic, the end result is correct

A successfully withdraws the money, and the withdrawal amount is:800The current account balance is:200B failed to withdraw money, insufficient balance!Copy the code

The thread pool

First, take a look at the thread lifecycle, as shown below:

As you can see from the above figure, when creating a thread, the system needs to allocate resources, and when terminating a thread, the system needs to reclaim resources. Therefore, there is some overhead of creating and terminating threads. If you can reuse threads, then the overhead is reduced.

  1. Improves performance, reduces the overhead of creating and terminating a large number of threads, and reuses thread resources
  2. This method is applicable to the scenario where a large number of requests are suddenly processed or a large number of threads are required to complete the task, but the actual task processing time is short
  3. Can effectively avoid the system because of creating too many threads, resulting in a high system load and slow problem
  4. Using thread pools is much cleaner than using threads alone

The usage method is as follows:

  1. Map function mode

The map results and entry order are fixed

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
    # func is the target function
    # args_list is a list of arguments
    results = pool.map(func, args_list)
    Get the result of the execution
    for result in results:
        print(result)
Copy the code
  1. The future model

The as_COMPLETED order is not fixed

from concurrent.futures import ThreadPoolExecutor, as_completed


with ThreadPoolExecutor() as pool:
    # arg refers to a parameter
    futures = [pool.submit(func, arg) for arg in args_list]
    for future in futures:
        print(future.result())
    for future in as_completed(futures):
        print(future.result())
Copy the code

The above crawler case will be combined for transformation. First of all, submit will be used to look at it

import xinfadi_spider
from concurrent.futures import ThreadPoolExecutor


Create a thread to fetch all url resources
with ThreadPoolExecutor() as p1:
    futures = {
        page: p1.submit(xinfadi_spider.get_resource, xinfadi_spider.url, page)
        for page in range(1.51)}for k, v in futures.items():
        print(k, v.result())

with ThreadPoolExecutor() as p2:
    futures_parse = {}
    for resource in futures.values():
        res = p2.submit(xinfadi_spider.parse_resource, resource.result())
        futures_parse[res] = resource
    for k, v in futures_parse.items():
        print(res.result())
Copy the code

Note: When you use submit, you return a future object, which can be returned as result(),

When submitting a task using map, len(iterlables) threads are started to execute the func function concurrently

with ThreadPoolExecutor() as p3:
    res = p3.map(xinfadi_spider.get_resource, [xinfadi_spider.url] * 50, [i for i in range(1.51)])

with ThreadPoolExecutor() as p4:
    p4.map(xinfadi_spider.parse_resource, [r for r in res])
Copy the code

Note that when using map, when passing multiple parameters, ensure that the variable passed is an iterable object, such as an array or a primitive ancestor, and ensure that the number of parameters is consistent.

If you do not use the with keyword to create a thread pool, such as pool = ThreadPoolExecutor(), you need to use pool.shutdown() to close the pool at the end

Multiple processes

Multithreading and coroutines are essentially performed on a single core, while multiprocessing is truly parallel, using multiple processes to execute in parallel on a multi-core CPU.

Since multi-process and multi-threaded writing are almost the same, I won’t go into too much detail here, but just list some ways to create and use them.

  1. The import module
# multiple processes
from multiprocessing import Process

# multithreaded
from threading import Thread
Copy the code
  1. Creating, starting, or waiting ends
# multiple processes
p = Process(target=func, args=(1,))
p.start()
p.join()

# multithreaded
t = Thread(target=func, args=(1,))
t.start()
t.join()
Copy the code
  1. Data communication
# multiple processes
from multiprocessing import Queue
q = Queue()
q.put([1.2.3])
item = q.get()

# multithreaded
import queue
q = Queue()
q.put([1.2.3])
item = q.get()
Copy the code
  1. Thread-safe locking
# multiple processes
from multiprocessing import Lock
lock = Lock()
with lock:
  do_something()
  
# multithreaded
from threading import Lock
lock = Lock()
with lock:
  do_something()
Copy the code
  1. pool
# multiple processes
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
    Method a #
    res = pool.map(func, *iterables)
    Method # 2
    res = pool.submit(func, arg)
    result = res.result()
    
# multithreaded
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
    Method a #
    res = pool.map(func, *iterables)
    Method # 2
    res = pool.submit(func, arg)
    result = res.result()
Copy the code

coroutines

There is a lot to describe about coroutines, so I will leave it to the next chapter.

If you like it, you can follow my official account and search on wechat: Feelwow