This is the 13th day of my participation in the Gwen Challenge.

Search [program yuan xiaozhuang] on WeChat official accounts and follow along as this halfway-into-programming developer supports his family with Python development ~

preface

Multiple tasks can be handled concurrently by opening extra processes or threads, but opening them without limit will eventually exhaust memory. The solution is the concept of pooling introduced in this article.

Process pool & thread pool

A TCP server can achieve concurrency by opening multiple threads: every time a client requests a connection, the server opens a process or thread to handle it. The code looks like this:

Server code:

# server.py
import socket
from threading import Thread


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        # Create a thread for each client connection
        t = Thread(target=communicate, args=(conn,))
        t.start()


if __name__ == '__main__':
    s = Thread(target=server, args=('127.0.0.1', 8080))
    s.start()

Client code:

# client.py
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>> ').strip()
    if len(msg) == 0:
        continue
    if msg == 'q':
        break
    client.send(msg.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))
client.close()

The server above achieves concurrency by opening a thread for every client, but one thread per request is clearly a waste of resources. Think of it as a restaurant that hires one waiter per customer: if 1,000 customers come, it needs 1,000 waiters, and sooner or later the restaurant will be overwhelmed. Programs are the same: both processes and threads consume resources (a thread less than a process), and a developer cannot create them without limit, because computer hardware resources cannot keep up — hardware development lags far behind software. The goal of development should be to make the most of the hardware while keeping it within safe working limits. Hence the introduction of the concept of pools.

The concept of the pool

A pool maximizes the use of the computer while ensuring the safety of its hardware. It reduces program efficiency a little, but protects the hardware. When a pool is created, a fixed number of threads or processes is created, and that number never changes — just like the waiters in a restaurant: no matter how many customers come to eat, the number of waiters stays fixed. The processes or threads in the pool are not repeatedly created and destroyed, which greatly saves computer resources.

Tasks can be submitted in two ways: synchronously and asynchronously. The result of an asynchronously submitted task is obtained through a callback mechanism, which is like handing each waiter a task and having the waiter actively report to the boss once the task is done.

Here’s how to start processes and threads using process pools and thread pools.

The concurrent.futures module

The following code shows the basic use of a thread pool. A process pool is used in exactly the same way, so it will not be described separately; the only difference is the class name, ProcessPoolExecutor.

from concurrent.futures import ThreadPoolExecutor
import os
import time

# Create a thread pool object
pool = ThreadPoolExecutor(5)
# A number can be passed in the parentheses to set the pool size. If omitted,
# the default is 5 times the number of CPUs on the machine (Python 3.5-3.7;
# newer versions use min(32, os.cpu_count() + 4)). Once the pool is created,
# it holds that fixed number of threads, which are never repeatedly created
# and destroyed.


def task(n):
    print(n, os.getpid())
    time.sleep(2)
    return n ** 2  # return a value so there is a result to collect


# Asynchronous submission: submit() returns a Future immediately, and the
# main thread keeps running without waiting for the task:
# pool.submit(task, 1)
# print('master')

# Requirement: wait for all tasks in the thread pool to complete before
# continuing -- turning asynchronous into synchronous
res_list = []
for i in range(20):
    res = pool.submit(task, i)
    res_list.append(res)
# Close the thread pool and wait for all tasks in it to complete
pool.shutdown()
# Each res is a Future whose value is the return value of the task,
# i.e. the result of the asynchronously submitted task
for res in res_list:
    print(res.result())  # result(): turns asynchronous into synchronous
# The program goes from concurrent to serial: the tasks' own output is printed
# first, then all the return values, instead of the two interleaving.


# A better way: get the return values through the asynchronous callback mechanism
def call_back(n):
    print('call_back', n.result())


if __name__ == '__main__':
    for i in range(20):
        # add_done_callback returns None, so don't assign its result
        pool.submit(task, i).add_done_callback(call_back)

To summarize the above code, there are only a few things you need to know:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

pool = ProcessPoolExecutor(5)  # Set the size of the pool
pool.submit(task, i).add_done_callback(call_back)  # Submit the task asynchronously; the callback collects the result
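Since the process pool is used the same way, here is a minimal sketch of the ProcessPoolExecutor variant (the squaring task, pool size, and `main` wrapper here are illustrative, not from the original):

```python
from concurrent.futures import ProcessPoolExecutor


def task(n):
    # Runs in a worker process, not in the main process
    return n ** 2


def main():
    # Exiting the with-block waits for all submitted tasks (like shutdown())
    with ProcessPoolExecutor(4) as pool:
        futures = [pool.submit(task, i) for i in range(10)]
    return [f.result() for f in futures]


if __name__ == '__main__':  # required: worker processes re-import this module
    print(main())  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Note the `if __name__ == '__main__'` guard: unlike threads, worker processes may be started by re-importing the module, so the pool must only be created under the guard.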

coroutines

From the previous article we know that a process is a unit of resource allocation, while a thread is the real unit of execution on the CPU. A coroutine, by contrast, is a concept put forward by developers themselves: it achieves concurrency within a single thread by controlling the switching between multiple tasks and saving their state in the program. In a plain single thread, however, if one task blocks on an IO operation and no switch happens, the whole thread blocks in place and none of the other tasks in it can run.

A single-threaded program will almost inevitably run into IO operations. But if we can control multiple tasks within one thread at the application level (i.e. in user code, not at the operating-system level), switching to another task's computation whenever one task hits IO, then the thread stays in the ready state as much as possible — a state in which it can be executed by the CPU at any moment. This amounts to the developer detecting IO at the code level and completing the switch there. To the CPU it looks as if the program runs continuously with no IO at all, which improves the program's efficiency.
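The "switching between multiple tasks + saving state" idea can be sketched in a single thread with plain generators (a hand-rolled illustration of the concept only; gevent itself is built on greenlets, not generators):

```python
log = []  # record the execution order so the interleaving is visible


def task1():
    for i in range(3):
        log.append(('task1', i))
        yield  # give up control here; local state (i) is saved


def task2():
    for i in range(3):
        log.append(('task2', i))
        yield


# A tiny round-robin scheduler: switch between the tasks in one thread
tasks = [task1(), task2()]
while tasks:
    for t in tasks[:]:
        try:
            next(t)  # resume the task exactly where it last yielded
        except StopIteration:
            tasks.remove(t)

print(log)  # the two tasks' steps interleave: task1 0, task2 0, task1 1, ...
```

Each `yield` is a hand-written switch point; gevent automates exactly this, inserting the switch wherever an IO operation would block.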

Detecting IO operations at the code level, switching tasks, and saving state requires the gevent module, a third-party package installed with `pip install gevent`. By itself, gevent cannot detect some common IO operations (such as `time.sleep`), so two extra lines of code must be imported when using it:

from gevent import monkey
monkey.patch_all()

Since these two lines must always be imported to use gevent, they are commonly written in shorthand on one line:

from gevent import monkey; monkey.patch_all()

To see how the gevent module is used, consider the following example:

from gevent import monkey; monkey.patch_all()
import time
from gevent import spawn  # spawn submits a task that gevent can switch between


def heng():
    print('heng')
    time.sleep(2)
    print('.heng.')


def ha():
    print('ha')
    time.sleep(3)
    print('.ha.')


start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g1.join()  # wait for the spawned task to finish before continuing
g2.join()
print(time.time() - start_time)

# Run result:
# heng
# ha
# .heng.
# .ha.
# 3.0066308975219727  # roughly the longest single task, not the sum of the tasks

Similarly, concurrency on the TCP server can be achieved through coroutines as follows:

Server code:

from gevent import monkey; monkey.patch_all()
import socket
from gevent import spawn


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        # Spawn a coroutine for each client instead of a thread
        spawn(communicate, conn)


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()

Client code:

import socket
from threading import Thread, current_thread


def client():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    n = 0
    while True:
        msg = '%s say hello %s' % (current_thread().name, n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    # Simulate 100 concurrent clients with threads
    for i in range(100):
        t = Thread(target=client)
        t.start()

conclusion

This article was first published on the WeChat official account Program Yuan Xiaozhuang and is synchronized to Juejin (Nuggets) and Zhihu.

Writing is not easy, so please credit the source when reposting. Passing friends, please give a cute little thumbs-up before you go (╹▽╹)