This is the 9th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

Why do processes need to communicate?

1. Data transmission

One process needs to send its data to another process;

2. Resource sharing

Multiple processes share the same resource.

3. Event notification

One process needs to send a message to another or a group of processes notifying them of an event;

4. Process control

Some processes want complete control over the execution of another process (for example, a debugger), intercepting all of that process's operations and tracking its state changes in a timely manner.

For these reasons, the concept of interprocess communication is introduced. How should we communicate between processes?

Principles of interprocess communication

Each process has its own user address space, and global variables in one process are not visible in another. Therefore, to exchange data between processes, a buffer must be created in the kernel: process 1 copies data from its user space into the kernel buffer, and process 2 reads the data from that buffer. This mechanism provided by the kernel is called the interprocess communication (IPC) mechanism.
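This isolation is easy to observe. In the minimal sketch below (the variable and function names are just for illustration), a child process increments a module-level variable, but the parent never sees the change, because each process has its own copy of the address space.

```python
from multiprocessing import Process

counter = 0

def bump():
    # The child modifies its own private copy of the variable:
    # each process has a separate address space.
    global counter
    counter += 1

if __name__ == '__main__':
    p = Process(target=bump)
    p.start()
    p.join()
    print(counter)  # still 0 in the parent process
```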

Several modes of interprocess communication

Pipes

Anonymous pipe

A pipe is a half-duplex mode of communication in which data flows only one way and can only be used between related processes. Process kinship usually refers to the parent-child process relationship.
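At the OS level this can be sketched with os.pipe() and os.fork() (Unix only; a minimal illustration, not how any particular library does it internally): parent and child each close the end they do not use, and data flows one way.

```python
import os

r, w = os.pipe()                 # r: read end, w: write end
pid = os.fork()
if pid == 0:                     # child (the related process)
    os.close(r)                  # close the unused read end
    os.write(w, b'hi, parent')
    os.close(w)
    os._exit(0)
else:                            # parent
    os.close(w)                  # close the unused write end
    msg = os.read(r, 1024)
    os.close(r)
    os.waitpid(pid, 0)
    print(msg)                   # b'hi, parent'
```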

Named pipes

Named pipes are also a half-duplex communication mode, but they allow communication between unrelated processes.
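Because a named pipe (FIFO) lives in the filesystem, any process that knows its path can open it. A minimal Unix-only sketch using os.mkfifo (the path is hypothetical; fork is used only for brevity here, since the two sides could just as well be unrelated programs):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'demo_fifo')
os.mkfifo(path)                  # a FIFO special file, visible by name

pid = os.fork()
if pid == 0:
    # Writer side: opens the FIFO by path; no kinship is required.
    with open(path, 'w') as f:
        f.write('hello via fifo')
    os._exit(0)
else:
    # Reader side: open() blocks until a writer opens the other end.
    with open(path) as f:
        msg = f.read()
    os.waitpid(pid, 0)
    os.unlink(path)              # remove the FIFO when done
    print(msg)
```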

Message queues

Message queues are linked lists of messages stored in the kernel and identified by message queue identifiers. They overcome the drawbacks of signals, which carry little information; of pipes, which carry only unformatted byte streams; and of limited buffer sizes.

Shared memory communication

Shared memory maps a segment of memory so that it can be accessed by other processes. The segment is created by one process but can be attached to by many. Shared memory is the fastest IPC method and exists precisely to work around the inefficiency of the other mechanisms. It is often used together with other mechanisms, such as semaphores, to synchronize processes.

Semaphores

A semaphore is a counter that can be used to control access to a shared resource by multiple processes. It is often used as a locking mechanism to prevent other processes from accessing a shared resource while one process is accessing it. Therefore, it is mainly used as a means of synchronization between processes and between different threads within the same process.

Socket communication

Sockets are also an interprocess communication mechanism that, unlike other communication mechanisms, can be used for process communication between different machines.
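A minimal single-machine sketch using socket.socketpair() (Unix only here because of os.fork); with AF_INET sockets the same send/recv calls would work between different machines:

```python
import os
import socket

parent_sock, child_sock = socket.socketpair()   # two connected endpoints

pid = os.fork()
if pid == 0:                     # child process
    parent_sock.close()          # close the endpoint the child does not use
    child_sock.sendall(b'ping')
    child_sock.close()
    os._exit(0)
else:                            # parent process
    child_sock.close()           # close the endpoint the parent does not use
    msg = parent_sock.recv(1024)
    parent_sock.close()
    os.waitpid(pid, 0)
    print(msg)                   # b'ping'
```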

Signals

Signals are a complex form of communication used to notify a receiving process that an event has occurred.
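A minimal sketch with the standard signal module (Unix only): the process registers a handler and then sends SIGUSR1 to itself; in real use another process would deliver the signal with os.kill and the target's pid.

```python
import os
import signal

received = []

def handler(signum, frame):
    # Runs when the signal is delivered to this process
    received.append(signum)

signal.signal(signal.SIGUSR1, handler)
os.kill(os.getpid(), signal.SIGUSR1)   # notify ourselves of an "event"
print(received)
```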

How is process communication implemented in Python?

Processes are isolated from each other. To implement interprocess communication (IPC), the multiprocessing module supports two forms, queues and pipes, both of which use message passing.

Communication between processes requires a medium that:

  1. Is shared by all processes.
  2. Lives in memory and, ideally, handles locking for us automatically.

Exchanging data through message queues greatly reduces the need for locks and other synchronization mechanisms.

Pipes

from multiprocessing import Process, Pipe
import time


def consumer(p, name):
    left, right = p
    left.close()
    while True:
        try:
            baozi = right.recv()
            print('%s received bun :%s' % (name, baozi))
        except EOFError:
            right.close()
            break


def producer(seq, p):
    left, right = p
    right.close()
    for i in seq:
        left.send(i)
        time.sleep(1)
    else:
        left.close()


if __name__ == '__main__':
    left, right = Pipe()

    c1 = Process(target=consumer, args=((left, right), 'c1'))
    c1.start()

    seq = (i for i in range(10))
    producer(seq, (left, right))

    right.close()
    left.close()

    c1.join()
    print('Interprocess Communication - Pipeline - Main Process')

Running results:

c1 received bun :0
c1 received bun :1
c1 received bun :2
c1 received bun :3
c1 received bun :4
c1 received bun :5
c1 received bun :6
c1 received bun :7
c1 received bun :8
c1 received bun :9
Interprocess Communication - Pipeline - Main Process

Note:

Pipes support two-way communication, so they can be used to write programs that interact through a request/response model or the kind of remote procedure calls used in client/server designs.

Each process should close the pipe endpoints it does not use, such as the right end of the pipe in the producer and the left end in the consumer. If you forget to do this, the program may hang on the consumer's recv() operation. Pipes are reference-counted by the operating system, and EOFError is raised in a reader only after the corresponding write end has been closed in every process. Closing the write end in the producer alone is therefore not enough; every copy of that endpoint, in every process, must be closed.
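Since Pipe() is duplex by default, a request/response exchange can be sketched in a few lines. This runs in a single process purely to show the API; the message format is hypothetical:

```python
from multiprocessing import Pipe

client, server = Pipe()          # both ends can send and receive

client.send({'op': 'add', 'args': (1, 2)})   # the request
req = server.recv()
server.send(sum(req['args']))                # the response, same connection
result = client.recv()
print(result)                    # 3
```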

Queues (recommended)

Let’s implement a simple producer-consumer model.

from multiprocessing import Process, Queue, set_start_method
import time, random, os


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # end upon receipt of the end signal
        time.sleep(random.randint(1, 3))
        print('\033[45m%s eats %s\033[0m' % (os.getpid(), res))


def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = 'baozi %s' % i
        q.put(res)
        print('\033[46m%s produces %s\033[0m' % (os.getpid(), res))
    q.put(None)  # send the end signal


if __name__ == '__main__':
    set_start_method('fork')

    q = Queue()
    # Producer: the chef
    p1 = Process(target=producer, args=(q,))

    # Consumer: the foodie
    c1 = Process(target=consumer, args=(q,))

    # start
    p1.start()
    c1.start()
    print('Interprocess communication - Queue - Main process')

Running results:

Interprocess communication - Queue - Main process
25720 produces baozi 0
25720 produces baozi 1
25720 produces baozi 2
25721 eats baozi 0
25720 produces baozi 3
25721 eats baozi 1
25721 eats baozi 2
25720 produces baozi 4
25721 eats baozi 3
25720 produces baozi 5
25721 eats baozi 4
25720 produces baozi 6
25721 eats baozi 5
25721 eats baozi 6
25720 produces baozi 7
25720 produces baozi 8
25721 eats baozi 7
25721 eats baozi 8
25720 produces baozi 9
25721 eats baozi 9

Note:

The producer sends an end signal None after production.

The end signal, None, does not have to be sent by the producer. It can be sent by the main process, but the main process should wait for the producer to finish before sending it.

Shared data

Although data is independent between processes, it can be shared through the Manager.

Where possible, interprocess communication should avoid the shared-data approach described in this section.


from multiprocessing import Manager, Process, Lock

def work(d, lock):
    # Operating on shared data without the lock would corrupt it
    with lock:
        print(f"Counter minus one, now: {d['count']}")
        d['count'] -= 1


if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count': 20})
        p_l = []
        for i in range(20):
            p = Process(target=work, args=(dic, lock))
            p_l.append(p)
            p.start()

        for p in p_l:
            p.join()
        print(dic)

Running results:

Counter minus one, now: 20
Counter minus one, now: 19
Counter minus one, now: 18
Counter minus one, now: 17
Counter minus one, now: 16
Counter minus one, now: 15
Counter minus one, now: 14
Counter minus one, now: 13
Counter minus one, now: 12
Counter minus one, now: 11
Counter minus one, now: 10
Counter minus one, now: 9
Counter minus one, now: 8
Counter minus one, now: 7
Counter minus one, now: 6
Counter minus one, now: 5
Counter minus one, now: 4
Counter minus one, now: 3
Counter minus one, now: 2
Counter minus one, now: 1
{'count': 0}

Semaphores (for understanding only)

A mutex allows only one thread to modify data at a time, whereas a semaphore allows a fixed number of threads to do so at once. Suppose a restroom has three stalls: at most three people can use it at the same time, and anyone arriving later has to wait for someone to come out. If the semaphore is set to 3, each person who enters acquires a lock and the count increases by 1; once the count reaches 3, everyone behind must wait. As soon as someone releases, the next person can acquire the lock and go in.

Semaphores are similar in concept to a process pool, but they should be distinguished: a semaphore involves the notion of locking.

from multiprocessing import Process, Semaphore
import time, random

def go_wc(sem, user):
    sem.acquire()
    print('%s occupies a pit' % user)
    time.sleep(random.randint(0, 3))  # simulate different speeds; 0 means someone leaves at once
    sem.release()


if __name__ == '__main__':
    sem = Semaphore(5)
    p_l = []
    for i in range(13):
        p = Process(target=go_wc, args=(sem, 'user%s' % i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============')

Running results:

user0 occupies a pit
user1 occupies a pit
user2 occupies a pit
user3 occupies a pit
user5 occupies a pit
user4 occupies a pit
user7 occupies a pit
user9 occupies a pit
user8 occupies a pit
user6 occupies a pit
user11 occupies a pit
user12 occupies a pit
user10 occupies a pit
============

Events (for understanding only)

Python process events are used by the main process to control the execution of other processes. Events provide three methods: set, wait, and clear.

Event handling mechanism:

wait: if the internal flag is False, event.wait() blocks; if the flag is True, it does not block.

clear: sets the flag to False. set: sets the flag to True.


import multiprocessing
import time

from multiprocessing import set_start_method

event = multiprocessing.Event()

def xiao_fan(event):
    print('Vendor: Produce... ')
    print('Vendor: Sell... ')
    # time.sleep(1)
    print('Vendor: Waiting for your meal')
    event.set()
    event.clear()
    event.wait()
    print('Vendor: Thank you for coming')
    event.set()
    event.clear()


def gu_ke(event):
    print('Customer: Ready to buy breakfast')
    event.set()
    event.clear()
    event.wait()
    print('Customer: Get breakfast')
    print('Customer: Enjoy the food')
    # time.sleep(2)
    print('Customer: Payment, delicious... ')
    event.set()
    event.clear()


if __name__ == '__main__':
    set_start_method('fork', True)

    # create the processes
    xf = multiprocessing.Process(target=xiao_fan, args=(event,))
    gk = multiprocessing.Process(target=gu_ke, args=(event,))

    # start the processes
    gk.start()
    xf.start()

    # time.sleep(2)

Running results:

Customer: Ready to buy breakfast
Vendor: Produce...
Vendor: Sell...
Vendor: Waiting for your meal
Customer: Get breakfast
Customer: Enjoy the food
Customer: Payment, delicious...
Vendor: Thank you for coming

Conclusion

For shared memory, data operations are the fastest because they work directly at the memory level, with no intermediate copying. However, shared memory works only on a single machine and only for basic data formats; complex objects cannot be shared directly.

Pipes and queues do not transfer data as fast as shared memory, and the size of each transfer is limited. But queues can pass data among multiple processes, and can even be shared between processes on different hosts to achieve distribution.

Anonymous pipes can only be shared between parent and child processes, while named pipes can be shared between different processes on the same machine or between processes on different machines across a network.
