Official account: pythonislover

The following are links to three earlier articles on Python multithreading, which give an overview of how multithreading is used:

A simple Python multithreading implementation and the GIL – mp.weixin.qq.com/s/Hgp-x-T3s…

Python multithreaded synchronization: locks, deadlocks and recursive locks – mp.weixin.qq.com/s/RZSBe2MG9…

Python multithreaded synchronization: conditions, semaphores and queues – mp.weixin.qq.com/s/vKsNbDZnv…

Today we start on the Python multi-process part. If you have read the previous articles, you will know about the Python GIL: with multithreading, only one thread can execute on a CPU at any given moment, no matter how many cores the machine has. If you want to make full use of multi-core CPU resources, in most cases you need to use multiple processes in Python.

1. The Python multiprocessing module

Multiprocessing in Python is implemented through the multiprocessing package. Similar to threading.Thread, you use a multiprocessing.Process object to create a process. The methods of this Process object are pretty much the same as those of a Thread object: there are start(), run(), join() and so on. One difference concerns the daemon setting: in a Thread object it is the setDaemon() method, while for a Process object you set the daemon property instead.

Here is how Python multi-processing is implemented; it is very similar to multithreading.

2. Python multi-process implementation: method 1

from multiprocessing import  Process

def fun1(name):
    print('Test %s multiprocess' %name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  # Start 5 child processes to execute the fun1 function
        p = Process(target=fun1, args=('Python',))  # Instantiate the process object; note args must be a tuple
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

    print('End of test')

The result:

Test Python multiprocess
Test Python multiprocess
Test Python multiprocess
Test Python multiprocess
Test Python multiprocess
End of test

Process finished with exit code 0

The above code starts 5 child processes to execute the function. As you can see from the result, they print at almost the same time: this is true parallelism, with multiple CPUs executing tasks simultaneously. Keep in mind that in Python the process is the smallest unit of resource allocation, and data between processes is not shared in memory. Every time a process is started, resources have to be allocated independently and the data copied, so the cost of starting and destroying processes is relatively high. In practice, the number of processes should therefore be set according to the server's configuration.
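As a hedged, minimal sketch (my own addition, not from the original article), one common way to size the number of worker processes is to base it on the machine's CPU count via os.cpu_count():

import os
from multiprocessing import Process

def fun1(name):
    print('Test %s multiprocess' % name)

if __name__ == '__main__':
    # Rough heuristic: one worker per CPU core; adjust to your workload and server configuration
    num_workers = os.cpu_count() or 1
    process_list = []
    for i in range(num_workers):
        p = Process(target=fun1, args=('Python',))
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()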

3. Python multi-process implementation: method 2

Remember the second way of implementing multithreading in Python? It was done through class inheritance, and the second way of implementing multi-processing in Python is the same:

from multiprocessing import  Process

class MyProcess(Process): # inherit the Process class
    def __init__(self,name):
        super(MyProcess,self).__init__()
        self.name = name

    def run(self):
        print('Test %s multiprocess' % self.name)


if __name__ == '__main__':
    process_list = []
    for i in range(5):  # Start 5 child processes, each running MyProcess.run()
        p = MyProcess('Python')  # Instantiate the process object
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

    print('End of test')

The result:

Test Python multiprocess
Test Python multiprocess
Test Python multiprocess
Test Python multiprocess
Test Python multiprocess
End of test

Process finished with exit code 0

The effect is the same as the first method.

We can see that multi-processing in Python is implemented in much the same way as multithreading.

Other methods of the Process class

Constructor: Process([group [, target [, name [, args [, kwargs]]]]])
  group: thread group (effectively unused)
  target: the callable to execute
  name: process name
  args/kwargs: arguments passed to target

Instance methods:
  is_alive(): returns whether the process is still running (bool)
  join([timeout]): blocks the calling process until the process terminates or the optional timeout is reached
  start(): makes the process ready for CPU scheduling
  run(): start() calls run(); if no target was passed in, start() executes this default run() method
  terminate(): immediately terminates the worker process, whether or not its task is complete

Properties:
  daemon: plays the same role as setDaemon() for threads
  name: process name
  pid: process ID
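As a small illustration of a few of these members (a sketch of my own, not from the original article), you can check is_alive() and stop a long-running worker with terminate():

import time
from multiprocessing import Process

def slow_task():
    time.sleep(10)  # simulate a long-running job

if __name__ == '__main__':
    p = Process(target=slow_task, name='worker-1')
    p.start()
    print(p.name, p.pid, p.is_alive())  # e.g. worker-1 12345 True
    p.terminate()   # stop it regardless of whether the task is complete
    p.join()        # reap the process so is_alive() reflects its real state
    print(p.is_alive())  # False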

The use of join and daemon is similar to Python multithreading, so I won't repeat it here; you can check out the Python multithreading series.
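For completeness, here is a minimal daemon sketch (my own, not from the original series): the daemon property must be set before start(), and the daemon child is killed automatically when the main process exits.

import time
from multiprocessing import Process

def background_task():
    time.sleep(5)
    print('This line is never reached if the main process exits first')

if __name__ == '__main__':
    p = Process(target=background_task)
    p.daemon = True  # must be set before start(); plays the role of setDaemon() in threading
    p.start()
    print('Main process exits, the daemon child is terminated with it')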

4. Python multi-process communication

A process is the basic unit by which the operating system schedules independently and allocates system resources (CPU, memory). Processes are independent of each other, and starting a new process is essentially cloning the parent's data. This is the most obvious practical difference between multi-process and multi-thread. But does that mean data between Python processes is completely isolated? Of course not. Python also provides ways for processes to communicate and share data (so one process can modify a shared copy of the data).
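A minimal sketch of that isolation (my own illustration, assuming a module-level counter variable): a global modified in a child process does not change in the parent, because each process works on its own copy of the data.

from multiprocessing import Process

counter = 0  # global variable in the main process

def worker():
    global counter
    counter += 100
    print('In child process, counter =', counter)  # 100

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    p.join()
    print('In main process, counter =', counter)   # still 0: the memory is not shared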

The process queue: Queue

Queue was also mentioned in the multithreading articles: used in the producer-consumer pattern, it is thread-safe and acts as a data pipeline between producers and consumers. In Python multi-processing, it is likewise a data pipeline between processes, enabling them to communicate.

from multiprocessing import Process,Queue


def fun1(q,i):
    print('Child process %s starts putting data' % i)
    q.put("I'm %s communicating via Queue" % i)

if __name__ == '__main__':
    q = Queue()

    process_list = []
    for i in range(3):
        p = Process(target=fun1, args=(q, i,))  # Note that args must pass the Queue object q to the target function, so the child process can share the Queue with the main process
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('Main process gets Queue data')
    print(q.get())
    print(q.get())
    print(q.get())
    print('End of test')

The result:

Child process 0 starts putting data
Child process 1 starts putting data
Child process 2 starts putting data
Main process gets Queue data
I'm 0 communicating via Queue
I'm 1 communicating via Queue
I'm 2 communicating via Queue
End of test

Process finished with exit code 0

The result above shows that the main process can obtain the data put by the child processes through the Queue, realizing communication between processes.

The pipe: Pipe

Pipe, like Queue, is used for communication between processes:

from multiprocessing import Process, Pipe
def fun1(conn):
    print('Child process sends message:')
    conn.send('Hello main process')
    print('Child process receives message:')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe()  # Key point: Pipe() instantiates a bidirectional pipe and returns its two ends
    p = Process(target=fun1, args=(conn2,))  # conn2 is passed to the child process
    p.start()
    print('Main process receives message:')
    print(conn1.recv())
    print('Main process sends message:')
    conn1.send("Hello child process")
    p.join()
    print('End of test')

The result:

Main process receives message:
Child process sends message:
Hello main process
Main process sends message:
Hello child process
End of test

Process finished with exit code 0

You can see from the above that the main process and the child process can send messages to each other.

Managers

Queue and Pipe only implement data exchange, not data sharing, that is, one process changing another process's data. For that we need Managers.

from multiprocessing import Process, Manager

def fun1(dic,lis,index):

    dic[index] = 'a'
    dic['2'] = 'b'
    lis.append(index)  # each child process appends its index to the shared list
    # print(lis)

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()  # Note that the dictionary is declared through the Manager, not directly with {}
        l = manager.list(range(5))  # [0, 1, 2, 3, 4]

        process_list = []
        for i in range(10):
            p = Process(target=fun1, args=(dic,l,i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)

Results:

{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]

You can see that the main process defines a dictionary and a list; in the child processes the contents of the dictionary can be added and modified, and new data can be appended to the list. This implements data sharing between processes: multiple processes can modify the same data together.
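One caveat: a Manager makes the data visible to every process, but concurrent read-modify-write operations on it can still race. A hedged sketch (not from the original article) is to pair the Manager with a multiprocessing Lock:

from multiprocessing import Process, Manager, Lock

def add_one(shared_dict, lock):
    with lock:  # serialize the read-modify-write so increments are not lost
        shared_dict['count'] = shared_dict.get('count', 0) + 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager:
        shared_dict = manager.dict()
        process_list = [Process(target=add_one, args=(shared_dict, lock)) for _ in range(10)]
        for p in process_list:
            p.start()
        for p in process_list:
            p.join()
        print(shared_dict)  # {'count': 10}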

5. Process pool

The process pool maintains a sequence of processes internally. When it is used, a process is fetched from the pool; if no process in the pool is currently available, the program waits until one is. In other words, only a fixed number of processes are ever available.

There are two methods in the process pool:

apply: synchronous (blocking); generally not used

apply_async: asynchronous (non-blocking)

from  multiprocessing import Process,Pool
import os, time, random

def fun1(name):
    print('Run task %s (%s)... ' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    pool = Pool(5)  # Create a pool of 5 worker processes

    for i in range(10):
        pool.apply_async(func=fun1, args=(i,))

    pool.close()
    pool.join()
    print('End of test')

The result:

Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
End of test

Calling join() on the Pool object waits for all child processes to complete. close() must be called before join(), and no new processes can be submitted after close() has been called.
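apply_async() also returns an AsyncResult object, so when the worker function returns a value you can collect it with get(). A minimal sketch (the square() helper is my own, not from the article):

from multiprocessing import Pool

def square(x):  # hypothetical worker that returns a value
    return x * x

if __name__ == '__main__':
    with Pool(5) as pool:
        results = [pool.apply_async(square, args=(i,)) for i in range(10)]
        print([r.get() for r in results])  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]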

Process pool map method

This example comes from the internet; if it infringes any rights, please let me know. Thank you.

I saw this example online and think it is a good one, so rather than writing my own case here I will use it, since it is more convincing:

import os
import PIL
from multiprocessing import Pool
from PIL import Image

SIZE = (75, 75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(create_thumbnail, images)  # Key point: images is an iterable
    pool.close()
    pool.join()

The main job of this code is to iterate through the image files in the given folder, generate a thumbnail for each one, and save the thumbnails to a specific folder. On my machine, it took 27.9 seconds to process 6,000 images. With map there is no manual process management to do, which makes debugging much easier.

map can also be used in the crawler domain, for example to crawl the content of multiple URLs: put the URLs into a tuple (or list) and pass it to the function being executed.
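A hedged sketch of that idea (the URLs and the fetch() function are placeholders of my own, not a tested crawler):

from multiprocessing import Pool
from urllib.request import urlopen

URLS = (  # a tuple of target URLs; replace with the pages you actually want to crawl
    'https://www.example.com',
    'https://www.python.org',
    'https://www.wikipedia.org',
)

def fetch(url):
    # Download one page and return its size; a real crawler would parse the content here
    with urlopen(url, timeout=10) as resp:
        return url, len(resp.read())

if __name__ == '__main__':
    with Pool(3) as pool:
        for url, size in pool.map(fetch, URLS):
            print('%s -> %d bytes' % (url, size))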