
1. Interprocess communication

⚽ Using queues for interprocess communication

  • The Queue class in the multiprocessing module enables interprocess communication

  • Introduction to the Queue methods

    from multiprocessing import Queue
    Obj = Queue()
    • Obj.put(item, block=True, timeout=None)

    Adds item to the queue. When block is true (the default) and the queue is full, the call blocks until another process takes data out and frees a slot. The timeout argument limits how long the call blocks: if no slot becomes free within timeout seconds, a queue.Full exception is raised.

    • Obj.get(block=True, timeout=None)

    Removes an item from the queue and returns it. If block is true and timeout is None (the defaults), the call blocks until an item is available. If timeout is a positive number, the call blocks for at most timeout seconds and then raises queue.Empty.

    • Obj.put_nowait(item)

    Adds item to the queue without blocking. The item is added only if a free slot is immediately available; otherwise a queue.Full exception is raised.

    • Obj.get_nowait()

    Removes an item from the queue and returns it without blocking. An item is returned only if one is immediately available; otherwise a queue.Empty exception is raised.

    • Obj.empty()

    Returns True if the queue is empty, otherwise False. Because of multiprocessing semantics, the result is not reliable.

    eg:

    from multiprocessing import Process, Queue, current_process
    
    
    def processFun(queue, name):
        print(current_process().pid, "Process puts data:", name)
        # Put name into the queue
        queue.put(name)


    if __name__ == '__main__':
        # Create a Queue for process communication
        queue = Queue()
        # Create a child process
        process = Process(target=processFun, args=(queue, "sun process"))
        # Start the child process
        process.start()
        # Wait for the child process to finish. This is safe here because the
        # item is small; with large items, call get() before join().
        process.join()
        print(current_process().pid, "Fetch data:")
        print(queue.get())

    result:

    5200 Process puts data: sun process
    15152 Fetch data:
    sun process
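The non-blocking methods described above can be tried in a single process. The following is a minimal sketch rather than part of the original example; the function name demo_nonblocking and the choice of maxsize=1 are assumptions made for illustration. The first item is fetched with get(timeout=...) because a multiprocessing Queue hands items to a background feeder thread, so calling get_nowait() immediately after a put may still raise queue.Empty.

```python
import queue  # provides the Full and Empty exception classes
from multiprocessing import Queue


def demo_nonblocking():
    """Show queue.Full and queue.Empty on a one-slot queue (illustrative helper)."""
    q = Queue(maxsize=1)      # assumption: a single slot makes "full" easy to reach
    q.put_nowait("first")     # succeeds, the slot was free

    try:
        q.put_nowait("second")   # no free slot, so this raises queue.Full
        full_raised = False
    except queue.Full:
        full_raised = True

    # Wait briefly for the feeder thread to deliver the item.
    item = q.get(timeout=5)

    try:
        q.get_nowait()           # nothing left, so this raises queue.Empty
        empty_raised = False
    except queue.Empty:
        empty_raised = True

    return full_raised, item, empty_raised


if __name__ == "__main__":
    print(demo_nonblocking())
```

Catching queue.Full and queue.Empty lets a process do other work instead of waiting when the queue is not ready.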

🏐 Using pipes for interprocess communication

  • To communicate between processes with a pipe, first call the Pipe() function of the multiprocessing module to create the pipe.

Eg:

import multiprocessing

conn1, conn2 = multiprocessing.Pipe(duplex=True)
  • conn1 and conn2 receive the two connection objects returned by Pipe(), one for each end of the pipe. The duplex parameter defaults to True, which makes the pipe bidirectional: the process at either end can both send and receive data. If duplex is False, the pipe is unidirectional: conn1 can only receive data and conn2 can only send data.

  • conn.send(obj) sends an object, which must be picklable

  • conn.recv() receives an object sent with send()

  • conn.close() closes the connection

  • conn.poll() returns whether there is any data available to read from the connection

  • conn.send_bytes(buf, offset=0, size=None)

Sends bytes data. If offset and size are not given, the whole buffer is sent. If they are given, only size bytes of the buffer, starting at position offset, are sent. Data sent with this method should be received with recv_bytes() or recv_bytes_into().

  • conn.recv_bytes(maxlength=None)

Receives data sent with send_bytes() and returns it as a bytes object. The optional maxlength sets the largest message that will be accepted; a longer message raises OSError.

  • conn.recv_bytes_into(buf, offset=0)

Works like recv_bytes(), except that the received data is written into buf starting at position offset, and the number of bytes received is returned.

eg:

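from multiprocessing import current_process, Pipe, Process


def processFun(conn, name):
    print("Process sends data:", name)
    conn.send(name)
    # conn.send_bytes(name.encode())


if __name__ == '__main__':
    # Create the pipe
    conn1, conn2 = Pipe()
    # Create and start the child process
    process = Process(target=processFun, args=(conn1, "sun pipe process"))
    process.start()
    process.join()
    print(current_process().pid, "Receives data:")
    print(conn2.recv())
    # print(conn2.recv_bytes())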

result:

Process sends data: sun pipe process
3816 Receives data:
sun pipe process
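The byte-oriented methods and the duplex=False behaviour described above can be checked without a child process, since both ends of a pipe can live in the same process. This is a minimal sketch under that assumption; the function name demo_pipe_bytes and the sample payloads are made up for illustration.

```python
from multiprocessing import Pipe


def demo_pipe_bytes():
    """Exercise send_bytes, recv_bytes, recv_bytes_into and poll (illustrative helper)."""
    # With duplex=False the first connection can only receive
    # and the second can only send.
    receiver, sender = Pipe(duplex=False)

    # Send only b"world": 5 bytes of the buffer starting at offset 6.
    sender.send_bytes(b"hello world", 6, 5)
    ready = receiver.poll(1)        # wait up to 1 second for data to arrive
    first = receiver.recv_bytes()

    # Receive the next message into an existing buffer, starting at offset 2.
    sender.send_bytes(b"abc")
    buf = bytearray(8)
    count = receiver.recv_bytes_into(buf, 2)

    # Sending on the receive-only end is rejected with OSError.
    try:
        receiver.send("x")
        rejected = False
    except OSError:
        rejected = True

    sender.close()
    receiver.close()
    return ready, first, count, bytes(buf), rejected


if __name__ == "__main__":
    print(demo_pipe_bytes())
```

In a real program each end would normally be handed to a different process, as in the example above; keeping both ends in one process here only serves to make the calls easy to follow.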

2. Process locks

  • ⚾ To implement a process lock with Lock, first call Lock() from the multiprocessing module to create a lock object. Call the object's acquire() method to acquire the lock, and its release() method to release it.

Eg:

from multiprocessing import Process, Lock


def f(Obj, i):
    Obj.acquire()  # acquire the lock
    try:
        print('hello world', i)
    finally:
        Obj.release()  # release the lock


if __name__ == '__main__':
    Obj = Lock()
    for num in range(10):
        Process(target=f, args=(Obj, num)).start()

result:

hello world 6
hello world 4
hello world 7
hello world 3
hello world 2
hello world 1
hello world 8
hello world 9
hello world 5
hello world 0
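A lock matters most when several processes update the same data. The sketch below is an illustration under assumptions, not the author's code: the worker name add_many, the shared counter built with Value, and the counts (4 processes, 100 increments each) are all hypothetical. It also uses the lock as a context manager, which acquires on entry and releases on exit just like the acquire/try/finally/release pattern shown above.

```python
from multiprocessing import Lock, Process, Value


def add_many(lock, counter, times):
    """Increment a shared counter `times` times under the lock (illustrative worker)."""
    for _ in range(times):
        with lock:               # acquire on entry, release on exit
            counter.value += 1   # read-modify-write, so it must be protected


if __name__ == "__main__":
    lock = Lock()
    # Raw shared integer without its own lock; the explicit Lock guards it.
    counter = Value("i", 0, lock=False)
    workers = [Process(target=add_many, args=(lock, counter, 100)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 400: every increment ran while holding the lock
```

Without the lock, two processes could read the same old value and one increment would be lost, so the final count could fall below 400.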

3. Producer-consumer model

  • 🏀 Create the producer function and the customer (consumer) function
def producer(q, role):
    print("start producer")
    for i in range(10):
        q.put(i)  # produce an item
        time.sleep(0.5)
        print(f"{role} has set value {i}")
    print("end producer")


def customer(q, role):
    print("start customer")
    while True:
        data = q.get()  # consume an item
        print(f"{role} has get value {data}")
  • 🎳 Create a queue shared by the producer and the consumer
Obj = Queue()
  • 🏑 Run the producer and consumer tasks in Thread objects
pro = Thread(target=producer, args=(Obj, "Producer"))
cus = Thread(target=customer, args=(Obj, "Consumer"))
# start() returns None, so keep the Thread objects and start them separately
pro.start()
cus.start()

eg:

from multiprocessing import Queue
from threading import Thread

import time


def producer(q, role):
    print("start producer")
    for i in range(10):
        q.put(i)  # produce an item
        time.sleep(0.5)
        print(f"{role} has set value {i}")
    print("end producer")


def customer(q, role):
    print("start customer")
    while True:
        data = q.get()  # consume an item
        print(f"{role} has get value {data}")


if __name__ == '__main__':
    Obj = Queue()  # Create a queue
    pro = Thread(target=producer, args=(Obj, "Producer"))
    cus = Thread(target=customer, args=(Obj, "Consumer"))
    # start() returns None, so keep the Thread objects and start them separately
    pro.start()
    cus.start()

result:

start producer
start customer
Consumer has get value 0
Producer has set value 0
Consumer has get value 1
Producer has set value 1
Consumer has get value 2
Producer has set value 2
Consumer has get value 3
Producer has set value 3
Consumer has get value 4
Producer has set value 4
Consumer has get value 5
Producer has set value 5
Consumer has get value 6
Producer has set value 6
Consumer has get value 7
Producer has set value 7
Consumer has get value 8
Producer has set value 8
Consumer has get value 9
Producer has set value 9
end producer
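In the example above the consumer loops forever, so it keeps waiting on the queue after the producer has finished. One common way to let it stop is to send a sentinel value as the last item. The following is a minimal sketch of that idea, not the author's implementation; the SENTINEL constant, the run_pipeline helper and the results list are hypothetical names introduced for illustration, and the sleep and print calls are left out to keep it short.

```python
from multiprocessing import Queue
from threading import Thread

SENTINEL = None  # assumption: None never appears as a real item


def producer(q, count):
    for i in range(count):
        q.put(i)          # produce an item
    q.put(SENTINEL)       # signal that no more items will follow


def consumer(q, results):
    while True:
        data = q.get()    # blocks until an item is available
        if data is SENTINEL:
            break         # producer is done, leave the loop
        results.append(data)


def run_pipeline(count):
    """Run one producer and one consumer thread and return what was consumed."""
    q = Queue()
    results = []
    pro = Thread(target=producer, args=(q, count))
    cus = Thread(target=consumer, args=(q, results))
    pro.start()
    cus.start()
    pro.join()
    cus.join()            # returns because the consumer exits on the sentinel
    return results


if __name__ == "__main__":
    print(run_pipeline(5))  # [0, 1, 2, 3, 4]
```

With several consumers, the producer would need to put one sentinel per consumer so that each of them sees the stop signal.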