Interprocess communication – Queue

Processes sometimes need to communicate with each other, and the operating system provides many mechanisms to do so.

For example, in the program we can use files, message queues, etc.


The use of the Queue

We can use the Queue of the Multiprocessing module to transfer data between multiple processes. The Queue itself is a message Queue program.

from multiprocessing import Queue


def queue_test() :
    q = Queue(3)	Initialize a Queue object to receive up to three PUT messages
    q.put("1") 
    q.put("2")
    print(q.full())	#False
    q.put("3")
    print(q.full()) #True


    The first try will wait 2 seconds before throwing an exception. The second try will throw an exception immediately
    try:
        q.put("4".True.2)
    except:
        print("Message queue full, number of existing messages :%s"%q.qsize())

    try:
        q.put_nowait("4")
    except:
        print("Message queue full, number of existing messages :%s"%q.qsize())


    Check if the queue is full before writing
    if not q.full():
        q.put_nowait("4")


    Check whether the queue is empty before reading the message
    if not q.empty():
        for i in range(q.qsize()):
            print(q.get_nowait())
        
        
def main() :
    queue_test()
    

if __name__ == "__main__"
	main()
Copy the code


Running results:

False
TrueMessage queue full, number of existing messages:3Message queue full, number of existing messages:3The message1The message2The message3
Copy the code


instructions

When the Queue() object is initialized (for example, q = Queue()), if the parentheses do not specify the maximum number of messages that can be received, or if the number is negative, there is no upper limit (until the end of memory) on the number of messages that can be received;

  • Queue.qsize(): Returns the number of messages contained in the current queue;
  • Queue.empty(): Returns True if the queue is empty, False otherwise;
  • Queue.full(): Returns True if the queue is full, False otherwise;
  • Queue.get([block[, timeout]]): Gets a message from the queue and then removes it from the queue. Block defaults to True.

1) If the block is True and timeout is not set, the queue will block if it is empty until the message is read from the queue. If timeout is set, the queue will wait for timeout seconds. If no message has been read, the Queue.Empty exception is raised;

2) If block is False, Queue will be immediately raised if Queue is Empty;

  • Queue.get_nowait(): prettyQueue.get(False);
  • Queue.put(item,[block[, timeout]]): Writes the item message to the queue. Block defaults to True.


1) If the block uses the default value and no timeout (in seconds) is set, if there is no space left to write, the program will block until there is space left from the queue. If timeout is set, the program will wait timeout seconds. If there is no space left, the program will wait for timeout seconds. The Queue.Full exception is raised;

2) If block is False, the Queue will immediately raise “queue.full” if there is no space to write to.

  • Queue.put_nowait(item): prettyQueue.put(item, False);


The Queue instances

Create two child processes in Queue, one to write data to Queue and one to read data from Queue:

import os, time, random
from multiprocessing import Process, Queue


Write the code that the data process executes:
def write(q) :
    for value in ['A'.'B'.'C'] :print('Put %s to queue... ' % value)
        q.put(value)
        time.sleep(random.random())

        
# read the data process to execute the code:
def read(q) :
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break

            
def main() :
    The parent process creates a Queue and passes it to its children:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # start child pw, write:
    pw.start()    
    # wait for PW to end:
    pw.join()
    Start child process pr, read:
    pr.start()
    pr.join()
    # the pr process is in an endless loop, so you can't wait for it to end.
    print(' ')
    print('All data is written and read')
    
    
if __name__=='__main__':
    main()
Copy the code


Running results:

Put A to queue...
Put B to queue...
Put C to queue...
Get A from queue.
Get B from queue.
Get C fromQueue. All data is written and Finishedin 4.0s]
Copy the code


Process Pool Pool

When the number of child processes to be created is small, you can use the Process in Multiprocessing to dynamically create multiple processes. However, if there are hundreds or even thousands of target processes, manually creating processes is a huge amount of work. You can use the Pool method provided by the Multiprocessing module.

When initializing the Pool, you can specify a maximum number of processes. When a new request is submitted to the Pool, if the Pool is not full, a new process is created to execute the request. However, if the number of processes in the pool has reached the specified maximum, the request will wait until a process in the pool terminates before the previous process is used to perform the new task, as shown in the following example:

# -*- coding:utf-8 -*-
import os, time, random
from multiprocessing import Pool


def worker(msg) :
    t_start = time.time()
    print("%s starts execution with process number %d" % (msg,os.getpid()))
    Random () randomly generates floating point numbers between 0 and 1
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"Execution completed, time %0.2f\n" % (t_stop-t_start))

    
def main() :
    po = Pool(3)  Define a process pool with a maximum of 3 processes
    for i in range(0.10) :# Pool().apply_async(target to call,(parameter progenitor passed to target,))
        Each loop will use the free child to call the target
        po.apply_async(worker,(i,))

    print("----start----")
    po.close()  The Po will not receive new requests after the process pool is closed
    po.join()  # wait for all child processes in the Po to complete, must be placed after the close statement
    print("-----end-----")
    
   
if __name__ == "__main__":
    main()
Copy the code


Running results:

----start----
2The process id is3248
2The execution is complete0.40

3The process id is3248
3The execution is complete0.60

4The process id is3248
4The execution is complete0.76

8The process id is3248
8The execution is complete0.85

1The process id is12656
1The execution is complete1.07

5The process id is12656
5The execution is complete0.26

6The process id is12656
6The execution is complete1.08

9The process id is12656
9The execution is complete0.37

0The process id is3656
0The execution is complete1.45

7The process id is3656
7The execution is complete1.82

-----end-----
[Finished in 3.6s]
Copy the code


Pool common function parsing

  • apply_async(func[, args[, kwds]]): calls in a non-blocking mannerfunc(parallel execution, blocking mode must wait for the previous process to exit before the next process can be executed), args is the list of arguments passed to func, KWDS is the list of keyword arguments passed to func
  • Close () : Closes the Pool so that it can no longer accept new tasks
  • Terminate () : Immediately terminate whether the task is complete or not
  • join(): Main process is blocked, waiting for the child process to exit, must be incloseterminateAfter using


Queue in the process pool

If you want to create a process using a Pool, you need to use Queue() in multiprocessing.manager () instead of multiprocessing.queue (), otherwise you will get the following error message:

RuntimeError: Queue objects should only be shared between processes through inheritance.

The following example demonstrates how processes in a process pool communicate:

# -*- coding:utf-8 -*-
import os,time,random
from multiprocessing import Manager,Pool


def reader(q) :
    print("Reader starts (%s), parent process (%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("Reader gets message from Queue: %s" % q.get(True))

        
def writer(q) :
    print("Writer starts (%s), parent process is (%s)" % (os.getpid(), os.getppid()))
    for i in "ithui":
        q.put(i)

       
def main() :
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  Use Queue in Manager
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  Let the tasks above store data to the Queue, and then let the tasks below start fetching data from it

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())
    
    
if __name__=="__main__":
    main()
Copy the code


Running results:

(3248Start writer8684), the parent process is (3248) Start reader (13248), the parent process is (3248) Readers get messages from the Queue: I read messages from the Queue: t read messages from the Queue: h read messages from the Queue: u Read messages from the Queue: I (3248) End
[Finished in 1.4s]
Copy the code


The public,

Create a new folder X

Nature took tens of billions of years to create our real world, while programmers took hundreds of years to create a completely different virtual world. We knock out brick by brick with a keyboard and build everything with our brains. People see 1000 as authority. We defend 1024. We are not keyboard warriors, we are just extraordinary builders of ordinary world.