“This is the 15th day of my participation in the First Challenge 2022. For details: First Challenge 2022.”

In this series of articles, you’ve covered how to create multiple processes, how to synchronize multiple processes using mutex, semaphores, and how to communicate with each other using queues and pipes

The role of the process pool

Reviewing our previous code, in the producer-consumer problem, we wrote to create multiple consumer processes like this:

p2 = multiprocessing.Process(target= customer, args=(conn2, ))
p3 = multiprocessing.Process(target= customer, args=(conn2, )) 
p4 = multiprocessing.Process(target= customer, args=(conn2, ))
Copy the code

As you can see, creating multiple processes is cumbersome, and each process requires the use of a single, identical statement. Similarly, starting a process requires multiple lines of similar statements. The solution to this redundancy is process pooling. Take a look at the process pool implementation for the same operation:

with multiprocessing.Pool(3) as p:
    p.map(customer, [conn2, conn2, conn2,])
Copy the code

Creation and use of process pools

First, we create a process Pool using multiprocessing.pool (3), where 3 refers to the number of worker processes. If this parameter is not provided, os.cpu_count() is used to get the number of cpus as the number of worker processes. When a new request is submitted to the process pool, if the process pool is not full, a new process is created to execute the request. If the process pool is full, the request is blocked until a process in the pool terminates and a new process is created to execute the request.

In addition to specifying the number of processes, you can also specify initializer and initargs when creating a process pool. If these two parameters are provided, initializer(initargs) will be called for each process at startup.

With the process pool created, we can submit requests to it. We use map(func, iterable) to submit requests to the process pool to execute. Similar to the built-in map(func, iterable) function, each element in iterable is used as an argument to call the func function. The request pool creates and runs a process for the request and blocks until the thread runs and results are returned. If you want to avoid blocking, you can use map_async().

In addition, similar to multithreading, there are methods for process pool control. For example, close() is used to close the process pool so that it does not accept new tasks. Terminal () terminates the worker process pool and stops processing pending tasks. Join () blocks the main process until all the children exit.

code

Finally, attach code for producer-consumer problems implemented using process pooling.

import multiprocessing
import time 
import random

class cake:
    def __init__(self, time) :
        self.productionTime = time 
    
    def eat(self) :
        print("Eat a cake product at ", self.productionTime)
    

def cook(conn) :
    for _ in range(8):
        conn.send(cake(time.time()))
        time.sleep(random.random())
    conn.close()

def customer(conn) :
    for _ in range(2):
        cake = conn.recv()
        cake.eat()
        time.sleep(random.random() * 5)

if __name__ == "__main__":

    conn1, conn2 = multiprocessing.Pipe()

    p1 = multiprocessing.Process(target= cook, args=(conn1, ))
    p1.start()

    with multiprocessing.Pool(3) as p:
        p.map(customer, [conn2, conn2, conn2, ])

Copy the code